diff --git a/src/client_server/read_marker.rs b/src/client_server/read_marker.rs index ff72765f..1b8bd8e4 100644 --- a/src/client_server/read_marker.rs +++ b/src/client_server/read_marker.rs @@ -34,13 +34,14 @@ pub fn set_read_marker_route( )?; if let Some(event) = &body.read_receipt { - db.rooms.edus.room_read_set( + db.rooms.edus.private_read_set( &body.room_id, &sender_id, db.rooms.get_pdu_count(event)?.ok_or(Error::BadRequest( ErrorKind::InvalidParam, "Event does not exist.", ))?, + &db.globals, )?; let mut user_receipts = BTreeMap::new(); @@ -58,7 +59,7 @@ pub fn set_read_marker_route( }, ); - db.rooms.edus.roomlatest_update( + db.rooms.edus.readreceipt_update( &sender_id, &body.room_id, AnyEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt( diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 2307f028..8f373544 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -81,7 +81,12 @@ pub async fn sync_events_route( .rev() .collect::>(); - let send_notification_counts = !timeline_pdus.is_empty(); + let send_notification_counts = !timeline_pdus.is_empty() + || db + .rooms + .edus + .last_privateread_update(&sender_id, &room_id)? + > since; // They /sync response doesn't always return all messages, so we say the output is // limited unless there are events in non_timeline_pdus @@ -242,7 +247,7 @@ pub async fn sync_events_route( }; let notification_count = if send_notification_counts { - if let Some(last_read) = db.rooms.edus.room_read_get(&room_id, &sender_id)? { + if let Some(last_read) = db.rooms.edus.private_read_get(&room_id, &sender_id)? { Some( (db.rooms .pdus_since(&sender_id, &room_id, last_read)? @@ -280,20 +285,15 @@ pub async fn sync_events_route( let mut edus = db .rooms .edus - .roomlatests_since(&room_id, since)? + .readreceipts_since(&room_id, since)? .filter_map(|r| r.ok()) // Filter out buggy events .collect::>(); - if db - .rooms - .edus - .last_roomactive_update(&room_id, &db.globals)? - > since - { + if db.rooms.edus.last_typing_update(&room_id, &db.globals)? > since { edus.push( serde_json::from_str( &serde_json::to_string(&AnySyncEphemeralRoomEvent::Typing( - db.rooms.edus.roomactives_all(&room_id)?, + db.rooms.edus.typings_all(&room_id)?, )) .expect("event is valid, we just created it"), ) diff --git a/src/client_server/typing.rs b/src/client_server/typing.rs index 7eba13e1..89e1e4ab 100644 --- a/src/client_server/typing.rs +++ b/src/client_server/typing.rs @@ -16,7 +16,7 @@ pub fn create_typing_event_route( let sender_id = body.sender_id.as_ref().expect("user is authenticated"); if body.typing { - db.rooms.edus.roomactive_add( + db.rooms.edus.typing_add( &sender_id, &body.room_id, body.timeout.map(|d| d.as_millis() as u64).unwrap_or(30000) @@ -26,7 +26,7 @@ pub fn create_typing_event_route( } else { db.rooms .edus - .roomactive_remove(&sender_id, &body.room_id, &db.globals)?; + .typing_remove(&sender_id, &body.room_id, &db.globals)?; } Ok(create_typing_event::Response.into()) diff --git a/src/database.rs b/src/database.rs index 7bbb6dd7..41781b95 100644 --- a/src/database.rs +++ b/src/database.rs @@ -88,10 +88,11 @@ impl Database { }, rooms: rooms::Rooms { edus: rooms::RoomEdus { - roomuserid_lastread: db.open_tree("roomuserid_lastread")?, // "Private" read receipt - roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest")?, // Read receipts - roomactiveid_userid: db.open_tree("roomactiveid_userid")?, // Typing notifs - roomid_lastroomactiveupdate: db.open_tree("roomid_lastroomactiveupdate")?, + readreceiptid_readreceipt: db.open_tree("readreceiptid_readreceipt")?, + roomuserid_privateread: db.open_tree("roomuserid_privateread")?, // "Private" read receipt + roomuserid_lastprivatereadupdate: db.open_tree("roomid_lastprivatereadupdate")?, + typingid_userid: db.open_tree("typingid_userid")?, + roomid_lasttypingupdate: db.open_tree("roomid_lasttypingupdate")?, presenceid_presence: db.open_tree("presenceid_presence")?, userid_lastpresenceupdate: db.open_tree("userid_lastpresenceupdate")?, }, @@ -163,14 +164,14 @@ impl Database { futures.push( self.rooms .edus - .roomid_lastroomactiveupdate + .roomid_lasttypingupdate .watch_prefix(&roomid_bytes), ); futures.push( self.rooms .edus - .roomlatestid_roomlatest + .readreceiptid_readreceipt .watch_prefix(&roomid_prefix), ); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 3c1febd4..bb14c8a5 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -621,7 +621,7 @@ impl Rooms { } _ => {} } - self.edus.room_read_set(&room_id, &sender, index)?; + self.edus.private_read_set(&room_id, &sender, index, &globals)?; Ok(pdu.event_id) } diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index fff30c22..fbd3edb6 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -14,17 +14,18 @@ use std::{ }; pub struct RoomEdus { - pub(in super::super) roomuserid_lastread: sled::Tree, // RoomUserId = Room + User - pub(in super::super) roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Count + UserId - pub(in super::super) roomactiveid_userid: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count - pub(in super::super) roomid_lastroomactiveupdate: sled::Tree, // LastRoomActiveUpdate = Count + pub(in super::super) readreceiptid_readreceipt: sled::Tree, // ReadReceiptId = RoomId + Count + UserId + pub(in super::super) roomuserid_privateread: sled::Tree, // RoomUserId = Room + User, PrivateRead = Count + pub(in super::super) roomuserid_lastprivatereadupdate: sled::Tree, // LastPrivateReadUpdate = Count + pub(in super::super) typingid_userid: sled::Tree, // TypingId = RoomId + TimeoutTime + Count + pub(in super::super) roomid_lasttypingupdate: sled::Tree, // LastRoomTypingUpdate = Count pub(in super::super) presenceid_presence: sled::Tree, // PresenceId = RoomId + Count + UserId pub(in super::super) userid_lastpresenceupdate: sled::Tree, // LastPresenceUpdate = Count } impl RoomEdus { /// Adds an event which will be saved until a new event replaces it (e.g. read receipt). - pub fn roomlatest_update( + pub fn readreceipt_update( &self, user_id: &UserId, room_id: &RoomId, @@ -36,7 +37,7 @@ impl RoomEdus { // Remove old entry if let Some(old) = self - .roomlatestid_roomlatest + .readreceiptid_readreceipt .scan_prefix(&prefix) .keys() .rev() @@ -50,7 +51,7 @@ impl RoomEdus { }) { // This is the old room_latest - self.roomlatestid_roomlatest.remove(old)?; + self.readreceiptid_readreceipt.remove(old)?; } let mut room_latest_id = prefix; @@ -58,7 +59,7 @@ impl RoomEdus { room_latest_id.push(0xff); room_latest_id.extend_from_slice(&user_id.to_string().as_bytes()); - self.roomlatestid_roomlatest.insert( + self.readreceiptid_readreceipt.insert( room_latest_id, &*serde_json::to_string(&event).expect("EduEvent::to_string always works"), )?; @@ -67,7 +68,7 @@ impl RoomEdus { } /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`. - pub fn roomlatests_since( + pub fn readreceipts_since( &self, room_id: &RoomId, since: u64, @@ -79,7 +80,7 @@ impl RoomEdus { first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since Ok(self - .roomlatestid_roomlatest + .readreceiptid_readreceipt .range(&*first_possible_edu..) .filter_map(|r| r.ok()) .take_while(move |(k, _)| k.starts_with(&prefix)) @@ -90,9 +91,54 @@ impl RoomEdus { })) } - /// Sets a user as typing until the timeout timestamp is reached or roomactive_remove is + /// Sets a private read marker at `count`. + pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64, globals: &super::super::globals::Globals) -> Result<()> { + let mut key = room_id.to_string().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&user_id.to_string().as_bytes()); + + self.roomuserid_privateread + .insert(&key, &count.to_be_bytes())?; + + self.roomuserid_lastprivatereadupdate + .insert(&key, &globals.next_count()?.to_be_bytes())?; + + Ok(()) + } + + /// Returns the private read marker. + pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { + let mut key = room_id.to_string().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&user_id.to_string().as_bytes()); + + self.roomuserid_privateread.get(key)?.map_or(Ok(None), |v| { + Ok(Some(utils::u64_from_bytes(&v).map_err(|_| { + Error::bad_database("Invalid private read marker bytes") + })?)) + }) + } + + /// Returns the count of the last typing update in this room. + pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut key = room_id.to_string().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&user_id.to_string().as_bytes()); + + Ok(self + .roomuserid_lastprivatereadupdate + .get(&key)? + .map_or(Ok::<_, Error>(None), |bytes| { + Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.") + })?)) + })? + .unwrap_or(0)) + } + + /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is /// called. - pub fn roomactive_add( + pub fn typing_add( &self, user_id: &UserId, room_id: &RoomId, @@ -104,22 +150,22 @@ impl RoomEdus { let count = globals.next_count()?.to_be_bytes(); - let mut room_active_id = prefix; - room_active_id.extend_from_slice(&timeout.to_be_bytes()); - room_active_id.push(0xff); - room_active_id.extend_from_slice(&count); + let mut room_typing_id = prefix; + room_typing_id.extend_from_slice(&timeout.to_be_bytes()); + room_typing_id.push(0xff); + room_typing_id.extend_from_slice(&count); - self.roomactiveid_userid - .insert(&room_active_id, &*user_id.to_string().as_bytes())?; + self.typingid_userid + .insert(&room_typing_id, &*user_id.to_string().as_bytes())?; - self.roomid_lastroomactiveupdate + self.roomid_lasttypingupdate .insert(&room_id.to_string().as_bytes(), &count)?; Ok(()) } /// Removes a user from typing before the timeout is reached. - pub fn roomactive_remove( + pub fn typing_remove( &self, user_id: &UserId, room_id: &RoomId, @@ -132,19 +178,19 @@ impl RoomEdus { let mut found_outdated = false; - // Maybe there are multiple ones from calling roomactive_add multiple times + // Maybe there are multiple ones from calling roomtyping_add multiple times for outdated_edu in self - .roomactiveid_userid + .typingid_userid .scan_prefix(&prefix) .filter_map(|r| r.ok()) .filter(|(_, v)| v == user_id.as_bytes()) { - self.roomactiveid_userid.remove(outdated_edu.0)?; + self.typingid_userid.remove(outdated_edu.0)?; found_outdated = true; } if found_outdated { - self.roomid_lastroomactiveupdate.insert( + self.roomid_lasttypingupdate.insert( &room_id.to_string().as_bytes(), &globals.next_count()?.to_be_bytes(), )?; @@ -154,7 +200,7 @@ impl RoomEdus { } /// Makes sure that typing events with old timestamps get removed. - fn roomactives_maintain( + fn typings_maintain( &self, room_id: &RoomId, globals: &super::super::globals::Globals, @@ -168,7 +214,7 @@ impl RoomEdus { // Find all outdated edus before inserting a new one for outdated_edu in self - .roomactiveid_userid + .typingid_userid .scan_prefix(&prefix) .keys() .map(|key| { @@ -176,21 +222,21 @@ impl RoomEdus { Ok::<_, Error>(( key.clone(), utils::u64_from_bytes(key.split(|&b| b == 0xff).nth(1).ok_or_else(|| { - Error::bad_database("RoomActive has invalid timestamp or delimiters.") + Error::bad_database("RoomTyping has invalid timestamp or delimiters.") })?) - .map_err(|_| Error::bad_database("RoomActive has invalid timestamp bytes."))?, + .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, )) }) .filter_map(|r| r.ok()) .take_while(|&(_, timestamp)| timestamp < current_timestamp) { // This is an outdated edu (time > timestamp) - self.roomactiveid_userid.remove(outdated_edu.0)?; + self.typingid_userid.remove(outdated_edu.0)?; found_outdated = true; } if found_outdated { - self.roomid_lastroomactiveupdate.insert( + self.roomid_lasttypingupdate.insert( &room_id.to_string().as_bytes(), &globals.next_count()?.to_be_bytes(), )?; @@ -199,16 +245,16 @@ impl RoomEdus { Ok(()) } - /// Returns an iterator over all active events (e.g. typing notifications). - pub fn last_roomactive_update( + /// Returns the count of the last typing update in this room. + pub fn last_typing_update( &self, room_id: &RoomId, globals: &super::super::globals::Globals, ) -> Result { - self.roomactives_maintain(room_id, globals)?; + self.typings_maintain(room_id, globals)?; Ok(self - .roomid_lastroomactiveupdate + .roomid_lasttypingupdate .get(&room_id.to_string().as_bytes())? .map_or(Ok::<_, Error>(None), |bytes| { Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { @@ -218,7 +264,7 @@ impl RoomEdus { .unwrap_or(0)) } - pub fn roomactives_all( + pub fn typings_all( &self, room_id: &RoomId, ) -> Result> { @@ -228,17 +274,15 @@ impl RoomEdus { let mut user_ids = Vec::new(); for user_id in self - .roomactiveid_userid + .typingid_userid .scan_prefix(prefix) .values() .map(|user_id| { Ok::<_, Error>( UserId::try_from(utils::string_from_bytes(&user_id?).map_err(|_| { - Error::bad_database("User ID in roomactiveid_userid is invalid unicode.") + Error::bad_database("User ID in typingid_userid is invalid unicode.") })?) - .map_err(|_| { - Error::bad_database("User ID in roomactiveid_userid is invalid.") - })?, + .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?, ) }) { @@ -250,30 +294,6 @@ impl RoomEdus { }) } - /// Sets a private read marker at `count`. - pub fn room_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { - let mut key = room_id.to_string().as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(&user_id.to_string().as_bytes()); - - self.roomuserid_lastread.insert(key, &count.to_be_bytes())?; - - Ok(()) - } - - /// Returns the private read marker. - pub fn room_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result> { - let mut key = room_id.to_string().as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(&user_id.to_string().as_bytes()); - - self.roomuserid_lastread.get(key)?.map_or(Ok(None), |v| { - Ok(Some(utils::u64_from_bytes(&v).map_err(|_| { - Error::bad_database("Invalid private read marker bytes") - })?)) - }) - } - /// Adds a presence event which will be saved until a new event replaces it. /// /// Note: This method takes a RoomId because presence updates are always bound to rooms to