diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index f381ce9f..5b941fbe 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -35,6 +35,7 @@ pub trait Tree: Send + Sync { ) -> Box, Vec)> + 'a>; fn increment(&self, key: &[u8]) -> Result>; + fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()>; fn scan_prefix<'a>( &'a self, diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 3c4ae9c6..1e55418b 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -49,11 +49,11 @@ impl Engine { fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result { let conn = Connection::open(&path)?; - conn.pragma_update(Some(Main), "page_size", &32768)?; + conn.pragma_update(Some(Main), "page_size", &1024)?; conn.pragma_update(Some(Main), "journal_mode", &"WAL")?; conn.pragma_update(Some(Main), "synchronous", &"NORMAL")?; conn.pragma_update(Some(Main), "cache_size", &(-i64::from(cache_size_kb)))?; - conn.pragma_update(Some(Main), "wal_autocheckpoint", &0)?; + conn.pragma_update(Some(Main), "wal_autocheckpoint", &8000)?; Ok(conn) } @@ -93,8 +93,9 @@ impl Engine { } pub fn flush_wal(self: &Arc) -> Result<()> { - self.write_lock() - .pragma_update(Some(Main), "wal_checkpoint", &"TRUNCATE")?; + // We use autocheckpoints + //self.write_lock() + //.pragma_update(Some(Main), "wal_checkpoint", &"TRUNCATE")?; Ok(()) } } @@ -248,6 +249,24 @@ impl Tree for SqliteTable { Ok(()) } + #[tracing::instrument(skip(self, iter))] + fn increment_batch<'a>(&self, iter: &mut dyn Iterator>) -> Result<()> { + let guard = self.engine.write_lock(); + + guard.execute("BEGIN", [])?; + for key in iter { + let old = self.get_with_guard(&guard, &key)?; + let new = crate::utils::increment(old.as_deref()) + .expect("utils::increment always returns Some"); + self.insert_with_guard(&guard, &key, &new)?; + } + guard.execute("COMMIT", [])?; + + drop(guard); + + Ok(()) + } + #[tracing::instrument(skip(self, key))] fn remove(&self, key: &[u8]) -> Result<()> { let guard = self.engine.write_lock(); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 5baadf9e..d648b7d8 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -92,13 +92,17 @@ pub struct Rooms { pub(super) pdu_cache: Mutex>>, pub(super) auth_chain_cache: Mutex>>, pub(super) shorteventid_cache: Mutex>, - pub(super) stateinfo_cache: Mutex, // full state - HashSet, // added - HashSet, // removed - )>>>, + pub(super) stateinfo_cache: Mutex< + LruCache< + u64, + Vec<( + u64, // sstatehash + HashSet, // full state + HashSet, // added + HashSet, // removed + )>, + >, + >, } impl Rooms { @@ -414,7 +418,8 @@ impl Rooms { HashSet, // removed )>, > { - if let Some(r) = self.stateinfo_cache + if let Some(r) = self + .stateinfo_cache .lock() .unwrap() .get_mut(&shortstatehash) @@ -458,10 +463,6 @@ impl Rooms { response.push((shortstatehash, state, added, removed)); - self.stateinfo_cache - .lock() - .unwrap() - .insert(shortstatehash, response.clone()); Ok(response) } else { let mut response = Vec::new(); @@ -1173,6 +1174,9 @@ impl Rooms { let sync_pdu = pdu.to_sync_room_event(); + let mut notifies = Vec::new(); + let mut highlights = Vec::new(); + for user in db .rooms .room_members(&pdu.room_id) @@ -1218,11 +1222,11 @@ impl Rooms { userroom_id.extend_from_slice(pdu.room_id.as_bytes()); if notify { - self.userroomid_notificationcount.increment(&userroom_id)?; + notifies.push(userroom_id.clone()); } if highlight { - self.userroomid_highlightcount.increment(&userroom_id)?; + highlights.push(userroom_id); } for senderkey in db.pusher.get_pusher_senderkeys(&user) { @@ -1230,6 +1234,11 @@ impl Rooms { } } + self.userroomid_notificationcount + .increment_batch(&mut notifies.into_iter())?; + self.userroomid_highlightcount + .increment_batch(&mut highlights.into_iter())?; + match pdu.kind { EventType::RoomRedaction => { if let Some(redact_id) = &pdu.redacts {