fix: make incremental sync efficient again

merge-requests/217/head
Timo Kösters 3 years ago
parent 93d225fd1e
commit f285c89006
No known key found for this signature in database
GPG Key ID: 356E705610F626D5

@ -242,7 +242,7 @@ pub async fn get_message_events_route(
&sender_user, &sender_user,
&sender_device, &sender_device,
&body.room_id, &body.room_id,
lazy_loaded.into_iter().collect(), lazy_loaded,
next_token, next_token,
); );
} }

@ -374,7 +374,7 @@ async fn sync_helper(
let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?; let current_state_ids = db.rooms.state_full_ids(current_shortstatehash)?;
let mut state_events = Vec::new(); let mut state_events = Vec::new();
let mut lazy_loaded = Vec::new(); let mut lazy_loaded = HashSet::new();
for (shortstatekey, id) in current_state_ids { for (shortstatekey, id) in current_state_ids {
let (event_type, state_key) = db.rooms.get_statekey_from_short(shortstatekey)?; let (event_type, state_key) = db.rooms.get_statekey_from_short(shortstatekey)?;
@ -399,7 +399,7 @@ async fn sync_helper(
continue; continue;
} }
}; };
lazy_loaded.push( lazy_loaded.insert(
UserId::parse(state_key.as_ref()) UserId::parse(state_key.as_ref())
.expect("they are in timeline_users, so they should be correct"), .expect("they are in timeline_users, so they should be correct"),
); );
@ -456,46 +456,57 @@ async fn sync_helper(
let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?; let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?;
let mut state_events = Vec::new(); let mut state_events = Vec::new();
let mut lazy_loaded = Vec::new(); let mut lazy_loaded = HashSet::new();
for (key, id) in current_state_ids { for (key, id) in current_state_ids {
let pdu = match db.rooms.get_pdu(&id)? { if body.full_state || since_state_ids.get(&key) != Some(&id) {
Some(pdu) => pdu, let pdu = match db.rooms.get_pdu(&id)? {
None => { Some(pdu) => pdu,
error!("Pdu in state not found: {}", id); None => {
continue; error!("Pdu in state not found: {}", id);
} continue;
}; }
};
let state_key = pdu
.state_key
.as_ref()
.expect("state events have state keys");
if pdu.kind != EventType::RoomMember { if pdu.kind == EventType::RoomMember {
if body.full_state || since_state_ids.get(&key) != Some(&id) { match UserId::parse(
state_events.push(pdu); pdu.state_key
.as_ref()
.expect("State event has state key")
.clone(),
) {
Ok(state_key_userid) => {
lazy_loaded.insert(state_key_userid);
}
Err(e) => error!("Invalid state key for member event: {}", e),
}
} }
continue;
}
// Pdu has to be a member event
let state_key_userid = UserId::parse(state_key.as_ref())
.expect("they are in timeline_users, so they should be correct");
if body.full_state || since_state_ids.get(&key) != Some(&id) {
lazy_loaded.push(state_key_userid);
state_events.push(pdu); state_events.push(pdu);
} else if timeline_users.contains(state_key) }
&& (!db.rooms.lazy_load_was_sent_before( for (_, event) in &timeline_pdus {
if lazy_loaded.contains(&event.sender) {
continue;
}
if !db.rooms.lazy_load_was_sent_before(
&sender_user, &sender_user,
&sender_device, &sender_device,
&room_id, &room_id,
&state_key_userid, &event.sender,
)? || lazy_load_send_redundant) )? || lazy_load_send_redundant
{ {
lazy_loaded.push(state_key_userid); let pdu = match db.rooms.get_pdu(&id)? {
state_events.push(pdu); Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
continue;
}
};
lazy_loaded.insert(event.sender.clone());
state_events.push(pdu);
}
} }
} }

@ -120,7 +120,7 @@ pub struct Rooms {
pub(super) our_real_users_cache: RwLock<HashMap<Box<RoomId>, Arc<HashSet<Box<UserId>>>>>, pub(super) our_real_users_cache: RwLock<HashMap<Box<RoomId>, Arc<HashSet<Box<UserId>>>>>,
pub(super) appservice_in_room_cache: RwLock<HashMap<Box<RoomId>, HashMap<String, bool>>>, pub(super) appservice_in_room_cache: RwLock<HashMap<Box<RoomId>, HashMap<String, bool>>>,
pub(super) lazy_load_waiting: pub(super) lazy_load_waiting:
Mutex<HashMap<(Box<UserId>, Box<DeviceId>, Box<RoomId>, u64), Vec<Box<UserId>>>>, Mutex<HashMap<(Box<UserId>, Box<DeviceId>, Box<RoomId>, u64), HashSet<Box<UserId>>>>,
pub(super) stateinfo_cache: Mutex< pub(super) stateinfo_cache: Mutex<
LruCache< LruCache<
u64, u64,
@ -3495,7 +3495,7 @@ impl Rooms {
user_id: &UserId, user_id: &UserId,
device_id: &DeviceId, device_id: &DeviceId,
room_id: &RoomId, room_id: &RoomId,
lazy_load: Vec<Box<UserId>>, lazy_load: HashSet<Box<UserId>>,
count: u64, count: u64,
) { ) {
self.lazy_load_waiting.lock().unwrap().insert( self.lazy_load_waiting.lock().unwrap().insert(

Loading…
Cancel
Save