diff --git a/src/client_server.rs b/src/client_server.rs index 9dfe2d71..6d99a829 100644 --- a/src/client_server.rs +++ b/src/client_server.rs @@ -4,6 +4,7 @@ use std::{ time::{Duration, SystemTime}, }; +use crate::{utils, Database, MatrixResult, Ruma}; use log::{debug, warn}; use rocket::{delete, get, options, post, put, State}; use ruma_client_api::{ @@ -55,7 +56,6 @@ use ruma_events::{ }; use ruma_identifiers::{DeviceId, RoomAliasId, RoomId, RoomVersionId, UserId}; use serde_json::{json, value::RawValue}; -use crate::{utils, Database, MatrixResult, Ruma}; const GUEST_NAME_LENGTH: usize = 10; const DEVICE_ID_LENGTH: usize = 10; @@ -921,18 +921,12 @@ pub fn create_typing_event_route( _user_id: String, ) -> MatrixResult { let user_id = body.user_id.as_ref().expect("user is authenticated"); - let edu = EduEvent::Typing(ruma_events::typing::TypingEvent { - content: ruma_events::typing::TypingEventContent { - user_ids: vec![user_id.clone()], - }, - room_id: None, // None because it can be inferred - }); if body.typing { db.rooms .edus .roomactive_add( - edu, + &user_id, &body.room_id, body.timeout.map(|d| d.as_millis() as u64).unwrap_or(30000) + utils::millis_since_unix_epoch().try_into().unwrap_or(0), @@ -940,7 +934,10 @@ pub fn create_typing_event_route( ) .unwrap(); } else { - db.rooms.edus.roomactive_remove(edu, &body.room_id).unwrap(); + db.rooms + .edus + .roomactive_remove(&user_id, &body.room_id, &db.globals) + .unwrap(); } MatrixResult(Ok(create_typing_event::Response)) @@ -2083,30 +2080,23 @@ pub fn sync_route( let mut edus = db .rooms .edus - .roomactives_all(&room_id) + .roomlatests_since(&room_id, since) + .unwrap() .map(|r| r.unwrap()) .collect::>(); - if edus.is_empty() { - edus.push( - EduEvent::Typing(ruma_events::typing::TypingEvent { - content: ruma_events::typing::TypingEventContent { - user_ids: Vec::new(), - }, - room_id: None, // None because it can be inferred - }) - .into(), - ); + if db + .rooms + .edus + .last_roomactive_update(&room_id, &db.globals) + .unwrap() + > since + { + edus.push(serde_json::from_str(&serde_json::to_string( + &EduEvent::Typing(db.rooms.edus.roomactives_all(&room_id).unwrap()), + ).unwrap()).unwrap()); } - edus.extend( - db.rooms - .edus - .roomlatests_since(&room_id, since) - .unwrap() - .map(|r| r.unwrap()), - ); - joined_rooms.insert( room_id.clone().try_into().unwrap(), sync_events::JoinedRoom { @@ -2173,7 +2163,17 @@ pub fn sync_route( .map(|r| r.unwrap()) .collect::>(); - edus.extend(db.rooms.edus.roomactives_all(&room_id).map(|r| r.unwrap())); + if db + .rooms + .edus + .last_roomactive_update(&room_id, &db.globals) + .unwrap() + > since + { + edus.push(serde_json::from_str(&serde_json::to_string( + &EduEvent::Typing(db.rooms.edus.roomactives_all(&room_id).unwrap()), + ).unwrap()).unwrap()); + } left_rooms.insert( room_id.clone().try_into().unwrap(), @@ -2324,7 +2324,6 @@ pub fn get_message_events_route( #[get("/_matrix/client/r0/voip/turnServer")] pub fn turn_server_route() -> MatrixResult { - warn!("TODO: turn_server_route"); MatrixResult(Err(Error { kind: ErrorKind::NotFound, message: "There is no turn server yet.".to_owned(), diff --git a/src/database.rs b/src/database.rs index d4927a70..7be0dc70 100644 --- a/src/database.rs +++ b/src/database.rs @@ -70,7 +70,10 @@ impl Database { edus: rooms::RoomEdus { roomuserid_lastread: db.open_tree("roomuserid_lastread").unwrap(), // "Private" read receipt roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest").unwrap(), // Read receipts - roomactiveid_roomactive: db.open_tree("roomactiveid_roomactive").unwrap(), // Typing notifs + roomactiveid_userid: db.open_tree("roomactiveid_userid").unwrap(), // Typing notifs + roomid_lastroomactiveupdate: db + .open_tree("roomid_lastroomactiveupdate") + .unwrap(), }, pduid_pdu: db.open_tree("pduid_pdu").unwrap(), eventid_pduid: db.open_tree("eventid_pduid").unwrap(), diff --git a/src/database/rooms.rs b/src/database/rooms.rs index ea124bee..a9a93067 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -52,31 +52,6 @@ impl Rooms { .is_some()) } - // TODO: Remove and replace with public room dir - /// Returns a vector over all rooms. - pub fn all_rooms(&self) -> Vec { - let mut room_ids = self - .roomid_pduleaves - .iter() - .keys() - .map(|key| { - RoomId::try_from( - &*utils::string_from_bytes( - &key.unwrap() - .iter() - .copied() - .take_while(|&x| x != 0xff) // until delimiter - .collect::>(), - ) - .unwrap(), - ) - .unwrap() - }) - .collect::>(); - room_ids.dedup(); - room_ids - } - /// Returns the full room state. pub fn room_state(&self, room_id: &RoomId) -> Result> { let mut hashmap = HashMap::new(); diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index a2ade552..0519b437 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -1,11 +1,13 @@ -use crate::{utils, Result}; +use crate::{utils, Error, Result}; use ruma_events::{collections::only::Event as EduEvent, EventJson}; use ruma_identifiers::{RoomId, UserId}; +use std::convert::TryFrom; pub struct RoomEdus { pub(in super::super) roomuserid_lastread: sled::Tree, // RoomUserId = Room + User pub(in super::super) roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Count + UserId - pub(in super::super) roomactiveid_roomactive: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count + pub(in super::super) roomactiveid_userid: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count + pub(in super::super) roomid_lastroomactiveupdate: sled::Tree, // LastRoomActiveUpdate = Count } impl RoomEdus { @@ -79,10 +81,11 @@ impl RoomEdus { .map(|(_, v)| Ok(serde_json::from_slice(&v)?))) } - /// Adds an event that will be saved until the `timeout` timestamp (e.g. typing notifications). + /// Sets a user as typing until the timeout timestamp is reached or roomactive_remove is + /// called. pub fn roomactive_add( &self, - event: EduEvent, + user_id: &UserId, room_id: &RoomId, timeout: u64, globals: &super::super::globals::Globals, @@ -90,71 +93,134 @@ impl RoomEdus { let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); - // Cleanup all outdated edus before inserting a new one + let count = globals.next_count()?.to_be_bytes(); + + let mut room_active_id = prefix; + room_active_id.extend_from_slice(&timeout.to_be_bytes()); + room_active_id.push(0xff); + room_active_id.extend_from_slice(&count); + + self.roomactiveid_userid + .insert(&room_active_id, &*user_id.to_string().as_bytes())?; + + self.roomid_lastroomactiveupdate + .insert(&room_id.to_string().as_bytes(), &count)?; + + Ok(()) + } + + /// Removes a user from typing before the timeout is reached. + pub fn roomactive_remove( + &self, + user_id: &UserId, + room_id: &RoomId, + globals: &super::super::globals::Globals, + ) -> Result<()> { + let mut prefix = room_id.to_string().as_bytes().to_vec(); + prefix.push(0xff); + + let user_id = user_id.to_string(); + + let mut found_outdated = false; + + // Maybe there are multiple ones from calling roomactive_add multiple times for outdated_edu in self - .roomactiveid_roomactive + .roomactiveid_userid .scan_prefix(&prefix) - .keys() .filter_map(|r| r.ok()) - .take_while(|k| { - utils::u64_from_bytes( - k.split(|&c| c == 0xff) - .nth(1) - .expect("roomactive has valid timestamp and delimiters"), - ) < utils::millis_since_unix_epoch() - }) + .filter(|(_, v)| v == user_id.as_bytes()) { - // This is an outdated edu (time > timestamp) - self.roomlatestid_roomlatest.remove(outdated_edu)?; + self.roomactiveid_userid.remove(outdated_edu.0)?; + found_outdated = true; } - let mut room_active_id = prefix; - room_active_id.extend_from_slice(&timeout.to_be_bytes()); - room_active_id.push(0xff); - room_active_id.extend_from_slice(&globals.next_count()?.to_be_bytes()); - - self.roomactiveid_roomactive - .insert(room_active_id, &*serde_json::to_string(&event)?)?; + if found_outdated { + self.roomid_lastroomactiveupdate.insert( + &room_id.to_string().as_bytes(), + &globals.next_count()?.to_be_bytes(), + )?; + } Ok(()) } - /// Removes an active event manually (before the timeout is reached). - pub fn roomactive_remove(&self, event: EduEvent, room_id: &RoomId) -> Result<()> { + /// Makes sure that typing events with old timestamps get removed. + fn roomactives_maintain( + &self, + room_id: &RoomId, + globals: &super::super::globals::Globals, + ) -> Result<()> { let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); - let json = serde_json::to_string(&event)?; + let current_timestamp = utils::millis_since_unix_epoch(); - // Remove outdated entries + let mut found_outdated = false; + + // Find all outdated edus before inserting a new one for outdated_edu in self - .roomactiveid_roomactive + .roomactiveid_userid .scan_prefix(&prefix) + .keys() .filter_map(|r| r.ok()) - .filter(|(_, v)| v == json.as_bytes()) + .take_while(|k| { + utils::u64_from_bytes( + k.split(|&c| c == 0xff) + .nth(1) + .expect("roomactive has valid timestamp and delimiters"), + ) < current_timestamp + }) { - self.roomactiveid_roomactive.remove(outdated_edu.0)?; + // This is an outdated edu (time > timestamp) + self.roomlatestid_roomlatest.remove(outdated_edu)?; + found_outdated = true; + } + + if found_outdated { + self.roomid_lastroomactiveupdate.insert( + &room_id.to_string().as_bytes(), + &globals.next_count()?.to_be_bytes(), + )?; } Ok(()) } /// Returns an iterator over all active events (e.g. typing notifications). - pub fn roomactives_all( + pub fn last_roomactive_update( &self, room_id: &RoomId, - ) -> impl Iterator>> { + globals: &super::super::globals::Globals, + ) -> Result { + self.roomactives_maintain(room_id, globals)?; + + Ok(self + .roomid_lastroomactiveupdate + .get(&room_id.to_string().as_bytes())? + .map(|bytes| utils::u64_from_bytes(&bytes)) + .unwrap_or(0)) + } + + /// Returns an iterator over all active events (e.g. typing notifications). + pub fn roomactives_all(&self, room_id: &RoomId) -> Result { let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); - let mut first_active_edu = prefix.clone(); - first_active_edu.extend_from_slice(&utils::millis_since_unix_epoch().to_be_bytes()); + let mut user_ids = Vec::new(); - self.roomactiveid_roomactive - .range(first_active_edu..) - .filter_map(|r| r.ok()) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(|(_, v)| Ok(serde_json::from_slice(&v)?)) + for user_id in self + .roomactiveid_userid + .scan_prefix(prefix) + .values() + .map(|user_id| Ok::<_, Error>(UserId::try_from(utils::string_from_bytes(&user_id?)?)?)) + { + user_ids.push(user_id?); + } + + Ok(ruma_events::typing::TypingEvent { + content: ruma_events::typing::TypingEventContent { user_ids }, + room_id: None, // Can be inferred + }) } /// Sets a private read marker at `count`.