From 2a00c547a1baca5e2ca57966ef5ce5c7f063f367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 8 Feb 2022 09:25:44 +0100 Subject: [PATCH] improvement: faster /syncs --- src/client_server/sync.rs | 63 ++++++++++++++++++++++----------------- src/database.rs | 1 + src/database/rooms.rs | 37 ++++++++++++++++++++++- 3 files changed, 72 insertions(+), 29 deletions(-) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 7cfea5af..1ccf7982 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -245,30 +245,41 @@ async fn sync_helper( let insert_lock = mutex_insert.lock().unwrap(); drop(insert_lock); - let mut non_timeline_pdus = db - .rooms - .pdus_until(&sender_user, &room_id, u64::MAX)? - .filter_map(|r| { - // Filter out buggy events - if r.is_err() { - error!("Bad pdu in pdus_since: {:?}", r); - } - r.ok() - }) - .take_while(|(pduid, _)| { - db.rooms - .pdu_count(pduid) - .map_or(false, |count| count > since) - }); - - // Take the last 10 events for the timeline - let timeline_pdus: Vec<_> = non_timeline_pdus - .by_ref() - .take(10) - .collect::>() - .into_iter() - .rev() - .collect(); + let timeline_pdus; + let limited; + if db.rooms.last_timeline_count(&sender_user, &room_id)? > since { + let mut non_timeline_pdus = db + .rooms + .pdus_until(&sender_user, &room_id, u64::MAX)? + .filter_map(|r| { + // Filter out buggy events + if r.is_err() { + error!("Bad pdu in pdus_since: {:?}", r); + } + r.ok() + }) + .take_while(|(pduid, _)| { + db.rooms + .pdu_count(pduid) + .map_or(false, |count| count > since) + }); + + // Take the last 10 events for the timeline + timeline_pdus = non_timeline_pdus + .by_ref() + .take(10) + .collect::>() + .into_iter() + .rev() + .collect::>(); + + // They /sync response doesn't always return all messages, so we say the output is + // limited unless there are events in non_timeline_pdus + limited = non_timeline_pdus.next().is_some(); + } else { + timeline_pdus = Vec::new(); + limited = false; + } let send_notification_counts = !timeline_pdus.is_empty() || db @@ -277,10 +288,6 @@ async fn sync_helper( .last_privateread_update(&sender_user, &room_id)? > since; - // They /sync response doesn't always return all messages, so we say the output is - // limited unless there are events in non_timeline_pdus - let limited = non_timeline_pdus.next().is_some(); - let mut timeline_users = HashSet::new(); for (_, event) in &timeline_pdus { timeline_users.insert(event.sender.as_str().to_owned()); diff --git a/src/database.rs b/src/database.rs index 2b1671cd..8e95b1ef 100644 --- a/src/database.rs +++ b/src/database.rs @@ -263,6 +263,7 @@ impl Database { stateinfo_cache: Mutex::new(LruCache::new( (100.0 * config.conduit_cache_capacity_modifier) as usize, )), + lasttimelinecount_cache: Mutex::new(HashMap::new()), }, account_data: account_data::AccountData { roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 0abd2e79..17c9b743 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -32,7 +32,7 @@ use serde::Deserialize; use serde_json::value::to_raw_value; use std::{ borrow::Cow, - collections::{BTreeMap, HashMap, HashSet}, + collections::{hash_map, BTreeMap, HashMap, HashSet}, fmt::Debug, iter, mem::size_of, @@ -128,6 +128,7 @@ pub struct Rooms { )>, >, >, + pub(super) lasttimelinecount_cache: Mutex, u64>>, } impl Rooms { @@ -1331,6 +1332,10 @@ impl Rooms { &pdu_id, &serde_json::to_vec(&pdu_json).expect("CanonicalJsonObject is always a valid"), )?; + self.lasttimelinecount_cache + .lock() + .unwrap() + .insert(pdu.room_id.clone(), count2); self.eventid_pduid .insert(pdu.event_id.as_bytes(), &pdu_id)?; @@ -1498,6 +1503,36 @@ impl Rooms { Ok(pdu_id) } + #[tracing::instrument(skip(self))] + pub fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result { + match self + .lasttimelinecount_cache + .lock() + .unwrap() + .entry(room_id.to_owned()) + { + hash_map::Entry::Vacant(v) => { + if let Some(last_count) = self + .pdus_until(&sender_user, &room_id, u64::MAX)? + .filter_map(|r| { + // Filter out buggy events + if r.is_err() { + error!("Bad pdu in pdus_since: {:?}", r); + } + r.ok() + }) + .map(|(pduid, _)| self.pdu_count(&pduid)) + .next() + { + Ok(*v.insert(last_count?)) + } else { + Ok(0) + } + } + hash_map::Entry::Occupied(o) => Ok(*o.get()), + } + } + #[tracing::instrument(skip(self))] pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { let mut userroom_id = user_id.as_bytes().to_vec();