fix: update state_cache on join over federation

Nyaaori/refactor-next
Timo Kösters 2 years ago committed by Nyaaori
parent 5a04559cb4
commit 1a7893dbbd
No known key found for this signature in database
GPG Key ID: E7819C3ED4D1F82E

@ -669,24 +669,21 @@ async fn join_room_by_id_helper(
.add_pdu_outlier(&event_id, &value)?;
}
let statehash_before_join = services().rooms.state.set_event_state(
event_id,
let (statehash_before_join, new, removed) = services().rooms.state_compressor.save_state(
room_id,
state
.into_iter()
.map(|(k, id)| {
services()
.rooms
.state_compressor
.compress_state_event(k, &id)
})
.map(|(k, id)| services().rooms.state_compressor.compress_state_event(k, &id))
.collect::<Result<_>>()?,
)?;
services()
.rooms
.state
.set_room_state(room_id, statehash_before_join, &state_lock)?;
.force_state(room_id, statehash_before_join, new, removed, &state_lock)
.await?;
services().rooms.state_cache.update_joined_count(room_id)?;
// We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail.

@ -23,7 +23,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
let parsed = services()
.rooms
.state_compressor
.parse_compressed_state_event(compressed)?;
.parse_compressed_state_event(&compressed)?;
result.insert(parsed.0, parsed.1);
i += 1;
@ -52,7 +52,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
let (_, eventid) = services()
.rooms
.state_compressor
.parse_compressed_state_event(compressed)?;
.parse_compressed_state_event(&compressed)?;
if let Some(pdu) = services().rooms.timeline.get_pdu(&eventid)? {
result.insert(
(
@ -104,7 +104,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
services()
.rooms
.state_compressor
.parse_compressed_state_event(compressed)
.parse_compressed_state_event(&compressed)
.ok()
.map(|(_, id)| id)
}))

@ -970,14 +970,11 @@ impl Service {
// Set the new room state to the resolved state
if update_state {
info!("Forcing new room state");
let sstatehash = services()
let (sstatehash, new, removed) = services()
.rooms
.state_compressor
.save_state(room_id, new_room_state)?;
services()
.rooms
.state
.set_room_state(room_id, sstatehash, &state_lock)?;
services().rooms.state.force_state(room_id, sstatehash, new, removed, &state_lock).await?;
}
}

@ -8,7 +8,7 @@ pub trait Data: Send + Sync {
/// Returns the last state hash key added to the db for the given room.
fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>>;
/// Update the current state of the room.
/// Set the state hash to a new version, but does not update state_cache.
fn set_room_state(
&self,
room_id: &RoomId,

@ -34,23 +34,13 @@ impl Service {
shortstatehash: u64,
statediffnew: HashSet<CompressedStateEvent>,
_statediffremoved: HashSet<CompressedStateEvent>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.unwrap()
.entry(room_id.to_owned())
.or_default(),
);
let state_lock = mutex_state.lock().await;
for event_id in statediffnew.into_iter().filter_map(|new| {
services()
.rooms
.state_compressor
.parse_compressed_state_event(new)
.parse_compressed_state_event(&new)
.ok()
.map(|(_, id)| id)
}) {
@ -105,8 +95,6 @@ impl Service {
self.db
.set_room_state(room_id, shortstatehash, &state_lock)?;
drop(state_lock);
Ok(())
}
@ -312,6 +300,7 @@ impl Service {
Ok(state)
}
/// Set the state hash to a new version, but does not update state_cache.
#[tracing::instrument(skip(self))]
pub fn set_room_state(
&self,
@ -412,7 +401,7 @@ impl Service {
services()
.rooms
.state_compressor
.parse_compressed_state_event(compressed)
.parse_compressed_state_event(&compressed)
.ok()
})
.filter_map(|(shortstatekey, event_id)| {

@ -100,7 +100,7 @@ impl Service {
/// Returns shortstatekey, event id
pub fn parse_compressed_state_event(
&self,
compressed_event: CompressedStateEvent,
compressed_event: &CompressedStateEvent,
) -> Result<(u64, Arc<EventId>)> {
Ok((
utils::u64_from_bytes(&compressed_event[0..size_of::<u64>()])
@ -246,12 +246,12 @@ impl Service {
Ok(())
}
/// Returns the new shortstatehash
/// Returns the new shortstatehash, and the state diff from the previous room state
pub fn save_state(
&self,
room_id: &RoomId,
new_state_ids_compressed: HashSet<CompressedStateEvent>,
) -> Result<u64> {
) -> Result<(u64, HashSet<CompressedStateEvent>, HashSet<CompressedStateEvent>)> {
let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?;
let state_hash = utils::calculate_hash(
@ -267,7 +267,7 @@ impl Service {
.get_or_create_shortstatehash(&state_hash)?;
if Some(new_shortstatehash) == previous_shortstatehash {
return Ok(new_shortstatehash);
return Ok((new_shortstatehash, HashSet::new(), HashSet::new()));
}
let states_parents = previous_shortstatehash
@ -295,12 +295,12 @@ impl Service {
self.save_state_from_diff(
new_shortstatehash,
statediffnew.clone(),
statediffremoved,
statediffremoved.clone(),
2, // every state change is 2 event changes on average
states_parents,
)?;
};
Ok(new_shortstatehash)
Ok((new_shortstatehash, statediffnew, statediffremoved))
}
}

Loading…
Cancel
Save