Merge pull request 'fix: send notification count updates when private read receipts change' (#194) from fixrr into master

Reviewed-on: https://git.koesters.xyz/timo/conduit/pulls/194
merge-requests/22/head
Timo Kösters 4 years ago
commit 0f524955b2

@ -34,13 +34,14 @@ pub fn set_read_marker_route(
)?;
if let Some(event) = &body.read_receipt {
db.rooms.edus.room_read_set(
db.rooms.edus.private_read_set(
&body.room_id,
&sender_id,
db.rooms.get_pdu_count(event)?.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event does not exist.",
))?,
&db.globals,
)?;
let mut user_receipts = BTreeMap::new();
@ -58,7 +59,7 @@ pub fn set_read_marker_route(
},
);
db.rooms.edus.roomlatest_update(
db.rooms.edus.readreceipt_update(
&sender_id,
&body.room_id,
AnyEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt(

@ -81,7 +81,12 @@ pub async fn sync_events_route(
.rev()
.collect::<Vec<_>>();
let send_notification_counts = !timeline_pdus.is_empty();
let send_notification_counts = !timeline_pdus.is_empty()
|| db
.rooms
.edus
.last_privateread_update(&sender_id, &room_id)?
> since;
// They /sync response doesn't always return all messages, so we say the output is
// limited unless there are events in non_timeline_pdus
@ -242,7 +247,7 @@ pub async fn sync_events_route(
};
let notification_count = if send_notification_counts {
if let Some(last_read) = db.rooms.edus.room_read_get(&room_id, &sender_id)? {
if let Some(last_read) = db.rooms.edus.private_read_get(&room_id, &sender_id)? {
Some(
(db.rooms
.pdus_since(&sender_id, &room_id, last_read)?
@ -280,20 +285,15 @@ pub async fn sync_events_route(
let mut edus = db
.rooms
.edus
.roomlatests_since(&room_id, since)?
.readreceipts_since(&room_id, since)?
.filter_map(|r| r.ok()) // Filter out buggy events
.collect::<Vec<_>>();
if db
.rooms
.edus
.last_roomactive_update(&room_id, &db.globals)?
> since
{
if db.rooms.edus.last_typing_update(&room_id, &db.globals)? > since {
edus.push(
serde_json::from_str(
&serde_json::to_string(&AnySyncEphemeralRoomEvent::Typing(
db.rooms.edus.roomactives_all(&room_id)?,
db.rooms.edus.typings_all(&room_id)?,
))
.expect("event is valid, we just created it"),
)

@ -16,7 +16,7 @@ pub fn create_typing_event_route(
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
if body.typing {
db.rooms.edus.roomactive_add(
db.rooms.edus.typing_add(
&sender_id,
&body.room_id,
body.timeout.map(|d| d.as_millis() as u64).unwrap_or(30000)
@ -26,7 +26,7 @@ pub fn create_typing_event_route(
} else {
db.rooms
.edus
.roomactive_remove(&sender_id, &body.room_id, &db.globals)?;
.typing_remove(&sender_id, &body.room_id, &db.globals)?;
}
Ok(create_typing_event::Response.into())

@ -88,10 +88,11 @@ impl Database {
},
rooms: rooms::Rooms {
edus: rooms::RoomEdus {
roomuserid_lastread: db.open_tree("roomuserid_lastread")?, // "Private" read receipt
roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest")?, // Read receipts
roomactiveid_userid: db.open_tree("roomactiveid_userid")?, // Typing notifs
roomid_lastroomactiveupdate: db.open_tree("roomid_lastroomactiveupdate")?,
readreceiptid_readreceipt: db.open_tree("readreceiptid_readreceipt")?,
roomuserid_privateread: db.open_tree("roomuserid_privateread")?, // "Private" read receipt
roomuserid_lastprivatereadupdate: db.open_tree("roomid_lastprivatereadupdate")?,
typingid_userid: db.open_tree("typingid_userid")?,
roomid_lasttypingupdate: db.open_tree("roomid_lasttypingupdate")?,
presenceid_presence: db.open_tree("presenceid_presence")?,
userid_lastpresenceupdate: db.open_tree("userid_lastpresenceupdate")?,
},
@ -163,14 +164,14 @@ impl Database {
futures.push(
self.rooms
.edus
.roomid_lastroomactiveupdate
.roomid_lasttypingupdate
.watch_prefix(&roomid_bytes),
);
futures.push(
self.rooms
.edus
.roomlatestid_roomlatest
.readreceiptid_readreceipt
.watch_prefix(&roomid_prefix),
);

@ -621,7 +621,7 @@ impl Rooms {
}
_ => {}
}
self.edus.room_read_set(&room_id, &sender, index)?;
self.edus.private_read_set(&room_id, &sender, index, &globals)?;
Ok(pdu.event_id)
}

@ -14,17 +14,18 @@ use std::{
};
pub struct RoomEdus {
pub(in super::super) roomuserid_lastread: sled::Tree, // RoomUserId = Room + User
pub(in super::super) roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Count + UserId
pub(in super::super) roomactiveid_userid: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count
pub(in super::super) roomid_lastroomactiveupdate: sled::Tree, // LastRoomActiveUpdate = Count
pub(in super::super) readreceiptid_readreceipt: sled::Tree, // ReadReceiptId = RoomId + Count + UserId
pub(in super::super) roomuserid_privateread: sled::Tree, // RoomUserId = Room + User, PrivateRead = Count
pub(in super::super) roomuserid_lastprivatereadupdate: sled::Tree, // LastPrivateReadUpdate = Count
pub(in super::super) typingid_userid: sled::Tree, // TypingId = RoomId + TimeoutTime + Count
pub(in super::super) roomid_lasttypingupdate: sled::Tree, // LastRoomTypingUpdate = Count
pub(in super::super) presenceid_presence: sled::Tree, // PresenceId = RoomId + Count + UserId
pub(in super::super) userid_lastpresenceupdate: sled::Tree, // LastPresenceUpdate = Count
}
impl RoomEdus {
/// Adds an event which will be saved until a new event replaces it (e.g. read receipt).
pub fn roomlatest_update(
pub fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
@ -36,7 +37,7 @@ impl RoomEdus {
// Remove old entry
if let Some(old) = self
.roomlatestid_roomlatest
.readreceiptid_readreceipt
.scan_prefix(&prefix)
.keys()
.rev()
@ -50,7 +51,7 @@ impl RoomEdus {
})
{
// This is the old room_latest
self.roomlatestid_roomlatest.remove(old)?;
self.readreceiptid_readreceipt.remove(old)?;
}
let mut room_latest_id = prefix;
@ -58,7 +59,7 @@ impl RoomEdus {
room_latest_id.push(0xff);
room_latest_id.extend_from_slice(&user_id.to_string().as_bytes());
self.roomlatestid_roomlatest.insert(
self.readreceiptid_readreceipt.insert(
room_latest_id,
&*serde_json::to_string(&event).expect("EduEvent::to_string always works"),
)?;
@ -67,7 +68,7 @@ impl RoomEdus {
}
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
pub fn roomlatests_since(
pub fn readreceipts_since(
&self,
room_id: &RoomId,
since: u64,
@ -79,7 +80,7 @@ impl RoomEdus {
first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
Ok(self
.roomlatestid_roomlatest
.readreceiptid_readreceipt
.range(&*first_possible_edu..)
.filter_map(|r| r.ok())
.take_while(move |(k, _)| k.starts_with(&prefix))
@ -90,9 +91,54 @@ impl RoomEdus {
}))
}
/// Sets a user as typing until the timeout timestamp is reached or roomactive_remove is
/// Sets a private read marker at `count`.
pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64, globals: &super::super::globals::Globals) -> Result<()> {
let mut key = room_id.to_string().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&user_id.to_string().as_bytes());
self.roomuserid_privateread
.insert(&key, &count.to_be_bytes())?;
self.roomuserid_lastprivatereadupdate
.insert(&key, &globals.next_count()?.to_be_bytes())?;
Ok(())
}
/// Returns the private read marker.
pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
let mut key = room_id.to_string().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&user_id.to_string().as_bytes());
self.roomuserid_privateread.get(key)?.map_or(Ok(None), |v| {
Ok(Some(utils::u64_from_bytes(&v).map_err(|_| {
Error::bad_database("Invalid private read marker bytes")
})?))
})
}
/// Returns the count of the last typing update in this room.
pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
let mut key = room_id.to_string().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&user_id.to_string().as_bytes());
Ok(self
.roomuserid_lastprivatereadupdate
.get(&key)?
.map_or(Ok::<_, Error>(None), |bytes| {
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.")
})?))
})?
.unwrap_or(0))
}
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called.
pub fn roomactive_add(
pub fn typing_add(
&self,
user_id: &UserId,
room_id: &RoomId,
@ -104,22 +150,22 @@ impl RoomEdus {
let count = globals.next_count()?.to_be_bytes();
let mut room_active_id = prefix;
room_active_id.extend_from_slice(&timeout.to_be_bytes());
room_active_id.push(0xff);
room_active_id.extend_from_slice(&count);
let mut room_typing_id = prefix;
room_typing_id.extend_from_slice(&timeout.to_be_bytes());
room_typing_id.push(0xff);
room_typing_id.extend_from_slice(&count);
self.roomactiveid_userid
.insert(&room_active_id, &*user_id.to_string().as_bytes())?;
self.typingid_userid
.insert(&room_typing_id, &*user_id.to_string().as_bytes())?;
self.roomid_lastroomactiveupdate
self.roomid_lasttypingupdate
.insert(&room_id.to_string().as_bytes(), &count)?;
Ok(())
}
/// Removes a user from typing before the timeout is reached.
pub fn roomactive_remove(
pub fn typing_remove(
&self,
user_id: &UserId,
room_id: &RoomId,
@ -132,19 +178,19 @@ impl RoomEdus {
let mut found_outdated = false;
// Maybe there are multiple ones from calling roomactive_add multiple times
// Maybe there are multiple ones from calling roomtyping_add multiple times
for outdated_edu in self
.roomactiveid_userid
.typingid_userid
.scan_prefix(&prefix)
.filter_map(|r| r.ok())
.filter(|(_, v)| v == user_id.as_bytes())
{
self.roomactiveid_userid.remove(outdated_edu.0)?;
self.typingid_userid.remove(outdated_edu.0)?;
found_outdated = true;
}
if found_outdated {
self.roomid_lastroomactiveupdate.insert(
self.roomid_lasttypingupdate.insert(
&room_id.to_string().as_bytes(),
&globals.next_count()?.to_be_bytes(),
)?;
@ -154,7 +200,7 @@ impl RoomEdus {
}
/// Makes sure that typing events with old timestamps get removed.
fn roomactives_maintain(
fn typings_maintain(
&self,
room_id: &RoomId,
globals: &super::super::globals::Globals,
@ -168,7 +214,7 @@ impl RoomEdus {
// Find all outdated edus before inserting a new one
for outdated_edu in self
.roomactiveid_userid
.typingid_userid
.scan_prefix(&prefix)
.keys()
.map(|key| {
@ -176,21 +222,21 @@ impl RoomEdus {
Ok::<_, Error>((
key.clone(),
utils::u64_from_bytes(key.split(|&b| b == 0xff).nth(1).ok_or_else(|| {
Error::bad_database("RoomActive has invalid timestamp or delimiters.")
Error::bad_database("RoomTyping has invalid timestamp or delimiters.")
})?)
.map_err(|_| Error::bad_database("RoomActive has invalid timestamp bytes."))?,
.map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?,
))
})
.filter_map(|r| r.ok())
.take_while(|&(_, timestamp)| timestamp < current_timestamp)
{
// This is an outdated edu (time > timestamp)
self.roomactiveid_userid.remove(outdated_edu.0)?;
self.typingid_userid.remove(outdated_edu.0)?;
found_outdated = true;
}
if found_outdated {
self.roomid_lastroomactiveupdate.insert(
self.roomid_lasttypingupdate.insert(
&room_id.to_string().as_bytes(),
&globals.next_count()?.to_be_bytes(),
)?;
@ -199,16 +245,16 @@ impl RoomEdus {
Ok(())
}
/// Returns an iterator over all active events (e.g. typing notifications).
pub fn last_roomactive_update(
/// Returns the count of the last typing update in this room.
pub fn last_typing_update(
&self,
room_id: &RoomId,
globals: &super::super::globals::Globals,
) -> Result<u64> {
self.roomactives_maintain(room_id, globals)?;
self.typings_maintain(room_id, globals)?;
Ok(self
.roomid_lastroomactiveupdate
.roomid_lasttypingupdate
.get(&room_id.to_string().as_bytes())?
.map_or(Ok::<_, Error>(None), |bytes| {
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
@ -218,7 +264,7 @@ impl RoomEdus {
.unwrap_or(0))
}
pub fn roomactives_all(
pub fn typings_all(
&self,
room_id: &RoomId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
@ -228,17 +274,15 @@ impl RoomEdus {
let mut user_ids = Vec::new();
for user_id in self
.roomactiveid_userid
.typingid_userid
.scan_prefix(prefix)
.values()
.map(|user_id| {
Ok::<_, Error>(
UserId::try_from(utils::string_from_bytes(&user_id?).map_err(|_| {
Error::bad_database("User ID in roomactiveid_userid is invalid unicode.")
Error::bad_database("User ID in typingid_userid is invalid unicode.")
})?)
.map_err(|_| {
Error::bad_database("User ID in roomactiveid_userid is invalid.")
})?,
.map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?,
)
})
{
@ -250,30 +294,6 @@ impl RoomEdus {
})
}
/// Sets a private read marker at `count`.
pub fn room_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
let mut key = room_id.to_string().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&user_id.to_string().as_bytes());
self.roomuserid_lastread.insert(key, &count.to_be_bytes())?;
Ok(())
}
/// Returns the private read marker.
pub fn room_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
let mut key = room_id.to_string().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&user_id.to_string().as_bytes());
self.roomuserid_lastread.get(key)?.map_or(Ok(None), |v| {
Ok(Some(utils::u64_from_bytes(&v).map_err(|_| {
Error::bad_database("Invalid private read marker bytes")
})?))
})
}
/// Adds a presence event which will be saved until a new event replaces it.
///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to

@ -17,6 +17,7 @@ Can invite users to invite-only rooms
Can list tags for a room
Can logout all devices
Can logout current device
Can re-join room if re-invited
Can read configuration endpoint
Can recv a device message using /sync
Can recv device messages until they are acknowledged
@ -113,7 +114,6 @@ Typing events appear in incremental sync
Typing events appear in initial sync
Uninvited users cannot join the room
User appears in user directory
User directory correctly update on display name change
User in dir while user still shares private rooms
User in shared private room does appear in user directory
User is offline if they set_presence=offline in their sync

Loading…
Cancel
Save