From d891bbb5dc0640794feceece27137563e532d9da Mon Sep 17 00:00:00 2001 From: timokoesters Date: Tue, 28 Jul 2020 15:58:50 +0200 Subject: [PATCH] improve: presence --- Cargo.toml | 1 + src/client_server.rs | 189 ++++++++++++++++++++--------------- src/database.rs | 10 +- src/database/global_edus.rs | 62 ------------ src/database/rooms/edus.rs | 191 +++++++++++++++++++++++++++++++++++- 5 files changed, 304 insertions(+), 149 deletions(-) delete mode 100644 src/database/global_edus.rs diff --git a/Cargo.toml b/Cargo.toml index de8fb47a..c2607a7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_p tokio = "0.2.22" # Used for long polling ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"], rev = "d5d2d1d893fa12d27960e4c58d6c09b215d06e95" } # Used for matrix spec type definitions and helpers +#ruma = { path = "../ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } sled = "0.32.0" # Used for storing data permanently log = "0.4.8" # Used for emitting log entries http = "0.2.1" # Used for rocket<->ruma conversions diff --git a/src/client_server.rs b/src/client_server.rs index 6cc8e3d6..de76eef7 100644 --- a/src/client_server.rs +++ b/src/client_server.rs @@ -1,5 +1,5 @@ use std::{ - collections::BTreeMap, + collections::{hash_map, BTreeMap, HashMap}, convert::{TryFrom, TryInto}, time::{Duration, SystemTime}, }; @@ -645,7 +645,7 @@ pub fn set_displayname_route( db.users .set_displayname(&sender_id, body.displayname.clone())?; - // Send a new membership event into all joined rooms + // Send a new membership event and presence update into all joined rooms for room_id in db.rooms.rooms_joined(&sender_id) { let room_id = room_id?; db.rooms.append_pdu( @@ -675,27 +675,29 @@ pub fn set_displayname_route( None, &db.globals, )?; - } - // Presence update - db.global_edus.update_presence( - ruma::events::presence::PresenceEvent { - content: ruma::events::presence::PresenceEventContent { - avatar_url: db.users.avatar_url(&sender_id)?, - currently_active: None, - displayname: db.users.displayname(&sender_id)?, - last_active_ago: Some( - utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - ), - presence: ruma::presence::PresenceState::Online, - status_msg: None, + // Presence update + db.rooms.edus.update_presence( + &sender_id, + &room_id, + ruma::events::presence::PresenceEvent { + content: ruma::events::presence::PresenceEventContent { + avatar_url: db.users.avatar_url(&sender_id)?, + currently_active: None, + displayname: db.users.displayname(&sender_id)?, + last_active_ago: Some( + utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"), + ), + presence: ruma::presence::PresenceState::Online, + status_msg: None, + }, + sender: sender_id.clone(), }, - sender: sender_id.clone(), - }, - &db.globals, - )?; + &db.globals, + )?; + } Ok(set_display_name::Response.into()) } @@ -739,7 +741,7 @@ pub fn set_avatar_url_route( db.users .set_avatar_url(&sender_id, body.avatar_url.clone())?; - // Send a new membership event into all joined rooms + // Send a new membership event and presence update into all joined rooms for room_id in db.rooms.rooms_joined(&sender_id) { let room_id = room_id?; db.rooms.append_pdu( @@ -769,27 +771,29 @@ pub fn set_avatar_url_route( None, &db.globals, )?; - } - // Presence update - db.global_edus.update_presence( - ruma::events::presence::PresenceEvent { - content: ruma::events::presence::PresenceEventContent { - avatar_url: db.users.avatar_url(&sender_id)?, - currently_active: None, - displayname: db.users.displayname(&sender_id)?, - last_active_ago: Some( - utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - ), - presence: ruma::presence::PresenceState::Online, - status_msg: None, + // Presence update + db.rooms.edus.update_presence( + &sender_id, + &room_id, + ruma::events::presence::PresenceEvent { + content: ruma::events::presence::PresenceEventContent { + avatar_url: db.users.avatar_url(&sender_id)?, + currently_active: None, + displayname: db.users.displayname(&sender_id)?, + last_active_ago: Some( + utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"), + ), + presence: ruma::presence::PresenceState::Online, + status_msg: None, + }, + sender: sender_id.clone(), }, - sender: sender_id.clone(), - }, - &db.globals, - )?; + &db.globals, + )?; + } Ok(set_avatar_url::Response.into()) } @@ -844,24 +848,30 @@ pub fn set_presence_route( ) -> ConduitResult { let sender_id = body.sender_id.as_ref().expect("user is authenticated"); - db.global_edus.update_presence( - ruma::events::presence::PresenceEvent { - content: ruma::events::presence::PresenceEventContent { - avatar_url: db.users.avatar_url(&sender_id)?, - currently_active: None, - displayname: db.users.displayname(&sender_id)?, - last_active_ago: Some( - utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"), - ), - presence: body.presence, - status_msg: body.status_msg.clone(), + for room_id in db.rooms.rooms_joined(&sender_id) { + let room_id = room_id?; + + db.rooms.edus.update_presence( + &sender_id, + &room_id, + ruma::events::presence::PresenceEvent { + content: ruma::events::presence::PresenceEventContent { + avatar_url: db.users.avatar_url(&sender_id)?, + currently_active: None, + displayname: db.users.displayname(&sender_id)?, + last_active_ago: Some( + utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"), + ), + presence: body.presence, + status_msg: body.status_msg.clone(), + }, + sender: sender_id.clone(), }, - sender: sender_id.clone(), - }, - &db.globals, - )?; + &db.globals, + )?; + } Ok(set_presence::Response.into()) } @@ -2492,6 +2502,9 @@ pub async fn sync_events_route( let sender_id = body.sender_id.as_ref().expect("user is authenticated"); let device_id = body.device_id.as_ref().expect("user is authenticated"); + // TODO: match body.set_presence { + db.rooms.edus.ping_presence(&sender_id)?; + // Setup watchers, so if there's no response, we can wait for them let watcher = db.watch(sender_id, device_id); @@ -2504,6 +2517,8 @@ pub async fn sync_events_route( .and_then(|string| string.parse().ok()) .unwrap_or(0); + let mut presence_updates = HashMap::new(); + for room_id in db.rooms.rooms_joined(&sender_id) { let room_id = room_id?; @@ -2735,6 +2750,40 @@ pub async fn sync_events_route( if !joined_room.is_empty() { joined_rooms.insert(room_id.clone(), joined_room); } + + // Take presence updates from this room + for (user_id, presence) in + db.rooms + .edus + .presence_since(&room_id, since, &db.rooms, &db.globals)? + { + match presence_updates.entry(user_id) { + hash_map::Entry::Vacant(v) => { + v.insert(presence); + } + hash_map::Entry::Occupied(mut o) => { + let p = o.get_mut(); + + // Update existing presence event with more info + p.content.presence = presence.content.presence; + if let Some(status_msg) = presence.content.status_msg { + p.content.status_msg = Some(status_msg); + } + if let Some(last_active_ago) = presence.content.last_active_ago { + p.content.last_active_ago = Some(last_active_ago); + } + if let Some(displayname) = presence.content.displayname { + p.content.displayname = Some(displayname); + } + if let Some(avatar_url) = presence.content.avatar_url { + p.content.avatar_url = Some(avatar_url); + } + if let Some(currently_active) = presence.content.currently_active { + p.content.currently_active = Some(currently_active); + } + } + } + } } let mut left_rooms = BTreeMap::new(); @@ -2818,23 +2867,9 @@ pub async fn sync_events_route( invite: invited_rooms, }, presence: sync_events::Presence { - events: db - .global_edus - .presence_since(since)? - .map(|edu| { - let mut edu = edu? - .deserialize() - .map_err(|_| Error::bad_database("EDU in database is invalid."))?; - if let Some(timestamp) = edu.content.last_active_ago { - let mut last_active_ago = utils::millis_since_unix_epoch() - .try_into() - .expect("time is valid"); - last_active_ago -= timestamp; - edu.content.last_active_ago = Some(last_active_ago); - } - Ok::<_, Error>(edu.into()) - }) - .filter_map(|edu| edu.ok()) // Filter out buggy events + events: presence_updates + .into_iter() + .map(|(_, v)| Raw::from(v)) .collect(), }, account_data: sync_events::AccountData { @@ -2878,8 +2913,8 @@ pub async fn sync_events_route( // Hang a few seconds so requests are not spammed // Stop hanging if new info arrives let mut duration = body.timeout.unwrap_or(Duration::default()); - if duration.as_secs() > 10 { - duration = Duration::from_secs(10); + if duration.as_secs() > 30 { + duration = Duration::from_secs(30); } let mut delay = tokio::time::delay_for(duration); tokio::select! { diff --git a/src/database.rs b/src/database.rs index 250de23c..a8376388 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,5 +1,4 @@ pub(self) mod account_data; -pub(self) mod global_edus; pub(self) mod globals; pub(self) mod key_backups; pub(self) mod media; @@ -22,7 +21,6 @@ pub struct Database { pub uiaa: uiaa::Uiaa, pub rooms: rooms::Rooms, pub account_data: account_data::AccountData, - pub global_edus: global_edus::GlobalEdus, pub media: media::Media, pub key_backups: key_backups::KeyBackups, pub _db: sled::Db, @@ -93,6 +91,8 @@ impl Database { roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest")?, // Read receipts roomactiveid_userid: db.open_tree("roomactiveid_userid")?, // Typing notifs roomid_lastroomactiveupdate: db.open_tree("roomid_lastroomactiveupdate")?, + presenceid_presence: db.open_tree("presenceid_presence")?, + userid_lastpresenceupdate: db.open_tree("userid_lastpresenceupdate")?, }, pduid_pdu: db.open_tree("pduid_pdu")?, eventid_pduid: db.open_tree("eventid_pduid")?, @@ -112,9 +112,6 @@ impl Database { account_data: account_data::AccountData { roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, }, - global_edus: global_edus::GlobalEdus { - presenceid_presence: db.open_tree("presenceid_presence")?, // Presence - }, media: media::Media { mediaid_file: db.open_tree("mediaid_file")?, }, @@ -146,9 +143,6 @@ impl Database { .watch_prefix(&userdeviceid_prefix), ); - // TODO: only send for user they share a room with - futures.push(self.global_edus.presenceid_presence.watch_prefix(b"")); - futures.push(self.rooms.userroomid_joined.watch_prefix(&userid_prefix)); futures.push(self.rooms.userroomid_invited.watch_prefix(&userid_prefix)); futures.push(self.rooms.userroomid_left.watch_prefix(&userid_prefix)); diff --git a/src/database/global_edus.rs b/src/database/global_edus.rs deleted file mode 100644 index 94f2de82..00000000 --- a/src/database/global_edus.rs +++ /dev/null @@ -1,62 +0,0 @@ -use crate::{Error, Result}; -use ruma::Raw; - -pub struct GlobalEdus { - //pub globalallid_globalall: sled::Tree, // ToDevice, GlobalAllId = UserId + Count - pub(super) presenceid_presence: sled::Tree, // Presence, PresenceId = Count + UserId -} - -impl GlobalEdus { - /// Adds a global event which will be saved until a new event replaces it (e.g. presence updates). - pub fn update_presence( - &self, - presence: ruma::events::presence::PresenceEvent, - globals: &super::globals::Globals, - ) -> Result<()> { - // Remove old entry - if let Some(old) = self - .presenceid_presence - .iter() - .keys() - .rev() - .filter_map(|r| r.ok()) - .find(|key| { - key.rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element") - == presence.sender.to_string().as_bytes() - }) - { - // This is the old global_latest - self.presenceid_presence.remove(old)?; - } - - let mut presence_id = globals.next_count()?.to_be_bytes().to_vec(); - presence_id.push(0xff); - presence_id.extend_from_slice(&presence.sender.to_string().as_bytes()); - - self.presenceid_presence.insert( - presence_id, - &*serde_json::to_string(&presence).expect("PresenceEvent can be serialized"), - )?; - - Ok(()) - } - - /// Returns an iterator over the most recent presence updates that happened after the event with id `since`. - pub fn presence_since( - &self, - since: u64, - ) -> Result>>> { - let first_possible_edu = (since + 1).to_be_bytes().to_vec(); // +1 so we don't send the event at since - - Ok(self - .presenceid_presence - .range(&*first_possible_edu..) - .filter_map(|r| r.ok()) - .map(|(_, v)| { - Ok(serde_json::from_slice(&v) - .map_err(|_| Error::bad_database("Invalid presence event in db."))?) - })) - } -} diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index 22d01666..62df0ccd 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -1,15 +1,25 @@ use crate::{utils, Error, Result}; +use js_int::UInt; use ruma::{ - events::{AnyEvent as EduEvent, SyncEphemeralRoomEvent}, + events::{ + presence::{PresenceEvent, PresenceEventContent}, + AnyEvent as EduEvent, SyncEphemeralRoomEvent, + }, + presence::PresenceState, Raw, RoomId, UserId, }; -use std::convert::TryFrom; +use std::{ + collections::HashMap, + convert::{TryFrom, TryInto}, +}; pub struct RoomEdus { pub(in super::super) roomuserid_lastread: sled::Tree, // RoomUserId = Room + User pub(in super::super) roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Count + UserId pub(in super::super) roomactiveid_userid: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count pub(in super::super) roomid_lastroomactiveupdate: sled::Tree, // LastRoomActiveUpdate = Count + pub(in super::super) presenceid_presence: sled::Tree, // PresenceId = RoomId + Count + UserId + pub(in super::super) userid_lastpresenceupdate: sled::Tree, // LastPresenceUpdate = Count } impl RoomEdus { @@ -263,4 +273,181 @@ impl RoomEdus { })?)) }) } + + /// Adds a presence event which will be saved until a new event replaces it. + /// + /// Note: This method takes a RoomId because presence updates are always bound to rooms to + /// make sure users outside these rooms can't see them. + pub fn update_presence( + &self, + user_id: &UserId, + room_id: &RoomId, + presence: ruma::events::presence::PresenceEvent, + globals: &super::super::globals::Globals, + ) -> Result<()> { + // TODO: Remove old entry? Or maybe just wipe completely from time to time? + + let count = globals.next_count()?.to_be_bytes(); + + let mut presence_id = room_id.to_string().as_bytes().to_vec(); + presence_id.push(0xff); + presence_id.extend_from_slice(&count); + presence_id.push(0xff); + presence_id.extend_from_slice(&presence.sender.to_string().as_bytes()); + + self.presenceid_presence.insert( + presence_id, + &*serde_json::to_string(&presence).expect("PresenceEvent can be serialized"), + )?; + + self.userid_lastpresenceupdate.insert( + &user_id.to_string().as_bytes(), + &utils::millis_since_unix_epoch().to_be_bytes(), + )?; + + Ok(()) + } + + /// Resets the presence timeout, so the user will stay in their current presence state. + pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { + self.userid_lastpresenceupdate.insert( + &user_id.to_string().as_bytes(), + &utils::millis_since_unix_epoch().to_be_bytes(), + )?; + + Ok(()) + } + + /// Returns the timestamp of the last presence update of this user in millis since the unix epoch. + pub fn last_presence_update(&self, user_id: &UserId) -> Result> { + self.userid_lastpresenceupdate + .get(&user_id.to_string().as_bytes())? + .map(|bytes| { + utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") + }) + }) + .transpose() + } + + /// Sets all users to offline who have been quiet for too long. + pub fn presence_maintain( + &self, + rooms: &super::Rooms, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let current_timestamp = utils::millis_since_unix_epoch(); + + for (user_id_bytes, last_timestamp) in self + .userid_lastpresenceupdate + .iter() + .filter_map(|r| r.ok()) + .filter_map(|(k, bytes)| { + Some(( + k, + utils::u64_from_bytes(&bytes) + .map_err(|_| { + Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") + }) + .ok()?, + )) + }) + .take_while(|(_, timestamp)| current_timestamp - timestamp > 5 * 60_000) // 5 Minutes + { + self.userid_lastpresenceupdate.remove(&user_id_bytes)?; + + // Send new presence events to set the user offline + let count = globals.next_count()?.to_be_bytes(); + let user_id = utils::string_from_bytes(&user_id_bytes) + .map_err(|_| { + Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") + })? + .try_into() + .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; + for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { + let mut presence_id = room_id.to_string().as_bytes().to_vec(); + presence_id.push(0xff); + presence_id.extend_from_slice(&count); + presence_id.push(0xff); + presence_id.extend_from_slice(&user_id_bytes); + + self.presenceid_presence.insert( + presence_id, + &*serde_json::to_string(&PresenceEvent { + content: PresenceEventContent { + avatar_url: None, + currently_active: None, + displayname: None, + last_active_ago: Some( + last_timestamp.try_into().expect("time is valid"), + ), + presence: PresenceState::Offline, + status_msg: None, + }, + sender: user_id.clone(), + }) + .expect("PresenceEvent can be serialized"), + )?; + } + } + + Ok(()) + } + + /// Returns an iterator over the most recent presence updates that happened after the event with id `since`. + pub fn presence_since( + &self, + room_id: &RoomId, + since: u64, + rooms: &super::Rooms, + globals: &super::super::globals::Globals, + ) -> Result> { + self.presence_maintain(rooms, globals)?; + + let mut prefix = room_id.to_string().as_bytes().to_vec(); + prefix.push(0xff); + + let mut first_possible_edu = prefix.clone(); + first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since + let mut hashmap = HashMap::new(); + + for (key, value) in self + .presenceid_presence + .range(&*first_possible_edu..) + .filter_map(|r| r.ok()) + .take_while(|(key, _)| key.starts_with(&prefix)) + { + let user_id = UserId::try_from( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?, + ) + .map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?; + + let mut presence = serde_json::from_slice::(&value) + .map_err(|_| Error::bad_database("Invalid presence event in db."))?; + + let current_timestamp: UInt = utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"); + + if presence.content.presence == PresenceState::Online { + // Don't set last_active_ago when the user is online + presence.content.last_active_ago = None; + } else { + // Convert from timestamp to duration + presence.content.last_active_ago = presence + .content + .last_active_ago + .map(|timestamp| current_timestamp - timestamp); + } + + hashmap.insert(user_id, presence); + } + + Ok(hashmap) + } }