diff --git a/src/client_server.rs b/src/client_server.rs index 9676f7b1..0c085e36 100644 --- a/src/client_server.rs +++ b/src/client_server.rs @@ -2659,6 +2659,9 @@ pub fn sync_route( } } + // Remove all to-device events the device received *last time* + db.users.remove_to_device_events(user_id, device_id, since)?; + Ok(sync_events::Response { next_batch, rooms: sync_events::Rooms { @@ -2711,7 +2714,7 @@ pub fn sync_route( }, device_one_time_keys_count: Default::default(), // TODO to_device: sync_events::ToDevice { - events: db.users.take_to_device_events(user_id, device_id, 100)?, + events: db.users.get_to_device_events(user_id, device_id)?, }, } .into()) diff --git a/src/database/users.rs b/src/database/users.rs index 3a02a9d8..6e6258f5 100644 --- a/src/database/users.rs +++ b/src/database/users.rs @@ -11,7 +11,7 @@ use ruma::{ events::{AnyToDeviceEvent, EventJson, EventType}, identifiers::{DeviceId, UserId}, }; -use std::{collections::BTreeMap, convert::TryFrom, time::SystemTime}; +use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime}; pub struct Users { pub(super) userid_password: sled::Tree, @@ -660,11 +660,10 @@ impl Users { Ok(()) } - pub fn take_to_device_events( + pub fn get_to_device_events( &self, user_id: &UserId, device_id: &DeviceId, - max: usize, ) -> Result>> { let mut events = Vec::new(); @@ -673,18 +672,51 @@ impl Users { prefix.extend_from_slice(device_id.as_str().as_bytes()); prefix.push(0xff); - for result in self.todeviceid_events.scan_prefix(&prefix).take(max) { - let (key, value) = result?; + for value in self.todeviceid_events.scan_prefix(&prefix).values() { events.push( - serde_json::from_slice(&*value) + serde_json::from_slice(&*value?) .map_err(|_| Error::bad_database("Event in todeviceid_events is invalid."))?, ); - self.todeviceid_events.remove(key)?; } Ok(events) } + pub fn remove_to_device_events( + &self, + user_id: &UserId, + device_id: &DeviceId, + until: u64, + ) -> Result<()> { + let mut prefix = user_id.to_string().as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(device_id.as_ref().as_bytes()); + prefix.push(0xff); + + let mut last = prefix.clone(); + last.extend_from_slice(&until.to_be_bytes()); + + for (key, _) in self + .todeviceid_events + .range(&*prefix..=&*last) + .keys() + .map(|key| { + let key = key?; + Ok::<_, Error>(( + key.clone(), + utils::u64_from_bytes(&key[key.len() - mem::size_of::()..key.len()]) + .map_err(|_| Error::bad_database("ToDeviceId has invalid count bytes."))?, + )) + }) + .filter_map(|r| r.ok()) + .take_while(|&(_, count)| count <= until) + { + self.todeviceid_events.remove(key)?; + } + + Ok(()) + } + pub fn update_device_metadata( &self, user_id: &UserId,