diff --git a/src/database/key_value/rooms/auth_chain.rs b/src/database/key_value/rooms/auth_chain.rs index 197ce844..57dbb147 100644 --- a/src/database/key_value/rooms/auth_chain.rs +++ b/src/database/key_value/rooms/auth_chain.rs @@ -1,358 +1,24 @@ - - /// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer. - #[tracing::instrument(skip(self))] - pub fn load_shortstatehash_info( - &self, - shortstatehash: u64, - ) -> Result< - Vec<( - u64, // sstatehash - HashSet, // full state - HashSet, // added - HashSet, // removed - )>, - > { - if let Some(r) = self - .stateinfo_cache - .lock() - .unwrap() - .get_mut(&shortstatehash) - { - return Ok(r.clone()); - } - - let value = self - .shortstatehash_statediff - .get(&shortstatehash.to_be_bytes())? - .ok_or_else(|| Error::bad_database("State hash does not exist"))?; - let parent = - utils::u64_from_bytes(&value[0..size_of::()]).expect("bytes have right length"); - - let mut add_mode = true; - let mut added = HashSet::new(); - let mut removed = HashSet::new(); - - let mut i = size_of::(); - while let Some(v) = value.get(i..i + 2 * size_of::()) { - if add_mode && v.starts_with(&0_u64.to_be_bytes()) { - add_mode = false; - i += size_of::(); - continue; - } - if add_mode { - added.insert(v.try_into().expect("we checked the size above")); - } else { - removed.insert(v.try_into().expect("we checked the size above")); - } - i += 2 * size_of::(); - } - - if parent != 0_u64 { - let mut response = self.load_shortstatehash_info(parent)?; - let mut state = response.last().unwrap().1.clone(); - state.extend(added.iter().copied()); - for r in &removed { - state.remove(r); - } - - response.push((shortstatehash, state, added, removed)); - - Ok(response) - } else { - let response = vec![(shortstatehash, added.clone(), added, removed)]; - self.stateinfo_cache - .lock() - .unwrap() - .insert(shortstatehash, response.clone()); - Ok(response) - } - } - - pub fn compress_state_event( - &self, - shortstatekey: u64, - event_id: &EventId, - globals: &super::globals::Globals, - ) -> Result { - let mut v = shortstatekey.to_be_bytes().to_vec(); - v.extend_from_slice( - &self - .get_or_create_shorteventid(event_id, globals)? - .to_be_bytes(), - ); - Ok(v.try_into().expect("we checked the size above")) - } - - /// Returns shortstatekey, event id - pub fn parse_compressed_state_event( - &self, - compressed_event: CompressedStateEvent, - ) -> Result<(u64, Arc)> { - Ok(( - utils::u64_from_bytes(&compressed_event[0..size_of::()]) - .expect("bytes have right length"), - self.get_eventid_from_short( - utils::u64_from_bytes(&compressed_event[size_of::()..]) - .expect("bytes have right length"), - )?, - )) - } - - /// Creates a new shortstatehash that often is just a diff to an already existing - /// shortstatehash and therefore very efficient. - /// - /// There are multiple layers of diffs. The bottom layer 0 always contains the full state. Layer - /// 1 contains diffs to states of layer 0, layer 2 diffs to layer 1 and so on. If layer n > 0 - /// grows too big, it will be combined with layer n-1 to create a new diff on layer n-1 that's - /// based on layer n-2. If that layer is also too big, it will recursively fix above layers too. - /// - /// * `shortstatehash` - Shortstatehash of this state - /// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid - /// * `statediffremoved` - Removed from base. Each vec is shortstatekey+shorteventid - /// * `diff_to_sibling` - Approximately how much the diff grows each time for this layer - /// * `parent_states` - A stack with info on shortstatehash, full state, added diff and removed diff for each parent layer - #[tracing::instrument(skip( - self, - statediffnew, - statediffremoved, - diff_to_sibling, - parent_states - ))] - pub fn save_state_from_diff( - &self, - shortstatehash: u64, - statediffnew: HashSet, - statediffremoved: HashSet, - diff_to_sibling: usize, - mut parent_states: Vec<( - u64, // sstatehash - HashSet, // full state - HashSet, // added - HashSet, // removed - )>, - ) -> Result<()> { - let diffsum = statediffnew.len() + statediffremoved.len(); - - if parent_states.len() > 3 { - // Number of layers - // To many layers, we have to go deeper - let parent = parent_states.pop().unwrap(); - - let mut parent_new = parent.2; - let mut parent_removed = parent.3; - - for removed in statediffremoved { - if !parent_new.remove(&removed) { - // It was not added in the parent and we removed it - parent_removed.insert(removed); - } - // Else it was added in the parent and we removed it again. We can forget this change - } - - for new in statediffnew { - if !parent_removed.remove(&new) { - // It was not touched in the parent and we added it - parent_new.insert(new); - } - // Else it was removed in the parent and we added it again. We can forget this change - } - - self.save_state_from_diff( - shortstatehash, - parent_new, - parent_removed, - diffsum, - parent_states, - )?; - - return Ok(()); - } - - if parent_states.is_empty() { - // There is no parent layer, create a new state - let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent - for new in &statediffnew { - value.extend_from_slice(&new[..]); - } - - if !statediffremoved.is_empty() { - warn!("Tried to create new state with removals"); - } - - self.shortstatehash_statediff - .insert(&shortstatehash.to_be_bytes(), &value)?; - - return Ok(()); - }; - - // Else we have two options. - // 1. We add the current diff on top of the parent layer. - // 2. We replace a layer above - - let parent = parent_states.pop().unwrap(); - let parent_diff = parent.2.len() + parent.3.len(); - - if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff { - // Diff too big, we replace above layer(s) - let mut parent_new = parent.2; - let mut parent_removed = parent.3; - - for removed in statediffremoved { - if !parent_new.remove(&removed) { - // It was not added in the parent and we removed it - parent_removed.insert(removed); - } - // Else it was added in the parent and we removed it again. We can forget this change - } - - for new in statediffnew { - if !parent_removed.remove(&new) { - // It was not touched in the parent and we added it - parent_new.insert(new); - } - // Else it was removed in the parent and we added it again. We can forget this change - } - - self.save_state_from_diff( - shortstatehash, - parent_new, - parent_removed, - diffsum, - parent_states, - )?; - } else { - // Diff small enough, we add diff as layer on top of parent - let mut value = parent.0.to_be_bytes().to_vec(); - for new in &statediffnew { - value.extend_from_slice(&new[..]); - } - - if !statediffremoved.is_empty() { - value.extend_from_slice(&0_u64.to_be_bytes()); - for removed in &statediffremoved { - value.extend_from_slice(&removed[..]); - } - } - - self.shortstatehash_statediff - .insert(&shortstatehash.to_be_bytes(), &value)?; - } - - Ok(()) - } - - /// Returns the new shortstatehash - pub fn save_state( - room_id: &RoomId, - new_state_ids_compressed: HashSet, - ) -> Result<(u64, - HashSet, // added - HashSet)> // removed - { - let previous_shortstatehash = self.d.current_shortstatehash(room_id)?; - - let state_hash = self.calculate_hash( - &new_state_ids_compressed - .iter() - .map(|bytes| &bytes[..]) - .collect::>(), - ); - - let (new_shortstatehash, already_existed) = - self.get_or_create_shortstatehash(&state_hash, &db.globals)?; - - if Some(new_shortstatehash) == previous_shortstatehash { - return Ok(()); - } - - let states_parents = previous_shortstatehash - .map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?; - - let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() - { - let statediffnew: HashSet<_> = new_state_ids_compressed - .difference(&parent_stateinfo.1) - .copied() - .collect(); - - let statediffremoved: HashSet<_> = parent_stateinfo - .1 - .difference(&new_state_ids_compressed) - .copied() - .collect(); - - (statediffnew, statediffremoved) - } else { - (new_state_ids_compressed, HashSet::new()) - }; - - if !already_existed { - self.save_state_from_diff( - new_shortstatehash, - statediffnew.clone(), - statediffremoved, - 2, // every state change is 2 event changes on average - states_parents, - )?; - }; - - Ok((new_shortstatehash, statediffnew, statediffremoved)) - } - - #[tracing::instrument(skip(self))] - pub fn get_auth_chain_from_cache<'a>( - &'a self, - key: &[u64], - ) -> Result>>> { - // Check RAM cache - if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) { - return Ok(Some(Arc::clone(result))); - } - - // Check DB cache - if key.len() == 1 { - if let Some(chain) = - self.shorteventid_authchain - .get(&key[0].to_be_bytes())? - .map(|chain| { - chain - .chunks_exact(size_of::()) - .map(|chunk| { - utils::u64_from_bytes(chunk).expect("byte length is correct") - }) - .collect() +impl service::room::auth_chain::Data for KeyValueDatabase { + fn get_cached_eventid_authchain<'a>() -> Result> { + self.shorteventid_authchain + .get(&shorteventid.to_be_bytes())? + .map(|chain| { + chain + .chunks_exact(size_of::()) + .map(|chunk| { + utils::u64_from_bytes(chunk).expect("byte length is correct") }) - { - let chain = Arc::new(chain); - - // Cache in RAM - self.auth_chain_cache - .lock() - .unwrap() - .insert(vec![key[0]], Arc::clone(&chain)); - - return Ok(Some(chain)); - } - } - - Ok(None) + .collect() + }) } - #[tracing::instrument(skip(self))] - pub fn cache_auth_chain(&self, key: Vec, chain: Arc>) -> Result<()> { - // Persist in db - if key.len() == 1 { - self.shorteventid_authchain.insert( - &key[0].to_be_bytes(), - &chain - .iter() - .flat_map(|s| s.to_be_bytes().to_vec()) - .collect::>(), - )?; - } - - // Cache in RAM - self.auth_chain_cache.lock().unwrap().insert(key, chain); - - Ok(()) + fn cache_eventid_authchain<'a>(shorteventid: u64, auth_chain: &HashSet) -> Result<()> { + shorteventid_authchain.insert( + &shorteventid.to_be_bytes(), + &auth_chain + .iter() + .flat_map(|s| s.to_be_bytes().to_vec()) + .collect::>(), + ) } +} diff --git a/src/database/key_value/rooms/state_compressor.rs b/src/database/key_value/rooms/state_compressor.rs index 197ce844..71a2f3a0 100644 --- a/src/database/key_value/rooms/state_compressor.rs +++ b/src/database/key_value/rooms/state_compressor.rs @@ -1,26 +1,5 @@ - - /// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer. - #[tracing::instrument(skip(self))] - pub fn load_shortstatehash_info( - &self, - shortstatehash: u64, - ) -> Result< - Vec<( - u64, // sstatehash - HashSet, // full state - HashSet, // added - HashSet, // removed - )>, - > { - if let Some(r) = self - .stateinfo_cache - .lock() - .unwrap() - .get_mut(&shortstatehash) - { - return Ok(r.clone()); - } - +impl service::room::state_compressor::Data for KeyValueDatabase { + fn get_statediff(shortstatehash: u64) -> Result { let value = self .shortstatehash_statediff .get(&shortstatehash.to_be_bytes())? @@ -47,312 +26,23 @@ i += 2 * size_of::(); } - if parent != 0_u64 { - let mut response = self.load_shortstatehash_info(parent)?; - let mut state = response.last().unwrap().1.clone(); - state.extend(added.iter().copied()); - for r in &removed { - state.remove(r); - } - - response.push((shortstatehash, state, added, removed)); - - Ok(response) - } else { - let response = vec![(shortstatehash, added.clone(), added, removed)]; - self.stateinfo_cache - .lock() - .unwrap() - .insert(shortstatehash, response.clone()); - Ok(response) - } - } - - pub fn compress_state_event( - &self, - shortstatekey: u64, - event_id: &EventId, - globals: &super::globals::Globals, - ) -> Result { - let mut v = shortstatekey.to_be_bytes().to_vec(); - v.extend_from_slice( - &self - .get_or_create_shorteventid(event_id, globals)? - .to_be_bytes(), - ); - Ok(v.try_into().expect("we checked the size above")) - } - - /// Returns shortstatekey, event id - pub fn parse_compressed_state_event( - &self, - compressed_event: CompressedStateEvent, - ) -> Result<(u64, Arc)> { - Ok(( - utils::u64_from_bytes(&compressed_event[0..size_of::()]) - .expect("bytes have right length"), - self.get_eventid_from_short( - utils::u64_from_bytes(&compressed_event[size_of::()..]) - .expect("bytes have right length"), - )?, - )) + StateDiff { parent, added, removed } } - /// Creates a new shortstatehash that often is just a diff to an already existing - /// shortstatehash and therefore very efficient. - /// - /// There are multiple layers of diffs. The bottom layer 0 always contains the full state. Layer - /// 1 contains diffs to states of layer 0, layer 2 diffs to layer 1 and so on. If layer n > 0 - /// grows too big, it will be combined with layer n-1 to create a new diff on layer n-1 that's - /// based on layer n-2. If that layer is also too big, it will recursively fix above layers too. - /// - /// * `shortstatehash` - Shortstatehash of this state - /// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid - /// * `statediffremoved` - Removed from base. Each vec is shortstatekey+shorteventid - /// * `diff_to_sibling` - Approximately how much the diff grows each time for this layer - /// * `parent_states` - A stack with info on shortstatehash, full state, added diff and removed diff for each parent layer - #[tracing::instrument(skip( - self, - statediffnew, - statediffremoved, - diff_to_sibling, - parent_states - ))] - pub fn save_state_from_diff( - &self, - shortstatehash: u64, - statediffnew: HashSet, - statediffremoved: HashSet, - diff_to_sibling: usize, - mut parent_states: Vec<( - u64, // sstatehash - HashSet, // full state - HashSet, // added - HashSet, // removed - )>, - ) -> Result<()> { - let diffsum = statediffnew.len() + statediffremoved.len(); - - if parent_states.len() > 3 { - // Number of layers - // To many layers, we have to go deeper - let parent = parent_states.pop().unwrap(); - - let mut parent_new = parent.2; - let mut parent_removed = parent.3; - - for removed in statediffremoved { - if !parent_new.remove(&removed) { - // It was not added in the parent and we removed it - parent_removed.insert(removed); - } - // Else it was added in the parent and we removed it again. We can forget this change - } - - for new in statediffnew { - if !parent_removed.remove(&new) { - // It was not touched in the parent and we added it - parent_new.insert(new); - } - // Else it was removed in the parent and we added it again. We can forget this change - } - - self.save_state_from_diff( - shortstatehash, - parent_new, - parent_removed, - diffsum, - parent_states, - )?; - - return Ok(()); + fn save_statediff(shortstatehash: u64, diff: StateDiff) -> Result<()> { + let mut value = diff.parent.to_be_bytes().to_vec(); + for new in &diff.new { + value.extend_from_slice(&new[..]); } - if parent_states.is_empty() { - // There is no parent layer, create a new state - let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent - for new in &statediffnew { - value.extend_from_slice(&new[..]); - } - - if !statediffremoved.is_empty() { - warn!("Tried to create new state with removals"); + if !diff.removed.is_empty() { + value.extend_from_slice(&0_u64.to_be_bytes()); + for removed in &diff.removed { + value.extend_from_slice(&removed[..]); } - - self.shortstatehash_statediff - .insert(&shortstatehash.to_be_bytes(), &value)?; - - return Ok(()); - }; - - // Else we have two options. - // 1. We add the current diff on top of the parent layer. - // 2. We replace a layer above - - let parent = parent_states.pop().unwrap(); - let parent_diff = parent.2.len() + parent.3.len(); - - if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff { - // Diff too big, we replace above layer(s) - let mut parent_new = parent.2; - let mut parent_removed = parent.3; - - for removed in statediffremoved { - if !parent_new.remove(&removed) { - // It was not added in the parent and we removed it - parent_removed.insert(removed); - } - // Else it was added in the parent and we removed it again. We can forget this change - } - - for new in statediffnew { - if !parent_removed.remove(&new) { - // It was not touched in the parent and we added it - parent_new.insert(new); - } - // Else it was removed in the parent and we added it again. We can forget this change - } - - self.save_state_from_diff( - shortstatehash, - parent_new, - parent_removed, - diffsum, - parent_states, - )?; - } else { - // Diff small enough, we add diff as layer on top of parent - let mut value = parent.0.to_be_bytes().to_vec(); - for new in &statediffnew { - value.extend_from_slice(&new[..]); - } - - if !statediffremoved.is_empty() { - value.extend_from_slice(&0_u64.to_be_bytes()); - for removed in &statediffremoved { - value.extend_from_slice(&removed[..]); - } - } - - self.shortstatehash_statediff - .insert(&shortstatehash.to_be_bytes(), &value)?; - } - - Ok(()) - } - - /// Returns the new shortstatehash - pub fn save_state( - room_id: &RoomId, - new_state_ids_compressed: HashSet, - ) -> Result<(u64, - HashSet, // added - HashSet)> // removed - { - let previous_shortstatehash = self.d.current_shortstatehash(room_id)?; - - let state_hash = self.calculate_hash( - &new_state_ids_compressed - .iter() - .map(|bytes| &bytes[..]) - .collect::>(), - ); - - let (new_shortstatehash, already_existed) = - self.get_or_create_shortstatehash(&state_hash, &db.globals)?; - - if Some(new_shortstatehash) == previous_shortstatehash { - return Ok(()); - } - - let states_parents = previous_shortstatehash - .map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?; - - let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() - { - let statediffnew: HashSet<_> = new_state_ids_compressed - .difference(&parent_stateinfo.1) - .copied() - .collect(); - - let statediffremoved: HashSet<_> = parent_stateinfo - .1 - .difference(&new_state_ids_compressed) - .copied() - .collect(); - - (statediffnew, statediffremoved) - } else { - (new_state_ids_compressed, HashSet::new()) - }; - - if !already_existed { - self.save_state_from_diff( - new_shortstatehash, - statediffnew.clone(), - statediffremoved, - 2, // every state change is 2 event changes on average - states_parents, - )?; - }; - - Ok((new_shortstatehash, statediffnew, statediffremoved)) - } - - #[tracing::instrument(skip(self))] - pub fn get_auth_chain_from_cache<'a>( - &'a self, - key: &[u64], - ) -> Result>>> { - // Check RAM cache - if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) { - return Ok(Some(Arc::clone(result))); } - // Check DB cache - if key.len() == 1 { - if let Some(chain) = - self.shorteventid_authchain - .get(&key[0].to_be_bytes())? - .map(|chain| { - chain - .chunks_exact(size_of::()) - .map(|chunk| { - utils::u64_from_bytes(chunk).expect("byte length is correct") - }) - .collect() - }) - { - let chain = Arc::new(chain); - - // Cache in RAM - self.auth_chain_cache - .lock() - .unwrap() - .insert(vec![key[0]], Arc::clone(&chain)); - - return Ok(Some(chain)); - } - } - - Ok(None) - } - - #[tracing::instrument(skip(self))] - pub fn cache_auth_chain(&self, key: Vec, chain: Arc>) -> Result<()> { - // Persist in db - if key.len() == 1 { - self.shorteventid_authchain.insert( - &key[0].to_be_bytes(), - &chain - .iter() - .flat_map(|s| s.to_be_bytes().to_vec()) - .collect::>(), - )?; - } - - // Cache in RAM - self.auth_chain_cache.lock().unwrap().insert(key, chain); - - Ok(()) + self.shortstatehash_statediff + .insert(&shortstatehash.to_be_bytes(), &value)?; } +} diff --git a/src/service/rooms/auth_chain/data.rs b/src/service/rooms/auth_chain/data.rs new file mode 100644 index 00000000..d8fde958 --- /dev/null +++ b/src/service/rooms/auth_chain/data.rs @@ -0,0 +1,4 @@ +pub trait Data { + fn get_cached_eventid_authchain<'a>() -> Result>; + fn cache_eventid_authchain<'a>(shorteventid: u64, auth_chain: &HashSet) -> Result>; +} diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs index 197ce844..dfc289f3 100644 --- a/src/service/rooms/auth_chain/mod.rs +++ b/src/service/rooms/auth_chain/mod.rs @@ -1,327 +1,27 @@ +mod data; +pub use data::Data; - /// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer. - #[tracing::instrument(skip(self))] - pub fn load_shortstatehash_info( - &self, - shortstatehash: u64, - ) -> Result< - Vec<( - u64, // sstatehash - HashSet, // full state - HashSet, // added - HashSet, // removed - )>, - > { - if let Some(r) = self - .stateinfo_cache - .lock() - .unwrap() - .get_mut(&shortstatehash) - { - return Ok(r.clone()); - } - - let value = self - .shortstatehash_statediff - .get(&shortstatehash.to_be_bytes())? - .ok_or_else(|| Error::bad_database("State hash does not exist"))?; - let parent = - utils::u64_from_bytes(&value[0..size_of::()]).expect("bytes have right length"); - - let mut add_mode = true; - let mut added = HashSet::new(); - let mut removed = HashSet::new(); - - let mut i = size_of::(); - while let Some(v) = value.get(i..i + 2 * size_of::()) { - if add_mode && v.starts_with(&0_u64.to_be_bytes()) { - add_mode = false; - i += size_of::(); - continue; - } - if add_mode { - added.insert(v.try_into().expect("we checked the size above")); - } else { - removed.insert(v.try_into().expect("we checked the size above")); - } - i += 2 * size_of::(); - } - - if parent != 0_u64 { - let mut response = self.load_shortstatehash_info(parent)?; - let mut state = response.last().unwrap().1.clone(); - state.extend(added.iter().copied()); - for r in &removed { - state.remove(r); - } - - response.push((shortstatehash, state, added, removed)); - - Ok(response) - } else { - let response = vec![(shortstatehash, added.clone(), added, removed)]; - self.stateinfo_cache - .lock() - .unwrap() - .insert(shortstatehash, response.clone()); - Ok(response) - } - } - - pub fn compress_state_event( - &self, - shortstatekey: u64, - event_id: &EventId, - globals: &super::globals::Globals, - ) -> Result { - let mut v = shortstatekey.to_be_bytes().to_vec(); - v.extend_from_slice( - &self - .get_or_create_shorteventid(event_id, globals)? - .to_be_bytes(), - ); - Ok(v.try_into().expect("we checked the size above")) - } - - /// Returns shortstatekey, event id - pub fn parse_compressed_state_event( - &self, - compressed_event: CompressedStateEvent, - ) -> Result<(u64, Arc)> { - Ok(( - utils::u64_from_bytes(&compressed_event[0..size_of::()]) - .expect("bytes have right length"), - self.get_eventid_from_short( - utils::u64_from_bytes(&compressed_event[size_of::()..]) - .expect("bytes have right length"), - )?, - )) - } - - /// Creates a new shortstatehash that often is just a diff to an already existing - /// shortstatehash and therefore very efficient. - /// - /// There are multiple layers of diffs. The bottom layer 0 always contains the full state. Layer - /// 1 contains diffs to states of layer 0, layer 2 diffs to layer 1 and so on. If layer n > 0 - /// grows too big, it will be combined with layer n-1 to create a new diff on layer n-1 that's - /// based on layer n-2. If that layer is also too big, it will recursively fix above layers too. - /// - /// * `shortstatehash` - Shortstatehash of this state - /// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid - /// * `statediffremoved` - Removed from base. Each vec is shortstatekey+shorteventid - /// * `diff_to_sibling` - Approximately how much the diff grows each time for this layer - /// * `parent_states` - A stack with info on shortstatehash, full state, added diff and removed diff for each parent layer - #[tracing::instrument(skip( - self, - statediffnew, - statediffremoved, - diff_to_sibling, - parent_states - ))] - pub fn save_state_from_diff( - &self, - shortstatehash: u64, - statediffnew: HashSet, - statediffremoved: HashSet, - diff_to_sibling: usize, - mut parent_states: Vec<( - u64, // sstatehash - HashSet, // full state - HashSet, // added - HashSet, // removed - )>, - ) -> Result<()> { - let diffsum = statediffnew.len() + statediffremoved.len(); - - if parent_states.len() > 3 { - // Number of layers - // To many layers, we have to go deeper - let parent = parent_states.pop().unwrap(); - - let mut parent_new = parent.2; - let mut parent_removed = parent.3; - - for removed in statediffremoved { - if !parent_new.remove(&removed) { - // It was not added in the parent and we removed it - parent_removed.insert(removed); - } - // Else it was added in the parent and we removed it again. We can forget this change - } - - for new in statediffnew { - if !parent_removed.remove(&new) { - // It was not touched in the parent and we added it - parent_new.insert(new); - } - // Else it was removed in the parent and we added it again. We can forget this change - } - - self.save_state_from_diff( - shortstatehash, - parent_new, - parent_removed, - diffsum, - parent_states, - )?; - - return Ok(()); - } - - if parent_states.is_empty() { - // There is no parent layer, create a new state - let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent - for new in &statediffnew { - value.extend_from_slice(&new[..]); - } - - if !statediffremoved.is_empty() { - warn!("Tried to create new state with removals"); - } - - self.shortstatehash_statediff - .insert(&shortstatehash.to_be_bytes(), &value)?; - - return Ok(()); - }; - - // Else we have two options. - // 1. We add the current diff on top of the parent layer. - // 2. We replace a layer above - - let parent = parent_states.pop().unwrap(); - let parent_diff = parent.2.len() + parent.3.len(); - - if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff { - // Diff too big, we replace above layer(s) - let mut parent_new = parent.2; - let mut parent_removed = parent.3; - - for removed in statediffremoved { - if !parent_new.remove(&removed) { - // It was not added in the parent and we removed it - parent_removed.insert(removed); - } - // Else it was added in the parent and we removed it again. We can forget this change - } +use crate::service::*; - for new in statediffnew { - if !parent_removed.remove(&new) { - // It was not touched in the parent and we added it - parent_new.insert(new); - } - // Else it was removed in the parent and we added it again. We can forget this change - } - - self.save_state_from_diff( - shortstatehash, - parent_new, - parent_removed, - diffsum, - parent_states, - )?; - } else { - // Diff small enough, we add diff as layer on top of parent - let mut value = parent.0.to_be_bytes().to_vec(); - for new in &statediffnew { - value.extend_from_slice(&new[..]); - } - - if !statediffremoved.is_empty() { - value.extend_from_slice(&0_u64.to_be_bytes()); - for removed in &statediffremoved { - value.extend_from_slice(&removed[..]); - } - } - - self.shortstatehash_statediff - .insert(&shortstatehash.to_be_bytes(), &value)?; - } - - Ok(()) - } - - /// Returns the new shortstatehash - pub fn save_state( - room_id: &RoomId, - new_state_ids_compressed: HashSet, - ) -> Result<(u64, - HashSet, // added - HashSet)> // removed - { - let previous_shortstatehash = self.d.current_shortstatehash(room_id)?; - - let state_hash = self.calculate_hash( - &new_state_ids_compressed - .iter() - .map(|bytes| &bytes[..]) - .collect::>(), - ); - - let (new_shortstatehash, already_existed) = - self.get_or_create_shortstatehash(&state_hash, &db.globals)?; - - if Some(new_shortstatehash) == previous_shortstatehash { - return Ok(()); - } - - let states_parents = previous_shortstatehash - .map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?; - - let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() - { - let statediffnew: HashSet<_> = new_state_ids_compressed - .difference(&parent_stateinfo.1) - .copied() - .collect(); - - let statediffremoved: HashSet<_> = parent_stateinfo - .1 - .difference(&new_state_ids_compressed) - .copied() - .collect(); - - (statediffnew, statediffremoved) - } else { - (new_state_ids_compressed, HashSet::new()) - }; - - if !already_existed { - self.save_state_from_diff( - new_shortstatehash, - statediffnew.clone(), - statediffremoved, - 2, // every state change is 2 event changes on average - states_parents, - )?; - }; - - Ok((new_shortstatehash, statediffnew, statediffremoved)) - } +pub struct Service { + db: D, +} +impl Service<_> { #[tracing::instrument(skip(self))] - pub fn get_auth_chain_from_cache<'a>( + pub fn get_cached_eventid_authchain<'a>( &'a self, key: &[u64], ) -> Result>>> { // Check RAM cache - if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) { + if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key.to_be_bytes()) { return Ok(Some(Arc::clone(result))); } - // Check DB cache - if key.len() == 1 { - if let Some(chain) = - self.shorteventid_authchain - .get(&key[0].to_be_bytes())? - .map(|chain| { - chain - .chunks_exact(size_of::()) - .map(|chunk| { - utils::u64_from_bytes(chunk).expect("byte length is correct") - }) - .collect() - }) + // We only save auth chains for single events in the db + if key.len == 1 { + // Check DB cache + if let Some(chain) = self.db.get_cached_eventid_authchain(key[0]) { let chain = Arc::new(chain); @@ -339,20 +39,15 @@ } #[tracing::instrument(skip(self))] - pub fn cache_auth_chain(&self, key: Vec, chain: Arc>) -> Result<()> { - // Persist in db + pub fn cache_auth_chain(&self, key: Vec, auth_chain: Arc>) -> Result<()> { + // Only persist single events in db if key.len() == 1 { - self.shorteventid_authchain.insert( - &key[0].to_be_bytes(), - &chain - .iter() - .flat_map(|s| s.to_be_bytes().to_vec()) - .collect::>(), - )?; + self.db.cache_auth_chain(key[0], auth_chain)?; } // Cache in RAM - self.auth_chain_cache.lock().unwrap().insert(key, chain); + self.auth_chain_cache.lock().unwrap().insert(key, auth_chain); Ok(()) } +} diff --git a/src/service/rooms/state_compressor/data.rs b/src/service/rooms/state_compressor/data.rs new file mode 100644 index 00000000..8b855cd2 --- /dev/null +++ b/src/service/rooms/state_compressor/data.rs @@ -0,0 +1,10 @@ +struct StateDiff { + parent: Option, + added: Vec, + removed: Vec, +} + +pub trait Data { + fn get_statediff(shortstatehash: u64) -> Result; + fn save_statediff(shortstatehash: u64, diff: StateDiff) -> Result<()>; +} diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index 197ce844..d6d88e25 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -1,4 +1,13 @@ +mod data; +pub use data::Data; +use crate::service::*; + +pub struct Service { + db: D, +} + +impl Service<_> { /// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer. #[tracing::instrument(skip(self))] pub fn load_shortstatehash_info( @@ -21,31 +30,7 @@ return Ok(r.clone()); } - let value = self - .shortstatehash_statediff - .get(&shortstatehash.to_be_bytes())? - .ok_or_else(|| Error::bad_database("State hash does not exist"))?; - let parent = - utils::u64_from_bytes(&value[0..size_of::()]).expect("bytes have right length"); - - let mut add_mode = true; - let mut added = HashSet::new(); - let mut removed = HashSet::new(); - - let mut i = size_of::(); - while let Some(v) = value.get(i..i + 2 * size_of::()) { - if add_mode && v.starts_with(&0_u64.to_be_bytes()) { - add_mode = false; - i += size_of::(); - continue; - } - if add_mode { - added.insert(v.try_into().expect("we checked the size above")); - } else { - removed.insert(v.try_into().expect("we checked the size above")); - } - i += 2 * size_of::(); - } + self.db.get_statediff(shortstatehash)?; if parent != 0_u64 { let mut response = self.load_shortstatehash_info(parent)?; @@ -170,17 +155,7 @@ if parent_states.is_empty() { // There is no parent layer, create a new state - let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent - for new in &statediffnew { - value.extend_from_slice(&new[..]); - } - - if !statediffremoved.is_empty() { - warn!("Tried to create new state with removals"); - } - - self.shortstatehash_statediff - .insert(&shortstatehash.to_be_bytes(), &value)?; + self.db.save_statediff(shortstatehash, StateDiff { parent: 0, new: statediffnew, removed: statediffremoved })?; return Ok(()); }; @@ -222,20 +197,7 @@ )?; } else { // Diff small enough, we add diff as layer on top of parent - let mut value = parent.0.to_be_bytes().to_vec(); - for new in &statediffnew { - value.extend_from_slice(&new[..]); - } - - if !statediffremoved.is_empty() { - value.extend_from_slice(&0_u64.to_be_bytes()); - for removed in &statediffremoved { - value.extend_from_slice(&removed[..]); - } - } - - self.shortstatehash_statediff - .insert(&shortstatehash.to_be_bytes(), &value)?; + self.db.save_statediff(shortstatehash, StateDiff { parent: parent.0, new: statediffnew, removed: statediffremoved })?; } Ok(()) @@ -298,61 +260,4 @@ Ok((new_shortstatehash, statediffnew, statediffremoved)) } - - #[tracing::instrument(skip(self))] - pub fn get_auth_chain_from_cache<'a>( - &'a self, - key: &[u64], - ) -> Result>>> { - // Check RAM cache - if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) { - return Ok(Some(Arc::clone(result))); - } - - // Check DB cache - if key.len() == 1 { - if let Some(chain) = - self.shorteventid_authchain - .get(&key[0].to_be_bytes())? - .map(|chain| { - chain - .chunks_exact(size_of::()) - .map(|chunk| { - utils::u64_from_bytes(chunk).expect("byte length is correct") - }) - .collect() - }) - { - let chain = Arc::new(chain); - - // Cache in RAM - self.auth_chain_cache - .lock() - .unwrap() - .insert(vec![key[0]], Arc::clone(&chain)); - - return Ok(Some(chain)); - } - } - - Ok(None) - } - - #[tracing::instrument(skip(self))] - pub fn cache_auth_chain(&self, key: Vec, chain: Arc>) -> Result<()> { - // Persist in db - if key.len() == 1 { - self.shorteventid_authchain.insert( - &key[0].to_be_bytes(), - &chain - .iter() - .flat_map(|s| s.to_be_bytes().to_vec()) - .collect::>(), - )?; - } - - // Cache in RAM - self.auth_chain_cache.lock().unwrap().insert(key, chain); - - Ok(()) - } +}