diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index d971e6b7..a91d079a 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -669,24 +669,21 @@ async fn join_room_by_id_helper( .add_pdu_outlier(&event_id, &value)?; } - let statehash_before_join = services().rooms.state.set_event_state( - event_id, + let (statehash_before_join, new, removed) = services().rooms.state_compressor.save_state( room_id, state .into_iter() - .map(|(k, id)| { - services() - .rooms - .state_compressor - .compress_state_event(k, &id) - }) + .map(|(k, id)| services().rooms.state_compressor.compress_state_event(k, &id)) .collect::>()?, )?; services() .rooms .state - .set_room_state(room_id, statehash_before_join, &state_lock)?; + .force_state(room_id, statehash_before_join, new, removed, &state_lock) + .await?; + + services().rooms.state_cache.update_joined_count(room_id)?; // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs index 39c261f3..70e59acb 100644 --- a/src/database/key_value/rooms/state_accessor.rs +++ b/src/database/key_value/rooms/state_accessor.rs @@ -23,7 +23,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { let parsed = services() .rooms .state_compressor - .parse_compressed_state_event(compressed)?; + .parse_compressed_state_event(&compressed)?; result.insert(parsed.0, parsed.1); i += 1; @@ -52,7 +52,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { let (_, eventid) = services() .rooms .state_compressor - .parse_compressed_state_event(compressed)?; + .parse_compressed_state_event(&compressed)?; if let Some(pdu) = services().rooms.timeline.get_pdu(&eventid)? { result.insert( ( @@ -104,7 +104,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase { services() .rooms .state_compressor - .parse_compressed_state_event(compressed) + .parse_compressed_state_event(&compressed) .ok() .map(|(_, id)| id) })) diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index e5f8424b..cfe0fbf4 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -970,14 +970,11 @@ impl Service { // Set the new room state to the resolved state if update_state { info!("Forcing new room state"); - let sstatehash = services() + let (sstatehash, new, removed) = services() .rooms .state_compressor .save_state(room_id, new_room_state)?; - services() - .rooms - .state - .set_room_state(room_id, sstatehash, &state_lock)?; + services().rooms.state.force_state(room_id, sstatehash, new, removed, &state_lock).await?; } } diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs index 3aa49146..8e80b5e3 100644 --- a/src/service/rooms/state/data.rs +++ b/src/service/rooms/state/data.rs @@ -8,7 +8,7 @@ pub trait Data: Send + Sync { /// Returns the last state hash key added to the db for the given room. fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result>; - /// Update the current state of the room. + /// Set the state hash to a new version, but does not update state_cache. fn set_room_state( &self, room_id: &RoomId, diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 7b8b0fde..15fa79b8 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -34,23 +34,13 @@ impl Service { shortstatehash: u64, statediffnew: HashSet, _statediffremoved: HashSet, + state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex ) -> Result<()> { - let mutex_state = Arc::clone( - services() - .globals - .roomid_mutex_state - .write() - .unwrap() - .entry(room_id.to_owned()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; - for event_id in statediffnew.into_iter().filter_map(|new| { services() .rooms .state_compressor - .parse_compressed_state_event(new) + .parse_compressed_state_event(&new) .ok() .map(|(_, id)| id) }) { @@ -105,8 +95,6 @@ impl Service { self.db .set_room_state(room_id, shortstatehash, &state_lock)?; - drop(state_lock); - Ok(()) } @@ -312,6 +300,7 @@ impl Service { Ok(state) } + /// Set the state hash to a new version, but does not update state_cache. #[tracing::instrument(skip(self))] pub fn set_room_state( &self, @@ -412,7 +401,7 @@ impl Service { services() .rooms .state_compressor - .parse_compressed_state_event(compressed) + .parse_compressed_state_event(&compressed) .ok() }) .filter_map(|(shortstatekey, event_id)| { diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index b927cb72..bcd3b9a1 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -100,7 +100,7 @@ impl Service { /// Returns shortstatekey, event id pub fn parse_compressed_state_event( &self, - compressed_event: CompressedStateEvent, + compressed_event: &CompressedStateEvent, ) -> Result<(u64, Arc)> { Ok(( utils::u64_from_bytes(&compressed_event[0..size_of::()]) @@ -246,12 +246,12 @@ impl Service { Ok(()) } - /// Returns the new shortstatehash + /// Returns the new shortstatehash, and the state diff from the previous room state pub fn save_state( &self, room_id: &RoomId, new_state_ids_compressed: HashSet, - ) -> Result { + ) -> Result<(u64, HashSet, HashSet)> { let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?; let state_hash = utils::calculate_hash( @@ -267,7 +267,7 @@ impl Service { .get_or_create_shortstatehash(&state_hash)?; if Some(new_shortstatehash) == previous_shortstatehash { - return Ok(new_shortstatehash); + return Ok((new_shortstatehash, HashSet::new(), HashSet::new())); } let states_parents = previous_shortstatehash @@ -295,12 +295,12 @@ impl Service { self.save_state_from_diff( new_shortstatehash, statediffnew.clone(), - statediffremoved, + statediffremoved.clone(), 2, // every state change is 2 event changes on average states_parents, )?; }; - Ok(new_shortstatehash) + Ok((new_shortstatehash, statediffnew, statediffremoved)) } }