From 41ec7cf5d028bb382dec1cdb7d1ecbf79b047689 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 3 Aug 2021 16:14:07 +0200 Subject: [PATCH] improvement: batch inserts for stateids --- src/database/abstraction/sqlite.rs | 5 +- src/database/rooms.rs | 149 +++++++++++++++++------------ src/database/rooms/edus.rs | 4 +- 3 files changed, 92 insertions(+), 66 deletions(-) diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index ce30a569..72fb5f7f 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -243,7 +243,10 @@ impl Tree for SqliteTable { let statement = Box::leak(Box::new( guard - .prepare(&format!("SELECT key, value FROM {} ORDER BY key ASC", &self.name)) + .prepare(&format!( + "SELECT key, value FROM {} ORDER BY key ASC", + &self.name + )) .unwrap(), )); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index e5eb2eea..48a135a0 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -313,41 +313,50 @@ impl Rooms { let new_state = if !already_existed { let mut new_state = HashSet::new(); - for ((event_type, state_key), eventid) in state { - new_state.insert(eventid.clone()); + let batch = state + .iter() + .filter_map(|((event_type, state_key), eventid)| { + new_state.insert(eventid.clone()); + + let mut statekey = event_type.as_ref().as_bytes().to_vec(); + statekey.push(0xff); + statekey.extend_from_slice(&state_key.as_bytes()); + + let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? { + Some(shortstatekey) => shortstatekey.to_vec(), + None => { + let shortstatekey = db.globals.next_count().ok()?; + self.statekey_shortstatekey + .insert(&statekey, &shortstatekey.to_be_bytes()) + .ok()?; + shortstatekey.to_be_bytes().to_vec() + } + }; - let mut statekey = event_type.as_ref().as_bytes().to_vec(); - statekey.push(0xff); - statekey.extend_from_slice(&state_key.as_bytes()); + let shorteventid = + match self.eventid_shorteventid.get(eventid.as_bytes()).ok()? { + Some(shorteventid) => shorteventid.to_vec(), + None => { + let shorteventid = db.globals.next_count().ok()?; + self.eventid_shorteventid + .insert(eventid.as_bytes(), &shorteventid.to_be_bytes()) + .ok()?; + self.shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), eventid.as_bytes()) + .ok()?; + shorteventid.to_be_bytes().to_vec() + } + }; - let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? { - Some(shortstatekey) => shortstatekey.to_vec(), - None => { - let shortstatekey = db.globals.next_count()?; - self.statekey_shortstatekey - .insert(&statekey, &shortstatekey.to_be_bytes())?; - shortstatekey.to_be_bytes().to_vec() - } - }; + let mut state_id = shortstatehash.to_be_bytes().to_vec(); + state_id.extend_from_slice(&shortstatekey); - let shorteventid = match self.eventid_shorteventid.get(eventid.as_bytes())? { - Some(shorteventid) => shorteventid.to_vec(), - None => { - let shorteventid = db.globals.next_count()?; - self.eventid_shorteventid - .insert(eventid.as_bytes(), &shorteventid.to_be_bytes())?; - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), eventid.as_bytes())?; - shorteventid.to_be_bytes().to_vec() - } - }; - - let mut state_id = shortstatehash.to_be_bytes().to_vec(); - state_id.extend_from_slice(&shortstatekey); + Some((state_id, shorteventid)) + }) + .collect::>(); - self.stateid_shorteventid - .insert(&state_id, &*shorteventid)?; - } + self.stateid_shorteventid + .insert_batch(&mut batch.into_iter())?; new_state } else { @@ -1120,39 +1129,51 @@ impl Rooms { } }; - for ((event_type, state_key), pdu) in state { - let mut statekey = event_type.as_ref().as_bytes().to_vec(); - statekey.push(0xff); - statekey.extend_from_slice(&state_key.as_bytes()); + let batch = state + .iter() + .filter_map(|((event_type, state_key), pdu)| { + let mut statekey = event_type.as_ref().as_bytes().to_vec(); + statekey.push(0xff); + statekey.extend_from_slice(&state_key.as_bytes()); - let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? { - Some(shortstatekey) => shortstatekey.to_vec(), - None => { - let shortstatekey = globals.next_count()?; - self.statekey_shortstatekey - .insert(&statekey, &shortstatekey.to_be_bytes())?; - shortstatekey.to_be_bytes().to_vec() - } - }; + let shortstatekey = match self.statekey_shortstatekey.get(&statekey).ok()? { + Some(shortstatekey) => shortstatekey.to_vec(), + None => { + let shortstatekey = globals.next_count().ok()?; + self.statekey_shortstatekey + .insert(&statekey, &shortstatekey.to_be_bytes()) + .ok()?; + shortstatekey.to_be_bytes().to_vec() + } + }; - let shorteventid = match self.eventid_shorteventid.get(pdu.event_id.as_bytes())? { - Some(shorteventid) => shorteventid.to_vec(), - None => { - let shorteventid = globals.next_count()?; - self.eventid_shorteventid - .insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())?; - shorteventid.to_be_bytes().to_vec() - } - }; + let shorteventid = match self + .eventid_shorteventid + .get(pdu.event_id.as_bytes()) + .ok()? + { + Some(shorteventid) => shorteventid.to_vec(), + None => { + let shorteventid = globals.next_count().ok()?; + self.eventid_shorteventid + .insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes()) + .ok()?; + self.shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes()) + .ok()?; + shorteventid.to_be_bytes().to_vec() + } + }; - let mut state_id = shortstatehash.clone(); - state_id.extend_from_slice(&shortstatekey); + let mut state_id = shortstatehash.clone(); + state_id.extend_from_slice(&shortstatekey); - self.stateid_shorteventid - .insert(&*state_id, &*shorteventid)?; - } + Some((state_id, shorteventid)) + }) + .collect::>(); + + self.stateid_shorteventid + .insert_batch(&mut batch.into_iter())?; self.shorteventid_shortstatehash .insert(&shorteventid, &*shortstatehash)?; @@ -1257,11 +1278,13 @@ impl Rooms { } }; - for (shortstatekey, shorteventid) in new_state { + let mut batch = new_state.into_iter().map(|(shortstatekey, shorteventid)| { let mut state_id = shortstatehash.to_be_bytes().to_vec(); state_id.extend_from_slice(&shortstatekey); - self.stateid_shorteventid.insert(&state_id, &shorteventid)?; - } + (state_id, shorteventid) + }); + + self.stateid_shorteventid.insert_batch(&mut batch)?; Ok(shortstatehash) } else { diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index 664c1710..ff28436b 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -422,7 +422,7 @@ impl RoomEdus { } /// Sets all users to offline who have been quiet for too long. - pub fn presence_maintain( + fn presence_maintain( &self, rooms: &super::Rooms, globals: &super::super::globals::Globals, @@ -497,7 +497,7 @@ impl RoomEdus { rooms: &super::Rooms, globals: &super::super::globals::Globals, ) -> Result> { - self.presence_maintain(rooms, globals)?; + //self.presence_maintain(rooms, globals)?; let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff);