diff --git a/src/database/key_value.rs b/src/database/key_value.rs index 34916e4b..0be13115 100644 --- a/src/database/key_value.rs +++ b/src/database/key_value.rs @@ -156,7 +156,7 @@ impl service::room::directory::Data for KeyValueDatabase { } } -impl service::room::edus::Data for KeyValueDatabase { +impl service::room::edus::read_receipt::Data for KeyValueDatabase { fn readreceipt_update( &self, user_id: &UserId, @@ -203,7 +203,7 @@ impl service::room::edus::Data for KeyValueDatabase { room_id: &RoomId, since: u64, ) -> impl Iterator< - Item = Result<( + Item=Result<( Box, u64, Raw, @@ -229,7 +229,7 @@ impl service::room::edus::Data for KeyValueDatabase { Error::bad_database("Invalid readreceiptid userid bytes in db.") })?, ) - .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; + .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; let mut json = serde_json::from_slice::(&v).map_err(|_| { Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") @@ -293,7 +293,9 @@ impl service::room::edus::Data for KeyValueDatabase { .transpose()? .unwrap_or(0)) } +} +impl service::room::edus::typing::Data for KeyValueDatabase { fn typing_add( &self, user_id: &UserId, @@ -379,14 +381,16 @@ impl service::room::edus::Data for KeyValueDatabase { let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| { Error::bad_database("User ID in typingid_userid is invalid unicode.") })?) - .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; + .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; user_ids.insert(user_id); } Ok(user_ids) } +} +impl service::room::edus::presence::Data for KeyValueDatabase { fn update_presence( &self, user_id: &UserId, @@ -416,7 +420,7 @@ impl service::room::edus::Data for KeyValueDatabase { Ok(()) } - pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { + fn ping_presence(&self, user_id: &UserId) -> Result<()> { self.userid_lastpresenceupdate.insert( user_id.as_bytes(), &utils::millis_since_unix_epoch().to_be_bytes(), diff --git a/src/service/rooms/edus/mod.rs b/src/service/rooms/edus/mod.rs index 06adf57e..d8ce5300 100644 --- a/src/service/rooms/edus/mod.rs +++ b/src/service/rooms/edus/mod.rs @@ -1,256 +1,3 @@ -mod data; -pub use data::Data; - -use crate::service::*; - -pub struct Service { - db: D, -} - -impl Service<_> { - /// Replaces the previous read receipt. - pub fn readreceipt_update( - &self, - user_id: &UserId, - room_id: &RoomId, - event: ReceiptEvent, - ) -> Result<()> { - self.db.readreceipt_update(user_id, room_id, event); - } - - /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - #[tracing::instrument(skip(self))] - pub fn readreceipts_since<'a>( - &'a self, - room_id: &RoomId, - since: u64, - ) -> impl Iterator< - Item = Result<( - Box, - u64, - Raw, - )>, - > + 'a { - self.db.readreceipts_since(room_id, since) - } - - /// Sets a private read marker at `count`. - #[tracing::instrument(skip(self, globals))] - pub fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()> { - self.db.private_read_set(room_id, user_id, count) - } - - /// Returns the private read marker. - #[tracing::instrument(skip(self))] - pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - self.db.private_read_get(room_id, user_id) - } - - /// Returns the count of the last typing update in this room. - pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { - self.db.last_privateread_update(user_id, room_id) - } - - /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is - /// called. - pub fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()> { - self.db.typing_add(user_id, room_id, timeout) - } - - /// Removes a user from typing before the timeout is reached. - pub fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()> { - self.db.typing_remove(user_id, room_id) - } - - /* TODO: Do this in background thread? - /// Makes sure that typing events with old timestamps get removed. - fn typings_maintain( - &self, - room_id: &RoomId, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let current_timestamp = utils::millis_since_unix_epoch(); - - let mut found_outdated = false; - - // Find all outdated edus before inserting a new one - for outdated_edu in self - .typingid_userid - .scan_prefix(prefix) - .map(|(key, _)| { - Ok::<_, Error>(( - key.clone(), - utils::u64_from_bytes( - &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| { - Error::bad_database("RoomTyping has invalid timestamp or delimiters.") - })?[0..mem::size_of::()], - ) - .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, - )) - }) - .filter_map(|r| r.ok()) - .take_while(|&(_, timestamp)| timestamp < current_timestamp) - { - // This is an outdated edu (time > timestamp) - self.typingid_userid.remove(&outdated_edu.0)?; - found_outdated = true; - } - - if found_outdated { - self.roomid_lasttypingupdate - .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; - } - - Ok(()) - } - */ - - /// Returns the count of the last typing update in this room. - #[tracing::instrument(skip(self, globals))] - pub fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result { - self.db.last_typing_update(room_id) - } - - /// Returns a new typing EDU. - pub fn typings_all( - &self, - room_id: &RoomId, - ) -> Result> { - let user_ids = self.db.typings_all(room_id)?; - - Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { - user_ids: user_ids.into_iter().collect(), - }, - }) - } - - /// Adds a presence event which will be saved until a new event replaces it. - /// - /// Note: This method takes a RoomId because presence updates are always bound to rooms to - /// make sure users outside these rooms can't see them. - pub fn update_presence( - &self, - user_id: &UserId, - room_id: &RoomId, - presence: PresenceEvent, - ) -> Result<()> { - self.db.update_presence(user_id, room_id, presence) - } - - /// Resets the presence timeout, so the user will stay in their current presence state. - pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { - self.db.ping_presence(user_id) - } - - pub fn get_last_presence_event( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result> { - let last_update = match self.db.last_presence_update(user_id)? { - Some(last) => last, - None => return Ok(None), - }; - - self.db.get_presence_event(room_id, user_id, last_update) - } - - /* TODO - /// Sets all users to offline who have been quiet for too long. - fn _presence_maintain( - &self, - rooms: &super::Rooms, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let current_timestamp = utils::millis_since_unix_epoch(); - - for (user_id_bytes, last_timestamp) in self - .userid_lastpresenceupdate - .iter() - .filter_map(|(k, bytes)| { - Some(( - k, - utils::u64_from_bytes(&bytes) - .map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) - .ok()?, - )) - }) - .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000) - // 5 Minutes - { - // Send new presence events to set the user offline - let count = globals.next_count()?.to_be_bytes(); - let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes) - .map_err(|_| { - Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") - })? - .try_into() - .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; - for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(&user_id_bytes); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&PresenceEvent { - content: PresenceEventContent { - avatar_url: None, - currently_active: None, - displayname: None, - last_active_ago: Some( - last_timestamp.try_into().expect("time is valid"), - ), - presence: PresenceState::Offline, - status_msg: None, - }, - sender: user_id.to_owned(), - }) - .expect("PresenceEvent can be serialized"), - )?; - } - - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - } - - Ok(()) - }*/ - - /// Returns the most recent presence updates that happened after the event with id `since`. - #[tracing::instrument(skip(self, since, _rooms, _globals))] - pub fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result, PresenceEvent>> { - self.db.presence_since(room_id, since) - } -} +pub mod presence; +pub mod read_receipt; +pub mod typing; diff --git a/src/service/rooms/edus/presence/data.rs b/src/service/rooms/edus/presence/data.rs index 16c14cf3..de72e219 100644 --- a/src/service/rooms/edus/presence/data.rs +++ b/src/service/rooms/edus/presence/data.rs @@ -1,67 +1,4 @@ pub trait Data { - /// Replaces the previous read receipt. - fn readreceipt_update( - &self, - user_id: &UserId, - room_id: &RoomId, - event: ReceiptEvent, - ) -> Result<()>; - - /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - fn readreceipts_since( - &self, - room_id: &RoomId, - since: u64, - ) -> impl Iterator< - Item = Result<( - Box, - u64, - Raw, - )>, - >; - - /// Sets a private read marker at `count`. - fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()>; - - /// Returns the private read marker. - fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result>; - - /// Returns the count of the last typing update in this room. - fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result; - - /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is - /// called. - fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()>; - - /// Removes a user from typing before the timeout is reached. - fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()>; - - /// Returns the count of the last typing update in this room. - fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result; - - /// Returns all user ids currently typing. - fn typings_all( - &self, - room_id: &RoomId, - ) -> Result>; - /// Adds a presence event which will be saved until a new event replaces it. /// /// Note: This method takes a RoomId because presence updates are always bound to rooms to @@ -80,7 +17,12 @@ pub trait Data { fn last_presence_update(&self, user_id: &UserId) -> Result>; /// Returns the presence event with correct last_active_ago. - fn get_presence_event(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result>; + fn get_presence_event( + &self, + room_id: &RoomId, + user_id: &UserId, + count: u64, + ) -> Result>; /// Returns the most recent presence updates that happened after the event with id `since`. fn presence_since( diff --git a/src/service/rooms/edus/presence/mod.rs b/src/service/rooms/edus/presence/mod.rs index 06adf57e..5793a799 100644 --- a/src/service/rooms/edus/presence/mod.rs +++ b/src/service/rooms/edus/presence/mod.rs @@ -8,143 +8,6 @@ pub struct Service { } impl Service<_> { - /// Replaces the previous read receipt. - pub fn readreceipt_update( - &self, - user_id: &UserId, - room_id: &RoomId, - event: ReceiptEvent, - ) -> Result<()> { - self.db.readreceipt_update(user_id, room_id, event); - } - - /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - #[tracing::instrument(skip(self))] - pub fn readreceipts_since<'a>( - &'a self, - room_id: &RoomId, - since: u64, - ) -> impl Iterator< - Item = Result<( - Box, - u64, - Raw, - )>, - > + 'a { - self.db.readreceipts_since(room_id, since) - } - - /// Sets a private read marker at `count`. - #[tracing::instrument(skip(self, globals))] - pub fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()> { - self.db.private_read_set(room_id, user_id, count) - } - - /// Returns the private read marker. - #[tracing::instrument(skip(self))] - pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - self.db.private_read_get(room_id, user_id) - } - - /// Returns the count of the last typing update in this room. - pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { - self.db.last_privateread_update(user_id, room_id) - } - - /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is - /// called. - pub fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()> { - self.db.typing_add(user_id, room_id, timeout) - } - - /// Removes a user from typing before the timeout is reached. - pub fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()> { - self.db.typing_remove(user_id, room_id) - } - - /* TODO: Do this in background thread? - /// Makes sure that typing events with old timestamps get removed. - fn typings_maintain( - &self, - room_id: &RoomId, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let current_timestamp = utils::millis_since_unix_epoch(); - - let mut found_outdated = false; - - // Find all outdated edus before inserting a new one - for outdated_edu in self - .typingid_userid - .scan_prefix(prefix) - .map(|(key, _)| { - Ok::<_, Error>(( - key.clone(), - utils::u64_from_bytes( - &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| { - Error::bad_database("RoomTyping has invalid timestamp or delimiters.") - })?[0..mem::size_of::()], - ) - .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, - )) - }) - .filter_map(|r| r.ok()) - .take_while(|&(_, timestamp)| timestamp < current_timestamp) - { - // This is an outdated edu (time > timestamp) - self.typingid_userid.remove(&outdated_edu.0)?; - found_outdated = true; - } - - if found_outdated { - self.roomid_lasttypingupdate - .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; - } - - Ok(()) - } - */ - - /// Returns the count of the last typing update in this room. - #[tracing::instrument(skip(self, globals))] - pub fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result { - self.db.last_typing_update(room_id) - } - - /// Returns a new typing EDU. - pub fn typings_all( - &self, - room_id: &RoomId, - ) -> Result> { - let user_ids = self.db.typings_all(room_id)?; - - Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { - user_ids: user_ids.into_iter().collect(), - }, - }) - } - /// Adds a presence event which will be saved until a new event replaces it. /// /// Note: This method takes a RoomId because presence updates are always bound to rooms to diff --git a/src/service/rooms/edus/read_receipt/data.rs b/src/service/rooms/edus/read_receipt/data.rs index 16c14cf3..4befcf2c 100644 --- a/src/service/rooms/edus/read_receipt/data.rs +++ b/src/service/rooms/edus/read_receipt/data.rs @@ -21,71 +21,11 @@ pub trait Data { >; /// Sets a private read marker at `count`. - fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()>; + fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>; /// Returns the private read marker. fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result>; /// Returns the count of the last typing update in this room. fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result; - - /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is - /// called. - fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()>; - - /// Removes a user from typing before the timeout is reached. - fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()>; - - /// Returns the count of the last typing update in this room. - fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result; - - /// Returns all user ids currently typing. - fn typings_all( - &self, - room_id: &RoomId, - ) -> Result>; - - /// Adds a presence event which will be saved until a new event replaces it. - /// - /// Note: This method takes a RoomId because presence updates are always bound to rooms to - /// make sure users outside these rooms can't see them. - fn update_presence( - &self, - user_id: &UserId, - room_id: &RoomId, - presence: PresenceEvent, - ) -> Result<()>; - - /// Resets the presence timeout, so the user will stay in their current presence state. - fn ping_presence(&self, user_id: &UserId) -> Result<()>; - - /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. - fn last_presence_update(&self, user_id: &UserId) -> Result>; - - /// Returns the presence event with correct last_active_ago. - fn get_presence_event(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result>; - - /// Returns the most recent presence updates that happened after the event with id `since`. - fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result, PresenceEvent>>; } diff --git a/src/service/rooms/edus/read_receipt/mod.rs b/src/service/rooms/edus/read_receipt/mod.rs index 06adf57e..9cd474fb 100644 --- a/src/service/rooms/edus/read_receipt/mod.rs +++ b/src/service/rooms/edus/read_receipt/mod.rs @@ -36,12 +36,7 @@ impl Service<_> { /// Sets a private read marker at `count`. #[tracing::instrument(skip(self, globals))] - pub fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()> { + pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { self.db.private_read_set(room_id, user_id, count) } @@ -55,202 +50,4 @@ impl Service<_> { pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { self.db.last_privateread_update(user_id, room_id) } - - /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is - /// called. - pub fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()> { - self.db.typing_add(user_id, room_id, timeout) - } - - /// Removes a user from typing before the timeout is reached. - pub fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()> { - self.db.typing_remove(user_id, room_id) - } - - /* TODO: Do this in background thread? - /// Makes sure that typing events with old timestamps get removed. - fn typings_maintain( - &self, - room_id: &RoomId, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let mut prefix = room_id.as_bytes().to_vec(); - prefix.push(0xff); - - let current_timestamp = utils::millis_since_unix_epoch(); - - let mut found_outdated = false; - - // Find all outdated edus before inserting a new one - for outdated_edu in self - .typingid_userid - .scan_prefix(prefix) - .map(|(key, _)| { - Ok::<_, Error>(( - key.clone(), - utils::u64_from_bytes( - &key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| { - Error::bad_database("RoomTyping has invalid timestamp or delimiters.") - })?[0..mem::size_of::()], - ) - .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, - )) - }) - .filter_map(|r| r.ok()) - .take_while(|&(_, timestamp)| timestamp < current_timestamp) - { - // This is an outdated edu (time > timestamp) - self.typingid_userid.remove(&outdated_edu.0)?; - found_outdated = true; - } - - if found_outdated { - self.roomid_lasttypingupdate - .insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?; - } - - Ok(()) - } - */ - - /// Returns the count of the last typing update in this room. - #[tracing::instrument(skip(self, globals))] - pub fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result { - self.db.last_typing_update(room_id) - } - - /// Returns a new typing EDU. - pub fn typings_all( - &self, - room_id: &RoomId, - ) -> Result> { - let user_ids = self.db.typings_all(room_id)?; - - Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { - user_ids: user_ids.into_iter().collect(), - }, - }) - } - - /// Adds a presence event which will be saved until a new event replaces it. - /// - /// Note: This method takes a RoomId because presence updates are always bound to rooms to - /// make sure users outside these rooms can't see them. - pub fn update_presence( - &self, - user_id: &UserId, - room_id: &RoomId, - presence: PresenceEvent, - ) -> Result<()> { - self.db.update_presence(user_id, room_id, presence) - } - - /// Resets the presence timeout, so the user will stay in their current presence state. - pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { - self.db.ping_presence(user_id) - } - - pub fn get_last_presence_event( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result> { - let last_update = match self.db.last_presence_update(user_id)? { - Some(last) => last, - None => return Ok(None), - }; - - self.db.get_presence_event(room_id, user_id, last_update) - } - - /* TODO - /// Sets all users to offline who have been quiet for too long. - fn _presence_maintain( - &self, - rooms: &super::Rooms, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let current_timestamp = utils::millis_since_unix_epoch(); - - for (user_id_bytes, last_timestamp) in self - .userid_lastpresenceupdate - .iter() - .filter_map(|(k, bytes)| { - Some(( - k, - utils::u64_from_bytes(&bytes) - .map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) - .ok()?, - )) - }) - .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000) - // 5 Minutes - { - // Send new presence events to set the user offline - let count = globals.next_count()?.to_be_bytes(); - let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes) - .map_err(|_| { - Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") - })? - .try_into() - .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; - for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(&user_id_bytes); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&PresenceEvent { - content: PresenceEventContent { - avatar_url: None, - currently_active: None, - displayname: None, - last_active_ago: Some( - last_timestamp.try_into().expect("time is valid"), - ), - presence: PresenceState::Offline, - status_msg: None, - }, - sender: user_id.to_owned(), - }) - .expect("PresenceEvent can be serialized"), - )?; - } - - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - } - - Ok(()) - }*/ - - /// Returns the most recent presence updates that happened after the event with id `since`. - #[tracing::instrument(skip(self, since, _rooms, _globals))] - pub fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result, PresenceEvent>> { - self.db.presence_since(room_id, since) - } } diff --git a/src/service/rooms/edus/typing/data.rs b/src/service/rooms/edus/typing/data.rs index 16c14cf3..83ff90ea 100644 --- a/src/service/rooms/edus/typing/data.rs +++ b/src/service/rooms/edus/typing/data.rs @@ -1,91 +1,14 @@ pub trait Data { - /// Replaces the previous read receipt. - fn readreceipt_update( - &self, - user_id: &UserId, - room_id: &RoomId, - event: ReceiptEvent, - ) -> Result<()>; - - /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - fn readreceipts_since( - &self, - room_id: &RoomId, - since: u64, - ) -> impl Iterator< - Item = Result<( - Box, - u64, - Raw, - )>, - >; - - /// Sets a private read marker at `count`. - fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()>; - - /// Returns the private read marker. - fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result>; - - /// Returns the count of the last typing update in this room. - fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result; - /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is /// called. - fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()>; + fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()>; /// Removes a user from typing before the timeout is reached. - fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()>; + fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>; /// Returns the count of the last typing update in this room. - fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result; + fn last_typing_update(&self, room_id: &RoomId) -> Result; /// Returns all user ids currently typing. - fn typings_all( - &self, - room_id: &RoomId, - ) -> Result>; - - /// Adds a presence event which will be saved until a new event replaces it. - /// - /// Note: This method takes a RoomId because presence updates are always bound to rooms to - /// make sure users outside these rooms can't see them. - fn update_presence( - &self, - user_id: &UserId, - room_id: &RoomId, - presence: PresenceEvent, - ) -> Result<()>; - - /// Resets the presence timeout, so the user will stay in their current presence state. - fn ping_presence(&self, user_id: &UserId) -> Result<()>; - - /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. - fn last_presence_update(&self, user_id: &UserId) -> Result>; - - /// Returns the presence event with correct last_active_ago. - fn get_presence_event(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result>; - - /// Returns the most recent presence updates that happened after the event with id `since`. - fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result, PresenceEvent>>; + fn typings_all(&self, room_id: &RoomId) -> Result>; } diff --git a/src/service/rooms/edus/typing/mod.rs b/src/service/rooms/edus/typing/mod.rs index 06adf57e..b29c7888 100644 --- a/src/service/rooms/edus/typing/mod.rs +++ b/src/service/rooms/edus/typing/mod.rs @@ -8,71 +8,14 @@ pub struct Service { } impl Service<_> { - /// Replaces the previous read receipt. - pub fn readreceipt_update( - &self, - user_id: &UserId, - room_id: &RoomId, - event: ReceiptEvent, - ) -> Result<()> { - self.db.readreceipt_update(user_id, room_id, event); - } - - /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - #[tracing::instrument(skip(self))] - pub fn readreceipts_since<'a>( - &'a self, - room_id: &RoomId, - since: u64, - ) -> impl Iterator< - Item = Result<( - Box, - u64, - Raw, - )>, - > + 'a { - self.db.readreceipts_since(room_id, since) - } - - /// Sets a private read marker at `count`. - #[tracing::instrument(skip(self, globals))] - pub fn private_read_set( - &self, - room_id: &RoomId, - user_id: &UserId, - count: u64, - ) -> Result<()> { - self.db.private_read_set(room_id, user_id, count) - } - - /// Returns the private read marker. - #[tracing::instrument(skip(self))] - pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - self.db.private_read_get(room_id, user_id) - } - - /// Returns the count of the last typing update in this room. - pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { - self.db.last_privateread_update(user_id, room_id) - } - /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is /// called. - pub fn typing_add( - &self, - user_id: &UserId, - room_id: &RoomId, - timeout: u64, - ) -> Result<()> { + pub fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> { self.db.typing_add(user_id, room_id, timeout) } /// Removes a user from typing before the timeout is reached. - pub fn typing_remove( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result<()> { + pub fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { self.db.typing_remove(user_id, room_id) } @@ -124,10 +67,7 @@ impl Service<_> { /// Returns the count of the last typing update in this room. #[tracing::instrument(skip(self, globals))] - pub fn last_typing_update( - &self, - room_id: &RoomId, - ) -> Result { + pub fn last_typing_update(&self, room_id: &RoomId) -> Result { self.db.last_typing_update(room_id) } @@ -144,113 +84,4 @@ impl Service<_> { }, }) } - - /// Adds a presence event which will be saved until a new event replaces it. - /// - /// Note: This method takes a RoomId because presence updates are always bound to rooms to - /// make sure users outside these rooms can't see them. - pub fn update_presence( - &self, - user_id: &UserId, - room_id: &RoomId, - presence: PresenceEvent, - ) -> Result<()> { - self.db.update_presence(user_id, room_id, presence) - } - - /// Resets the presence timeout, so the user will stay in their current presence state. - pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { - self.db.ping_presence(user_id) - } - - pub fn get_last_presence_event( - &self, - user_id: &UserId, - room_id: &RoomId, - ) -> Result> { - let last_update = match self.db.last_presence_update(user_id)? { - Some(last) => last, - None => return Ok(None), - }; - - self.db.get_presence_event(room_id, user_id, last_update) - } - - /* TODO - /// Sets all users to offline who have been quiet for too long. - fn _presence_maintain( - &self, - rooms: &super::Rooms, - globals: &super::super::globals::Globals, - ) -> Result<()> { - let current_timestamp = utils::millis_since_unix_epoch(); - - for (user_id_bytes, last_timestamp) in self - .userid_lastpresenceupdate - .iter() - .filter_map(|(k, bytes)| { - Some(( - k, - utils::u64_from_bytes(&bytes) - .map_err(|_| { - Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") - }) - .ok()?, - )) - }) - .take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000) - // 5 Minutes - { - // Send new presence events to set the user offline - let count = globals.next_count()?.to_be_bytes(); - let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes) - .map_err(|_| { - Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") - })? - .try_into() - .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; - for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { - let mut presence_id = room_id.as_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&count); - presence_id.push(0xff); - presence_id.extend_from_slice(&user_id_bytes); - - self.presenceid_presence.insert( - &presence_id, - &serde_json::to_vec(&PresenceEvent { - content: PresenceEventContent { - avatar_url: None, - currently_active: None, - displayname: None, - last_active_ago: Some( - last_timestamp.try_into().expect("time is valid"), - ), - presence: PresenceState::Offline, - status_msg: None, - }, - sender: user_id.to_owned(), - }) - .expect("PresenceEvent can be serialized"), - )?; - } - - self.userid_lastpresenceupdate.insert( - user_id.as_bytes(), - &utils::millis_since_unix_epoch().to_be_bytes(), - )?; - } - - Ok(()) - }*/ - - /// Returns the most recent presence updates that happened after the event with id `since`. - #[tracing::instrument(skip(self, since, _rooms, _globals))] - pub fn presence_since( - &self, - room_id: &RoomId, - since: u64, - ) -> Result, PresenceEvent>> { - self.db.presence_since(room_id, since) - } }