From 28644f236e7e8871b365696a3c03e43adbd19aaa Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:50 +0200 Subject: [PATCH 01/18] refactor: prepare src/database/key_value.rs from src/service/rooms/state/data.rs --- src/{service/rooms/state/data.rs => database/key_value.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{service/rooms/state/data.rs => database/key_value.rs} (100%) diff --git a/src/service/rooms/state/data.rs b/src/database/key_value.rs similarity index 100% rename from src/service/rooms/state/data.rs rename to src/database/key_value.rs From 1442c64420345b7e6dff7e19cb628348d3d96c6a Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:50 +0200 Subject: [PATCH 02/18] refactor: restore src/service/rooms/state/data.rs --- src/service/rooms/state/data.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 src/service/rooms/state/data.rs diff --git a/src/service/rooms/state/data.rs b/src/service/rooms/state/data.rs new file mode 100644 index 00000000..4b42ca8e --- /dev/null +++ b/src/service/rooms/state/data.rs @@ -0,0 +1,16 @@ +pub trait Data { + fn get_room_shortstatehash(room_id: &RoomId); +} + + /// Returns the last state hash key added to the db for the given room. + #[tracing::instrument(skip(self))] + pub fn current_shortstatehash(&self, room_id: &RoomId) -> Result> { + self.roomid_shortstatehash + .get(room_id.as_bytes())? + .map_or(Ok(None), |bytes| { + Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Invalid shortstatehash in roomid_shortstatehash") + })?)) + }) + } + From 33c0e0f430663e48c012dbb71d328dce5a2a14a8 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:57 +0200 Subject: [PATCH 03/18] refactor: prepare src/database/key_value.rs from src/service/rooms/alias/mod.rs --- src/{service/rooms/alias/mod.rs => database/key_value.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{service/rooms/alias/mod.rs => database/key_value.rs} (100%) diff --git a/src/service/rooms/alias/mod.rs b/src/database/key_value.rs similarity index 100% rename from src/service/rooms/alias/mod.rs rename to src/database/key_value.rs From a2a327af7caf309a5ef7d95e2010e25d0e75d019 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:57 +0200 Subject: [PATCH 04/18] refactor: prepare src/database/key_value.rs from src/service/rooms/state/mod.rs --- src/{service/rooms/state/mod.rs => database/key_value.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{service/rooms/state/mod.rs => database/key_value.rs} (100%) diff --git a/src/service/rooms/state/mod.rs b/src/database/key_value.rs similarity index 100% rename from src/service/rooms/state/mod.rs rename to src/database/key_value.rs From 05487c7c158a7346c631efbb93b88bd3203ef3bc Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:57 +0200 Subject: [PATCH 05/18] refactor: restore src/service/rooms/alias/mod.rs --- src/service/rooms/alias/mod.rs | 66 ++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 src/service/rooms/alias/mod.rs diff --git a/src/service/rooms/alias/mod.rs b/src/service/rooms/alias/mod.rs new file mode 100644 index 00000000..393ad671 --- /dev/null +++ b/src/service/rooms/alias/mod.rs @@ -0,0 +1,66 @@ + + #[tracing::instrument(skip(self, globals))] + pub fn set_alias( + &self, + alias: &RoomAliasId, + room_id: Option<&RoomId>, + globals: &super::globals::Globals, + ) -> Result<()> { + if let Some(room_id) = room_id { + // New alias + self.alias_roomid + .insert(alias.alias().as_bytes(), room_id.as_bytes())?; + let mut aliasid = room_id.as_bytes().to_vec(); + aliasid.push(0xff); + aliasid.extend_from_slice(&globals.next_count()?.to_be_bytes()); + self.aliasid_alias.insert(&aliasid, &*alias.as_bytes())?; + } else { + // room_id=None means remove alias + if let Some(room_id) = self.alias_roomid.get(alias.alias().as_bytes())? { + let mut prefix = room_id.to_vec(); + prefix.push(0xff); + + for (key, _) in self.aliasid_alias.scan_prefix(prefix) { + self.aliasid_alias.remove(&key)?; + } + self.alias_roomid.remove(alias.alias().as_bytes())?; + } else { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "Alias does not exist.", + )); + } + } + + Ok(()) + } + + #[tracing::instrument(skip(self))] + pub fn id_from_alias(&self, alias: &RoomAliasId) -> Result>> { + self.alias_roomid + .get(alias.alias().as_bytes())? + .map(|bytes| { + RoomId::parse(utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Room ID in alias_roomid is invalid unicode.") + })?) + .map_err(|_| Error::bad_database("Room ID in alias_roomid is invalid.")) + }) + .transpose() + } + + #[tracing::instrument(skip(self))] + pub fn room_aliases<'a>( + &'a self, + room_id: &RoomId, + ) -> impl Iterator>> + 'a { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + self.aliasid_alias.scan_prefix(prefix).map(|(_, bytes)| { + utils::string_from_bytes(&bytes) + .map_err(|_| Error::bad_database("Invalid alias bytes in aliasid_alias."))? + .try_into() + .map_err(|_| Error::bad_database("Invalid alias in aliasid_alias.")) + }) + } + From adafb335ffbfae4097d008685af78c2b15fa0f0d Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:57 +0200 Subject: [PATCH 06/18] refactor: restore src/service/rooms/state/mod.rs --- src/service/rooms/state/mod.rs | 296 +++++++++++++++++++++++++++++++++ 1 file changed, 296 insertions(+) create mode 100644 src/service/rooms/state/mod.rs diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs new file mode 100644 index 00000000..da03ad4c --- /dev/null +++ b/src/service/rooms/state/mod.rs @@ -0,0 +1,296 @@ +pub struct Service { + db: D, +} + +impl Service { + /// Set the room to the given statehash and update caches. + #[tracing::instrument(skip(self, new_state_ids_compressed, db))] + pub fn force_state( + &self, + room_id: &RoomId, + shortstatehash: u64, + statediffnew :HashSet, + statediffremoved :HashSet, + db: &Database, + ) -> Result<()> { + + for event_id in statediffnew.into_iter().filter_map(|new| { + self.parse_compressed_state_event(new) + .ok() + .map(|(_, id)| id) + }) { + let pdu = match self.get_pdu_json(&event_id)? { + Some(pdu) => pdu, + None => continue, + }; + + if pdu.get("type").and_then(|val| val.as_str()) != Some("m.room.member") { + continue; + } + + let pdu: PduEvent = match serde_json::from_str( + &serde_json::to_string(&pdu).expect("CanonicalJsonObj can be serialized to JSON"), + ) { + Ok(pdu) => pdu, + Err(_) => continue, + }; + + #[derive(Deserialize)] + struct ExtractMembership { + membership: MembershipState, + } + + let membership = match serde_json::from_str::(pdu.content.get()) { + Ok(e) => e.membership, + Err(_) => continue, + }; + + let state_key = match pdu.state_key { + Some(k) => k, + None => continue, + }; + + let user_id = match UserId::parse(state_key) { + Ok(id) => id, + Err(_) => continue, + }; + + self.update_membership(room_id, &user_id, membership, &pdu.sender, None, db, false)?; + } + + self.update_joined_count(room_id, db)?; + + self.roomid_shortstatehash + .insert(room_id.as_bytes(), &new_shortstatehash.to_be_bytes())?; + + Ok(()) + } + + /// Returns the leaf pdus of a room. + #[tracing::instrument(skip(self))] + pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result>> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + self.roomid_pduleaves + .scan_prefix(prefix) + .map(|(_, bytes)| { + EventId::parse_arc(utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("EventID in roomid_pduleaves is invalid unicode.") + })?) + .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid.")) + }) + .collect() + } + + /// Replace the leaves of a room. + /// + /// The provided `event_ids` become the new leaves, this allows a room to have multiple + /// `prev_events`. + #[tracing::instrument(skip(self))] + pub fn replace_pdu_leaves<'a>( + &self, + room_id: &RoomId, + event_ids: impl IntoIterator + Debug, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + for (key, _) in self.roomid_pduleaves.scan_prefix(prefix.clone()) { + self.roomid_pduleaves.remove(&key)?; + } + + for event_id in event_ids { + let mut key = prefix.to_owned(); + key.extend_from_slice(event_id.as_bytes()); + self.roomid_pduleaves.insert(&key, event_id.as_bytes())?; + } + + Ok(()) + } + + /// Generates a new StateHash and associates it with the incoming event. + /// + /// This adds all current state events (not including the incoming event) + /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`. + #[tracing::instrument(skip(self, state_ids_compressed, globals))] + pub fn set_event_state( + &self, + event_id: &EventId, + room_id: &RoomId, + state_ids_compressed: HashSet, + globals: &super::globals::Globals, + ) -> Result<()> { + let shorteventid = self.get_or_create_shorteventid(event_id, globals)?; + + let previous_shortstatehash = self.current_shortstatehash(room_id)?; + + let state_hash = self.calculate_hash( + &state_ids_compressed + .iter() + .map(|s| &s[..]) + .collect::>(), + ); + + let (shortstatehash, already_existed) = + self.get_or_create_shortstatehash(&state_hash, globals)?; + + if !already_existed { + let states_parents = previous_shortstatehash + .map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?; + + let (statediffnew, statediffremoved) = + if let Some(parent_stateinfo) = states_parents.last() { + let statediffnew: HashSet<_> = state_ids_compressed + .difference(&parent_stateinfo.1) + .copied() + .collect(); + + let statediffremoved: HashSet<_> = parent_stateinfo + .1 + .difference(&state_ids_compressed) + .copied() + .collect(); + + (statediffnew, statediffremoved) + } else { + (state_ids_compressed, HashSet::new()) + }; + self.save_state_from_diff( + shortstatehash, + statediffnew, + statediffremoved, + 1_000_000, // high number because no state will be based on this one + states_parents, + )?; + } + + self.shorteventid_shortstatehash + .insert(&shorteventid.to_be_bytes(), &shortstatehash.to_be_bytes())?; + + Ok(()) + } + + /// Generates a new StateHash and associates it with the incoming event. + /// + /// This adds all current state events (not including the incoming event) + /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`. + #[tracing::instrument(skip(self, new_pdu, globals))] + pub fn append_to_state( + &self, + new_pdu: &PduEvent, + globals: &super::globals::Globals, + ) -> Result { + let shorteventid = self.get_or_create_shorteventid(&new_pdu.event_id, globals)?; + + let previous_shortstatehash = self.current_shortstatehash(&new_pdu.room_id)?; + + if let Some(p) = previous_shortstatehash { + self.shorteventid_shortstatehash + .insert(&shorteventid.to_be_bytes(), &p.to_be_bytes())?; + } + + if let Some(state_key) = &new_pdu.state_key { + let states_parents = previous_shortstatehash + .map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?; + + let shortstatekey = self.get_or_create_shortstatekey( + &new_pdu.kind.to_string().into(), + state_key, + globals, + )?; + + let new = self.compress_state_event(shortstatekey, &new_pdu.event_id, globals)?; + + let replaces = states_parents + .last() + .map(|info| { + info.1 + .iter() + .find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) + }) + .unwrap_or_default(); + + if Some(&new) == replaces { + return Ok(previous_shortstatehash.expect("must exist")); + } + + // TODO: statehash with deterministic inputs + let shortstatehash = globals.next_count()?; + + let mut statediffnew = HashSet::new(); + statediffnew.insert(new); + + let mut statediffremoved = HashSet::new(); + if let Some(replaces) = replaces { + statediffremoved.insert(*replaces); + } + + self.save_state_from_diff( + shortstatehash, + statediffnew, + statediffremoved, + 2, + states_parents, + )?; + + Ok(shortstatehash) + } else { + Ok(previous_shortstatehash.expect("first event in room must be a state event")) + } + } + + #[tracing::instrument(skip(self, invite_event))] + pub fn calculate_invite_state( + &self, + invite_event: &PduEvent, + ) -> Result>> { + let mut state = Vec::new(); + // Add recommended events + if let Some(e) = + self.room_state_get(&invite_event.room_id, &StateEventType::RoomCreate, "")? + { + state.push(e.to_stripped_state_event()); + } + if let Some(e) = + self.room_state_get(&invite_event.room_id, &StateEventType::RoomJoinRules, "")? + { + state.push(e.to_stripped_state_event()); + } + if let Some(e) = self.room_state_get( + &invite_event.room_id, + &StateEventType::RoomCanonicalAlias, + "", + )? { + state.push(e.to_stripped_state_event()); + } + if let Some(e) = + self.room_state_get(&invite_event.room_id, &StateEventType::RoomAvatar, "")? + { + state.push(e.to_stripped_state_event()); + } + if let Some(e) = + self.room_state_get(&invite_event.room_id, &StateEventType::RoomName, "")? + { + state.push(e.to_stripped_state_event()); + } + if let Some(e) = self.room_state_get( + &invite_event.room_id, + &StateEventType::RoomMember, + invite_event.sender.as_str(), + )? { + state.push(e.to_stripped_state_event()); + } + + state.push(invite_event.to_stripped_state_event()); + Ok(state) + } + + #[tracing::instrument(skip(self))] + pub fn set_room_state(&self, room_id: &RoomId, shortstatehash: u64) -> Result<()> { + self.roomid_shortstatehash + .insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?; + + Ok(()) + } +} From 9e1ab74bb438c60a8ccb90af98af36f1da4fb3df Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:58 +0200 Subject: [PATCH 07/18] refactor: prepare src/database/key_value.rs from src/service/rooms/directory/mod.rs --- src/{service/rooms/directory/mod.rs => database/key_value.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{service/rooms/directory/mod.rs => database/key_value.rs} (100%) diff --git a/src/service/rooms/directory/mod.rs b/src/database/key_value.rs similarity index 100% rename from src/service/rooms/directory/mod.rs rename to src/database/key_value.rs From a563b1ba9a9e60ab0d4c6c7b787b3855e048647e Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:58 +0200 Subject: [PATCH 08/18] refactor: prepare src/database/key_value.rs from src/service/rooms/edus/mod.rs --- src/{service/rooms/edus/mod.rs => database/key_value.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{service/rooms/edus/mod.rs => database/key_value.rs} (100%) diff --git a/src/service/rooms/edus/mod.rs b/src/database/key_value.rs similarity index 100% rename from src/service/rooms/edus/mod.rs rename to src/database/key_value.rs From 0071a9cbf4bef95b7125d11d7950dfee991cd625 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:58 +0200 Subject: [PATCH 09/18] refactor: restore src/service/rooms/directory/mod.rs --- src/service/rooms/directory/mod.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 src/service/rooms/directory/mod.rs diff --git a/src/service/rooms/directory/mod.rs b/src/service/rooms/directory/mod.rs new file mode 100644 index 00000000..8be7bd57 --- /dev/null +++ b/src/service/rooms/directory/mod.rs @@ -0,0 +1,29 @@ + + #[tracing::instrument(skip(self))] + pub fn set_public(&self, room_id: &RoomId, public: bool) -> Result<()> { + if public { + self.publicroomids.insert(room_id.as_bytes(), &[])?; + } else { + self.publicroomids.remove(room_id.as_bytes())?; + } + + Ok(()) + } + + #[tracing::instrument(skip(self))] + pub fn is_public_room(&self, room_id: &RoomId) -> Result { + Ok(self.publicroomids.get(room_id.as_bytes())?.is_some()) + } + + #[tracing::instrument(skip(self))] + pub fn public_rooms(&self) -> impl Iterator>> + '_ { + self.publicroomids.iter().map(|(bytes, _)| { + RoomId::parse( + utils::string_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Room ID in publicroomids is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Room ID in publicroomids is invalid.")) + }) + } + From 85e571baddb6e5ac7e3bc81ac94bce48a0d14981 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:59 +0200 Subject: [PATCH 10/18] refactor: prepare src/database/key_value.rs from src/service/rooms/lazy_loading/mod.rs --- src/{service/rooms/lazy_loading/mod.rs => database/key_value.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{service/rooms/lazy_loading/mod.rs => database/key_value.rs} (100%) diff --git a/src/service/rooms/lazy_loading/mod.rs b/src/database/key_value.rs similarity index 100% rename from src/service/rooms/lazy_loading/mod.rs rename to src/database/key_value.rs From 931c8ece4a8c984effc8ff87b9228b237611a7dc Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:59 +0200 Subject: [PATCH 11/18] refactor: prepare src/database/key_value.rs from src/service/rooms/metadata/mod.rs --- src/{service/rooms/metadata/mod.rs => database/key_value.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{service/rooms/metadata/mod.rs => database/key_value.rs} (100%) diff --git a/src/service/rooms/metadata/mod.rs b/src/database/key_value.rs similarity index 100% rename from src/service/rooms/metadata/mod.rs rename to src/database/key_value.rs From 06bfddf0daed72d1c0a408faa565091ad6208ffb Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:59 +0200 Subject: [PATCH 12/18] refactor: restore src/service/rooms/lazy_loading/mod.rs --- src/service/rooms/lazy_loading/mod.rs | 91 +++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 src/service/rooms/lazy_loading/mod.rs diff --git a/src/service/rooms/lazy_loading/mod.rs b/src/service/rooms/lazy_loading/mod.rs new file mode 100644 index 00000000..a402702a --- /dev/null +++ b/src/service/rooms/lazy_loading/mod.rs @@ -0,0 +1,91 @@ + + #[tracing::instrument(skip(self))] + pub fn lazy_load_was_sent_before( + &self, + user_id: &UserId, + device_id: &DeviceId, + room_id: &RoomId, + ll_user: &UserId, + ) -> Result { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(device_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(room_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(ll_user.as_bytes()); + Ok(self.lazyloadedids.get(&key)?.is_some()) + } + + #[tracing::instrument(skip(self))] + pub fn lazy_load_mark_sent( + &self, + user_id: &UserId, + device_id: &DeviceId, + room_id: &RoomId, + lazy_load: HashSet>, + count: u64, + ) { + self.lazy_load_waiting.lock().unwrap().insert( + ( + user_id.to_owned(), + device_id.to_owned(), + room_id.to_owned(), + count, + ), + lazy_load, + ); + } + + #[tracing::instrument(skip(self))] + pub fn lazy_load_confirm_delivery( + &self, + user_id: &UserId, + device_id: &DeviceId, + room_id: &RoomId, + since: u64, + ) -> Result<()> { + if let Some(user_ids) = self.lazy_load_waiting.lock().unwrap().remove(&( + user_id.to_owned(), + device_id.to_owned(), + room_id.to_owned(), + since, + )) { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(device_id.as_bytes()); + prefix.push(0xff); + prefix.extend_from_slice(room_id.as_bytes()); + prefix.push(0xff); + + for ll_id in user_ids { + let mut key = prefix.clone(); + key.extend_from_slice(ll_id.as_bytes()); + self.lazyloadedids.insert(&key, &[])?; + } + } + + Ok(()) + } + + #[tracing::instrument(skip(self))] + pub fn lazy_load_reset( + &self, + user_id: &UserId, + device_id: &DeviceId, + room_id: &RoomId, + ) -> Result<()> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(device_id.as_bytes()); + prefix.push(0xff); + prefix.extend_from_slice(room_id.as_bytes()); + prefix.push(0xff); + + for (key, _) in self.lazyloadedids.scan_prefix(prefix) { + self.lazyloadedids.remove(&key)?; + } + + Ok(()) + } + From 42fe118cbe608e0b39a589b9192c47f77b2a313f Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:46:59 +0200 Subject: [PATCH 13/18] refactor: restore src/service/rooms/edus/mod.rs --- src/service/rooms/edus/mod.rs | 550 ++++++++++++++++++++++++++++++++++ 1 file changed, 550 insertions(+) create mode 100644 src/service/rooms/edus/mod.rs diff --git a/src/service/rooms/edus/mod.rs b/src/service/rooms/edus/mod.rs new file mode 100644 index 00000000..118efd4c --- /dev/null +++ b/src/service/rooms/edus/mod.rs @@ -0,0 +1,550 @@ +use crate::{database::abstraction::Tree, utils, Error, Result}; +use ruma::{ + events::{ + presence::{PresenceEvent, PresenceEventContent}, + receipt::ReceiptEvent, + SyncEphemeralRoomEvent, + }, + presence::PresenceState, + serde::Raw, + signatures::CanonicalJsonObject, + RoomId, UInt, UserId, +}; +use std::{ + collections::{HashMap, HashSet}, + mem, + sync::Arc, +}; + +pub struct RoomEdus { + pub(in super::super) readreceiptid_readreceipt: Arc, // ReadReceiptId = RoomId + Count + UserId + pub(in super::super) roomuserid_privateread: Arc, // RoomUserId = Room + User, PrivateRead = Count + pub(in super::super) roomuserid_lastprivatereadupdate: Arc, // LastPrivateReadUpdate = Count + pub(in super::super) typingid_userid: Arc, // TypingId = RoomId + TimeoutTime + Count + pub(in super::super) roomid_lasttypingupdate: Arc, // LastRoomTypingUpdate = Count + pub(in super::super) presenceid_presence: Arc, // PresenceId = RoomId + Count + UserId + pub(in super::super) userid_lastpresenceupdate: Arc, // LastPresenceUpdate = Count +} + +impl RoomEdus { + /// Adds an event which will be saved until a new event replaces it (e.g. read receipt). + pub fn readreceipt_update( + &self, + user_id: &UserId, + room_id: &RoomId, + event: ReceiptEvent, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let mut last_possible_key = prefix.clone(); + last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes()); + + // Remove old entry + if let Some((old, _)) = self + .readreceiptid_readreceipt + .iter_from(&last_possible_key, true) + .take_while(|(key, _)| key.starts_with(&prefix)) + .find(|(key, _)| { + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element") + == user_id.as_bytes() + }) + { + // This is the old room_latest + self.readreceiptid_readreceipt.remove(&old)?; + } + + let mut room_latest_id = prefix; + room_latest_id.extend_from_slice(&globals.next_count()?.to_be_bytes()); + room_latest_id.push(0xff); + room_latest_id.extend_from_slice(user_id.as_bytes()); + + self.readreceiptid_readreceipt.insert( + &room_latest_id, + &serde_json::to_vec(&event).expect("EduEvent::to_string always works"), + )?; + + Ok(()) + } + + /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. + #[tracing::instrument(skip(self))] + pub fn readreceipts_since<'a>( + &'a self, + room_id: &RoomId, + since: u64, + ) -> impl Iterator< + Item = Result<( + Box, + u64, + Raw, + )>, + > + 'a { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + let prefix2 = prefix.clone(); + + let mut first_possible_edu = prefix.clone(); + first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since + + self.readreceiptid_readreceipt + .iter_from(&first_possible_edu, false) + .take_while(move |(k, _)| k.starts_with(&prefix2)) + .map(move |(k, v)| { + let count = + utils::u64_from_bytes(&k[prefix.len()..prefix.len() + mem::size_of::()]) + .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?; + let user_id = UserId::parse( + utils::string_from_bytes(&k[prefix.len() + mem::size_of::() + 1..]) + .map_err(|_| { + Error::bad_database("Invalid readreceiptid userid bytes in db.") + })?, + ) + .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; + + let mut json = serde_json::from_slice::(&v).map_err(|_| { + Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") + })?; + json.remove("room_id"); + + Ok(( + user_id, + count, + Raw::from_json( + serde_json::value::to_raw_value(&json).expect("json is valid raw value"), + ), + )) + }) + } + + /// Sets a private read marker at `count`. + #[tracing::instrument(skip(self, globals))] + pub fn private_read_set( + &self, + room_id: &RoomId, + user_id: &UserId, + count: u64, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + self.roomuserid_privateread + .insert(&key, &count.to_be_bytes())?; + + self.roomuserid_lastprivatereadupdate + .insert(&key, &globals.next_count()?.to_be_bytes())?; + + Ok(()) + } + + /// Returns the private read marker. + #[tracing::instrument(skip(self))] + pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + self.roomuserid_privateread + .get(&key)? + .map_or(Ok(None), |v| { + Ok(Some(utils::u64_from_bytes(&v).map_err(|_| { + Error::bad_database("Invalid private read marker bytes") + })?)) + }) + } + + /// Returns the count of the last typing update in this room. + pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + Ok(self + .roomuserid_lastprivatereadupdate + .get(&key)? + .map(|bytes| { + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.") + }) + }) + .transpose()? + .unwrap_or(0)) + } + + /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is + /// called. + pub fn typing_add( + &self, + user_id: &UserId, + room_id: &RoomId, + timeout: u64, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let count = globals.next_count()?.to_be_bytes(); + + let mut room_typing_id = prefix; + room_typing_id.extend_from_slice(&timeout.to_be_bytes()); + room_typing_id.push(0xff); + room_typing_id.extend_from_slice(&count); + + self.typingid_userid + .insert(&room_typing_id, &*user_id.as_bytes())?; + + self.roomid_lasttypingupdate + .insert(room_id.as_bytes(), &count)?; + + Ok(()) + } + + /// Removes a user from typing before the timeout is reached. + pub fn typing_remove( + &self, + user_id: &UserId, + room_id: &RoomId, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let user_id = user_id.to_string(); + + let mut found_outdated = false; + + // Maybe there are multiple ones from calling roomtyping_add multiple times + for outdated_edu in self + .typingid_userid + .scan_prefix(prefix) + .filter(|(_, v)| &**v == user_id.as_bytes()) + { + self.typingid_userid.remove(&outdated_edu.0)?; + found_outdated = true; + } + + if found_outdated { + self.roomid_lasttypingupdate + .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; + } + + Ok(()) + } + + /// Makes sure that typing events with old timestamps get removed. + fn typings_maintain( + &self, + room_id: &RoomId, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let current_timestamp = utils::millis_since_unix_epoch(); + + let mut found_outdated = false; + + // Find all outdated edus before inserting a new one + for outdated_edu in self + .typingid_userid + .scan_prefix(prefix) + .map(|(key, _)| { + Ok::<_, Error>(( + key.clone(), + utils::u64_from_bytes( + &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| { + Error::bad_database("RoomTyping has invalid timestamp or delimiters.") + })?[0..mem::size_of::()], + ) + .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, + )) + }) + .filter_map(|r| r.ok()) + .take_while(|&(_, timestamp)| timestamp < current_timestamp) + { + // This is an outdated edu (time > timestamp) + self.typingid_userid.remove(&outdated_edu.0)?; + found_outdated = true; + } + + if found_outdated { + self.roomid_lasttypingupdate + .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; + } + + Ok(()) + } + + /// Returns the count of the last typing update in this room. + #[tracing::instrument(skip(self, globals))] + pub fn last_typing_update( + &self, + room_id: &RoomId, + globals: &super::super::globals::Globals, + ) -> Result { + self.typings_maintain(room_id, globals)?; + + Ok(self + .roomid_lasttypingupdate + .get(room_id.as_bytes())? + .map(|bytes| { + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Count in roomid_lastroomactiveupdate is invalid.") + }) + }) + .transpose()? + .unwrap_or(0)) + } + + pub fn typings_all( + &self, + room_id: &RoomId, + ) -> Result> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let mut user_ids = HashSet::new(); + + for (_, user_id) in self.typingid_userid.scan_prefix(prefix) { + let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| { + Error::bad_database("User ID in typingid_userid is invalid unicode.") + })?) + .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; + + user_ids.insert(user_id); + } + + Ok(SyncEphemeralRoomEvent { + content: ruma::events::typing::TypingEventContent { + user_ids: user_ids.into_iter().collect(), + }, + }) + } + + /// Adds a presence event which will be saved until a new event replaces it. + /// + /// Note: This method takes a RoomId because presence updates are always bound to rooms to + /// make sure users outside these rooms can't see them. + pub fn update_presence( + &self, + user_id: &UserId, + room_id: &RoomId, + presence: PresenceEvent, + globals: &super::super::globals::Globals, + ) -> Result<()> { + // TODO: Remove old entry? Or maybe just wipe completely from time to time? + + let count = globals.next_count()?.to_be_bytes(); + + let mut presence_id = room_id.as_bytes().to_vec(); + presence_id.push(0xff); + presence_id.extend_from_slice(&count); + presence_id.push(0xff); + presence_id.extend_from_slice(presence.sender.as_bytes()); + + self.presenceid_presence.insert( + &presence_id, + &serde_json::to_vec(&presence).expect("PresenceEvent can be serialized"), + )?; + + self.userid_lastpresenceupdate.insert( + user_id.as_bytes(), + &utils::millis_since_unix_epoch().to_be_bytes(), + )?; + + Ok(()) + } + + /// Resets the presence timeout, so the user will stay in their current presence state. + #[tracing::instrument(skip(self))] + pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { + self.userid_lastpresenceupdate.insert( + user_id.as_bytes(), + &utils::millis_since_unix_epoch().to_be_bytes(), + )?; + + Ok(()) + } + + /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. + pub fn last_presence_update(&self, user_id: &UserId) -> Result> { + self.userid_lastpresenceupdate + .get(user_id.as_bytes())? + .map(|bytes| { + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") + }) + }) + .transpose() + } + + pub fn get_last_presence_event( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result> { + let last_update = match self.last_presence_update(user_id)? { + Some(last) => last, + None => return Ok(None), + }; + + let mut presence_id = room_id.as_bytes().to_vec(); + presence_id.push(0xff); + presence_id.extend_from_slice(&last_update.to_be_bytes()); + presence_id.push(0xff); + presence_id.extend_from_slice(user_id.as_bytes()); + + self.presenceid_presence + .get(&presence_id)? + .map(|value| { + let mut presence: PresenceEvent = serde_json::from_slice(&value) + .map_err(|_| Error::bad_database("Invalid presence event in db."))?; + let current_timestamp: UInt = utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"); + + if presence.content.presence == PresenceState::Online { + // Don't set last_active_ago when the user is online + presence.content.last_active_ago = None; + } else { + // Convert from timestamp to duration + presence.content.last_active_ago = presence + .content + .last_active_ago + .map(|timestamp| current_timestamp - timestamp); + } + + Ok(presence) + }) + .transpose() + } + + /// Sets all users to offline who have been quiet for too long. + fn _presence_maintain( + &self, + rooms: &super::Rooms, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let current_timestamp = utils::millis_since_unix_epoch(); + + for (user_id_bytes, last_timestamp) in self + .userid_lastpresenceupdate + .iter() + .filter_map(|(k, bytes)| { + Some(( + k, + utils::u64_from_bytes(&bytes) + .map_err(|_| { + Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") + }) + .ok()?, + )) + }) + .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000) + // 5 Minutes + { + // Send new presence events to set the user offline + let count = globals.next_count()?.to_be_bytes(); + let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes) + .map_err(|_| { + Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") + })? + .try_into() + .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; + for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { + let mut presence_id = room_id.as_bytes().to_vec(); + presence_id.push(0xff); + presence_id.extend_from_slice(&count); + presence_id.push(0xff); + presence_id.extend_from_slice(&user_id_bytes); + + self.presenceid_presence.insert( + &presence_id, + &serde_json::to_vec(&PresenceEvent { + content: PresenceEventContent { + avatar_url: None, + currently_active: None, + displayname: None, + last_active_ago: Some( + last_timestamp.try_into().expect("time is valid"), + ), + presence: PresenceState::Offline, + status_msg: None, + }, + sender: user_id.to_owned(), + }) + .expect("PresenceEvent can be serialized"), + )?; + } + + self.userid_lastpresenceupdate.insert( + user_id.as_bytes(), + &utils::millis_since_unix_epoch().to_be_bytes(), + )?; + } + + Ok(()) + } + + /// Returns an iterator over the most recent presence updates that happened after the event with id `since`. + #[tracing::instrument(skip(self, since, _rooms, _globals))] + pub fn presence_since( + &self, + room_id: &RoomId, + since: u64, + _rooms: &super::Rooms, + _globals: &super::super::globals::Globals, + ) -> Result, PresenceEvent>> { + //self.presence_maintain(rooms, globals)?; + + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + let mut first_possible_edu = prefix.clone(); + first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since + let mut hashmap = HashMap::new(); + + for (key, value) in self + .presenceid_presence + .iter_from(&*first_possible_edu, false) + .take_while(|(key, _)| key.starts_with(&prefix)) + { + let user_id = UserId::parse( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?, + ) + .map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?; + + let mut presence: PresenceEvent = serde_json::from_slice(&value) + .map_err(|_| Error::bad_database("Invalid presence event in db."))?; + + let current_timestamp: UInt = utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"); + + if presence.content.presence == PresenceState::Online { + // Don't set last_active_ago when the user is online + presence.content.last_active_ago = None; + } else { + // Convert from timestamp to duration + presence.content.last_active_ago = presence + .content + .last_active_ago + .map(|timestamp| current_timestamp - timestamp); + } + + hashmap.insert(user_id, presence); + } + + Ok(hashmap) + } +} From 715b30a2b5ea9f2511e157ac23ced07183f25a85 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:47:00 +0200 Subject: [PATCH 14/18] refactor: prepare src/database/key_value.rs from src/service/rooms/outlier/mod.rs --- src/{service/rooms/outlier/mod.rs => database/key_value.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{service/rooms/outlier/mod.rs => database/key_value.rs} (100%) diff --git a/src/service/rooms/outlier/mod.rs b/src/database/key_value.rs similarity index 100% rename from src/service/rooms/outlier/mod.rs rename to src/database/key_value.rs From daa969508fe17a7d87bdfff5c8163aae469545af Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:47:00 +0200 Subject: [PATCH 15/18] refactor: restore src/service/rooms/outlier/mod.rs --- src/service/rooms/outlier/mod.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 src/service/rooms/outlier/mod.rs diff --git a/src/service/rooms/outlier/mod.rs b/src/service/rooms/outlier/mod.rs new file mode 100644 index 00000000..340e93e4 --- /dev/null +++ b/src/service/rooms/outlier/mod.rs @@ -0,0 +1,27 @@ + /// Returns the pdu from the outlier tree. + pub fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result> { + self.eventid_outlierpdu + .get(event_id.as_bytes())? + .map_or(Ok(None), |pdu| { + serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) + }) + } + + /// Returns the pdu from the outlier tree. + pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result> { + self.eventid_outlierpdu + .get(event_id.as_bytes())? + .map_or(Ok(None), |pdu| { + serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) + }) + } + + /// Append the PDU as an outlier. + #[tracing::instrument(skip(self, pdu))] + pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> { + self.eventid_outlierpdu.insert( + event_id.as_bytes(), + &serde_json::to_vec(&pdu).expect("CanonicalJsonObject is valid"), + ) + } + From 0ce4446b1ab264378fb96029d420391aefdfdb91 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:47:00 +0200 Subject: [PATCH 16/18] refactor: restore src/service/rooms/metadata/mod.rs --- src/service/rooms/metadata/mod.rs | 44 +++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 src/service/rooms/metadata/mod.rs diff --git a/src/service/rooms/metadata/mod.rs b/src/service/rooms/metadata/mod.rs new file mode 100644 index 00000000..5d703451 --- /dev/null +++ b/src/service/rooms/metadata/mod.rs @@ -0,0 +1,44 @@ + /// Checks if a room exists. + #[tracing::instrument(skip(self))] + pub fn exists(&self, room_id: &RoomId) -> Result { + let prefix = match self.get_shortroomid(room_id)? { + Some(b) => b.to_be_bytes().to_vec(), + None => return Ok(false), + }; + + // Look for PDUs in that room. + Ok(self + .pduid_pdu + .iter_from(&prefix, false) + .next() + .filter(|(k, _)| k.starts_with(&prefix)) + .is_some()) + } + + pub fn get_shortroomid(&self, room_id: &RoomId) -> Result> { + self.roomid_shortroomid + .get(room_id.as_bytes())? + .map(|bytes| { + utils::u64_from_bytes(&bytes) + .map_err(|_| Error::bad_database("Invalid shortroomid in db.")) + }) + .transpose() + } + + pub fn get_or_create_shortroomid( + &self, + room_id: &RoomId, + globals: &super::globals::Globals, + ) -> Result { + Ok(match self.roomid_shortroomid.get(room_id.as_bytes())? { + Some(short) => utils::u64_from_bytes(&short) + .map_err(|_| Error::bad_database("Invalid shortroomid in db."))?, + None => { + let short = globals.next_count()?; + self.roomid_shortroomid + .insert(room_id.as_bytes(), &short.to_be_bytes())?; + short + } + }) + } + From 1ccc226c6b195d84c14e1fdf5e0ade5feefb6872 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:47:01 +0200 Subject: [PATCH 17/18] refactor: prepare src/database/key_value.rs from src/service/rooms/pdu_metadata/mod.rs --- src/{service/rooms/pdu_metadata/mod.rs => database/key_value.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/{service/rooms/pdu_metadata/mod.rs => database/key_value.rs} (100%) diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/database/key_value.rs similarity index 100% rename from src/service/rooms/pdu_metadata/mod.rs rename to src/database/key_value.rs From 81ac01c2f56669521fa55409efbeba6785230239 Mon Sep 17 00:00:00 2001 From: Nyaaori <+@nyaaori.cat> Date: Mon, 15 Aug 2022 18:47:01 +0200 Subject: [PATCH 18/18] refactor: restore src/service/rooms/pdu_metadata/mod.rs --- src/service/rooms/pdu_metadata/mod.rs | 31 +++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 src/service/rooms/pdu_metadata/mod.rs diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs new file mode 100644 index 00000000..f8ffcee1 --- /dev/null +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -0,0 +1,31 @@ + + #[tracing::instrument(skip(self, room_id, event_ids))] + pub fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc]) -> Result<()> { + for prev in event_ids { + let mut key = room_id.as_bytes().to_vec(); + key.extend_from_slice(prev.as_bytes()); + self.referencedevents.insert(&key, &[])?; + } + + Ok(()) + } + + #[tracing::instrument(skip(self))] + pub fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result { + let mut key = room_id.as_bytes().to_vec(); + key.extend_from_slice(event_id.as_bytes()); + Ok(self.referencedevents.get(&key)?.is_some()) + } + + #[tracing::instrument(skip(self))] + pub fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()> { + self.softfailedeventids.insert(event_id.as_bytes(), &[]) + } + + #[tracing::instrument(skip(self))] + pub fn is_event_soft_failed(&self, event_id: &EventId) -> Result { + self.softfailedeventids + .get(event_id.as_bytes()) + .map(|o| o.is_some()) + } +