refactor(edus): split edus into separate modules

Nyaaori/refactor-next
Jakub Kubík 2 years ago committed by Nyaaori
parent e39358d375
commit 1869a38b85
No known key found for this signature in database
GPG Key ID: E7819C3ED4D1F82E

@ -156,7 +156,7 @@ impl service::room::directory::Data for KeyValueDatabase {
} }
} }
impl service::room::edus::Data for KeyValueDatabase { impl service::room::edus::read_receipt::Data for KeyValueDatabase {
fn readreceipt_update( fn readreceipt_update(
&self, &self,
user_id: &UserId, user_id: &UserId,
@ -203,7 +203,7 @@ impl service::room::edus::Data for KeyValueDatabase {
room_id: &RoomId, room_id: &RoomId,
since: u64, since: u64,
) -> impl Iterator< ) -> impl Iterator<
Item = Result<( Item=Result<(
Box<UserId>, Box<UserId>,
u64, u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>, Raw<ruma::events::AnySyncEphemeralRoomEvent>,
@ -229,7 +229,7 @@ impl service::room::edus::Data for KeyValueDatabase {
Error::bad_database("Invalid readreceiptid userid bytes in db.") Error::bad_database("Invalid readreceiptid userid bytes in db.")
})?, })?,
) )
.map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?;
let mut json = serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| { let mut json = serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| {
Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.")
@ -293,7 +293,9 @@ impl service::room::edus::Data for KeyValueDatabase {
.transpose()? .transpose()?
.unwrap_or(0)) .unwrap_or(0))
} }
}
impl service::room::edus::typing::Data for KeyValueDatabase {
fn typing_add( fn typing_add(
&self, &self,
user_id: &UserId, user_id: &UserId,
@ -379,14 +381,16 @@ impl service::room::edus::Data for KeyValueDatabase {
let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| { let user_id = UserId::parse(utils::string_from_bytes(&user_id).map_err(|_| {
Error::bad_database("User ID in typingid_userid is invalid unicode.") Error::bad_database("User ID in typingid_userid is invalid unicode.")
})?) })?)
.map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?; .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?;
user_ids.insert(user_id); user_ids.insert(user_id);
} }
Ok(user_ids) Ok(user_ids)
} }
}
impl service::room::edus::presence::Data for KeyValueDatabase {
fn update_presence( fn update_presence(
&self, &self,
user_id: &UserId, user_id: &UserId,
@ -416,7 +420,7 @@ impl service::room::edus::Data for KeyValueDatabase {
Ok(()) Ok(())
} }
pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.userid_lastpresenceupdate.insert( self.userid_lastpresenceupdate.insert(
user_id.as_bytes(), user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(), &utils::millis_since_unix_epoch().to_be_bytes(),

@ -1,256 +1,3 @@
mod data; pub mod presence;
pub use data::Data; pub mod read_receipt;
pub mod typing;
use crate::service::*;
pub struct Service<D: Data> {
db: D,
}
impl Service<_> {
/// Replaces the previous read receipt.
pub fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()> {
self.db.readreceipt_update(user_id, room_id, event);
}
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
#[tracing::instrument(skip(self))]
pub fn readreceipts_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> impl Iterator<
Item = Result<(
Box<UserId>,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
> + 'a {
self.db.readreceipts_since(room_id, since)
}
/// Sets a private read marker at `count`.
#[tracing::instrument(skip(self, globals))]
pub fn private_read_set(
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<()> {
self.db.private_read_set(room_id, user_id, count)
}
/// Returns the private read marker.
#[tracing::instrument(skip(self))]
pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
self.db.private_read_get(room_id, user_id)
}
/// Returns the count of the last typing update in this room.
pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
self.db.last_privateread_update(user_id, room_id)
}
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called.
pub fn typing_add(
&self,
user_id: &UserId,
room_id: &RoomId,
timeout: u64,
) -> Result<()> {
self.db.typing_add(user_id, room_id, timeout)
}
/// Removes a user from typing before the timeout is reached.
pub fn typing_remove(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<()> {
self.db.typing_remove(user_id, room_id)
}
/* TODO: Do this in background thread?
/// Makes sure that typing events with old timestamps get removed.
fn typings_maintain(
&self,
room_id: &RoomId,
globals: &super::super::globals::Globals,
) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let current_timestamp = utils::millis_since_unix_epoch();
let mut found_outdated = false;
// Find all outdated edus before inserting a new one
for outdated_edu in self
.typingid_userid
.scan_prefix(prefix)
.map(|(key, _)| {
Ok::<_, Error>((
key.clone(),
utils::u64_from_bytes(
&key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| {
Error::bad_database("RoomTyping has invalid timestamp or delimiters.")
})?[0..mem::size_of::<u64>()],
)
.map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?,
))
})
.filter_map(|r| r.ok())
.take_while(|&(_, timestamp)| timestamp < current_timestamp)
{
// This is an outdated edu (time > timestamp)
self.typingid_userid.remove(&outdated_edu.0)?;
found_outdated = true;
}
if found_outdated {
self.roomid_lasttypingupdate
.insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?;
}
Ok(())
}
*/
/// Returns the count of the last typing update in this room.
#[tracing::instrument(skip(self, globals))]
pub fn last_typing_update(
&self,
room_id: &RoomId,
) -> Result<u64> {
self.db.last_typing_update(room_id)
}
/// Returns a new typing EDU.
pub fn typings_all(
&self,
room_id: &RoomId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
let user_ids = self.db.typings_all(room_id)?;
Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent {
user_ids: user_ids.into_iter().collect(),
},
})
}
/// Adds a presence event which will be saved until a new event replaces it.
///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
pub fn update_presence(
&self,
user_id: &UserId,
room_id: &RoomId,
presence: PresenceEvent,
) -> Result<()> {
self.db.update_presence(user_id, room_id, presence)
}
/// Resets the presence timeout, so the user will stay in their current presence state.
pub fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.db.ping_presence(user_id)
}
pub fn get_last_presence_event(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<Option<PresenceEvent>> {
let last_update = match self.db.last_presence_update(user_id)? {
Some(last) => last,
None => return Ok(None),
};
self.db.get_presence_event(room_id, user_id, last_update)
}
/* TODO
/// Sets all users to offline who have been quiet for too long.
fn _presence_maintain(
&self,
rooms: &super::Rooms,
globals: &super::super::globals::Globals,
) -> Result<()> {
let current_timestamp = utils::millis_since_unix_epoch();
for (user_id_bytes, last_timestamp) in self
.userid_lastpresenceupdate
.iter()
.filter_map(|(k, bytes)| {
Some((
k,
utils::u64_from_bytes(&bytes)
.map_err(|_| {
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
})
.ok()?,
))
})
.take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000)
// 5 Minutes
{
// Send new presence events to set the user offline
let count = globals.next_count()?.to_be_bytes();
let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes)
.map_err(|_| {
Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.")
})?
.try_into()
.map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?;
for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) {
let mut presence_id = room_id.as_bytes().to_vec();
presence_id.push(0xff);
presence_id.extend_from_slice(&count);
presence_id.push(0xff);
presence_id.extend_from_slice(&user_id_bytes);
self.presenceid_presence.insert(
&presence_id,
&serde_json::to_vec(&PresenceEvent {
content: PresenceEventContent {
avatar_url: None,
currently_active: None,
displayname: None,
last_active_ago: Some(
last_timestamp.try_into().expect("time is valid"),
),
presence: PresenceState::Offline,
status_msg: None,
},
sender: user_id.to_owned(),
})
.expect("PresenceEvent can be serialized"),
)?;
}
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
}
Ok(())
}*/
/// Returns the most recent presence updates that happened after the event with id `since`.
#[tracing::instrument(skip(self, since, _rooms, _globals))]
pub fn presence_since(
&self,
room_id: &RoomId,
since: u64,
) -> Result<HashMap<Box<UserId>, PresenceEvent>> {
self.db.presence_since(room_id, since)
}
}

@ -1,67 +1,4 @@
pub trait Data { pub trait Data {
/// Replaces the previous read receipt.
fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()>;
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
fn readreceipts_since(
&self,
room_id: &RoomId,
since: u64,
) -> impl Iterator<
Item = Result<(
Box<UserId>,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
>;
/// Sets a private read marker at `count`.
fn private_read_set(
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<()>;
/// Returns the private read marker.
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the count of the last typing update in this room.
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called.
fn typing_add(
&self,
user_id: &UserId,
room_id: &RoomId,
timeout: u64,
) -> Result<()>;
/// Removes a user from typing before the timeout is reached.
fn typing_remove(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<()>;
/// Returns the count of the last typing update in this room.
fn last_typing_update(
&self,
room_id: &RoomId,
) -> Result<u64>;
/// Returns all user ids currently typing.
fn typings_all(
&self,
room_id: &RoomId,
) -> Result<HashSet<UserId>>;
/// Adds a presence event which will be saved until a new event replaces it. /// Adds a presence event which will be saved until a new event replaces it.
/// ///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to /// Note: This method takes a RoomId because presence updates are always bound to rooms to
@ -80,7 +17,12 @@ pub trait Data {
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>>; fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the presence event with correct last_active_ago. /// Returns the presence event with correct last_active_ago.
fn get_presence_event(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<Option<PresenceEvent>>; fn get_presence_event(
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<Option<PresenceEvent>>;
/// Returns the most recent presence updates that happened after the event with id `since`. /// Returns the most recent presence updates that happened after the event with id `since`.
fn presence_since( fn presence_since(

@ -8,143 +8,6 @@ pub struct Service<D: Data> {
} }
impl Service<_> { impl Service<_> {
/// Replaces the previous read receipt.
pub fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()> {
self.db.readreceipt_update(user_id, room_id, event);
}
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
#[tracing::instrument(skip(self))]
pub fn readreceipts_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> impl Iterator<
Item = Result<(
Box<UserId>,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
> + 'a {
self.db.readreceipts_since(room_id, since)
}
/// Sets a private read marker at `count`.
#[tracing::instrument(skip(self, globals))]
pub fn private_read_set(
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<()> {
self.db.private_read_set(room_id, user_id, count)
}
/// Returns the private read marker.
#[tracing::instrument(skip(self))]
pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
self.db.private_read_get(room_id, user_id)
}
/// Returns the count of the last typing update in this room.
pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
self.db.last_privateread_update(user_id, room_id)
}
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called.
pub fn typing_add(
&self,
user_id: &UserId,
room_id: &RoomId,
timeout: u64,
) -> Result<()> {
self.db.typing_add(user_id, room_id, timeout)
}
/// Removes a user from typing before the timeout is reached.
pub fn typing_remove(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<()> {
self.db.typing_remove(user_id, room_id)
}
/* TODO: Do this in background thread?
/// Makes sure that typing events with old timestamps get removed.
fn typings_maintain(
&self,
room_id: &RoomId,
globals: &super::super::globals::Globals,
) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let current_timestamp = utils::millis_since_unix_epoch();
let mut found_outdated = false;
// Find all outdated edus before inserting a new one
for outdated_edu in self
.typingid_userid
.scan_prefix(prefix)
.map(|(key, _)| {
Ok::<_, Error>((
key.clone(),
utils::u64_from_bytes(
&key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| {
Error::bad_database("RoomTyping has invalid timestamp or delimiters.")
})?[0..mem::size_of::<u64>()],
)
.map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?,
))
})
.filter_map(|r| r.ok())
.take_while(|&(_, timestamp)| timestamp < current_timestamp)
{
// This is an outdated edu (time > timestamp)
self.typingid_userid.remove(&outdated_edu.0)?;
found_outdated = true;
}
if found_outdated {
self.roomid_lasttypingupdate
.insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?;
}
Ok(())
}
*/
/// Returns the count of the last typing update in this room.
#[tracing::instrument(skip(self, globals))]
pub fn last_typing_update(
&self,
room_id: &RoomId,
) -> Result<u64> {
self.db.last_typing_update(room_id)
}
/// Returns a new typing EDU.
pub fn typings_all(
&self,
room_id: &RoomId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
let user_ids = self.db.typings_all(room_id)?;
Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent {
user_ids: user_ids.into_iter().collect(),
},
})
}
/// Adds a presence event which will be saved until a new event replaces it. /// Adds a presence event which will be saved until a new event replaces it.
/// ///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to /// Note: This method takes a RoomId because presence updates are always bound to rooms to

@ -21,71 +21,11 @@ pub trait Data {
>; >;
/// Sets a private read marker at `count`. /// Sets a private read marker at `count`.
fn private_read_set( fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>;
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<()>;
/// Returns the private read marker. /// Returns the private read marker.
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>; fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the count of the last typing update in this room. /// Returns the count of the last typing update in this room.
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>; fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called.
fn typing_add(
&self,
user_id: &UserId,
room_id: &RoomId,
timeout: u64,
) -> Result<()>;
/// Removes a user from typing before the timeout is reached.
fn typing_remove(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<()>;
/// Returns the count of the last typing update in this room.
fn last_typing_update(
&self,
room_id: &RoomId,
) -> Result<u64>;
/// Returns all user ids currently typing.
fn typings_all(
&self,
room_id: &RoomId,
) -> Result<HashSet<UserId>>;
/// Adds a presence event which will be saved until a new event replaces it.
///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
fn update_presence(
&self,
user_id: &UserId,
room_id: &RoomId,
presence: PresenceEvent,
) -> Result<()>;
/// Resets the presence timeout, so the user will stay in their current presence state.
fn ping_presence(&self, user_id: &UserId) -> Result<()>;
/// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the presence event with correct last_active_ago.
fn get_presence_event(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<Option<PresenceEvent>>;
/// Returns the most recent presence updates that happened after the event with id `since`.
fn presence_since(
&self,
room_id: &RoomId,
since: u64,
) -> Result<HashMap<Box<UserId>, PresenceEvent>>;
} }

@ -36,12 +36,7 @@ impl Service<_> {
/// Sets a private read marker at `count`. /// Sets a private read marker at `count`.
#[tracing::instrument(skip(self, globals))] #[tracing::instrument(skip(self, globals))]
pub fn private_read_set( pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<()> {
self.db.private_read_set(room_id, user_id, count) self.db.private_read_set(room_id, user_id, count)
} }
@ -55,202 +50,4 @@ impl Service<_> {
pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> { pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
self.db.last_privateread_update(user_id, room_id) self.db.last_privateread_update(user_id, room_id)
} }
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called.
pub fn typing_add(
&self,
user_id: &UserId,
room_id: &RoomId,
timeout: u64,
) -> Result<()> {
self.db.typing_add(user_id, room_id, timeout)
}
/// Removes a user from typing before the timeout is reached.
pub fn typing_remove(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<()> {
self.db.typing_remove(user_id, room_id)
}
/* TODO: Do this in background thread?
/// Makes sure that typing events with old timestamps get removed.
fn typings_maintain(
&self,
room_id: &RoomId,
globals: &super::super::globals::Globals,
) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let current_timestamp = utils::millis_since_unix_epoch();
let mut found_outdated = false;
// Find all outdated edus before inserting a new one
for outdated_edu in self
.typingid_userid
.scan_prefix(prefix)
.map(|(key, _)| {
Ok::<_, Error>((
key.clone(),
utils::u64_from_bytes(
&key.splitn(2, |&b| b == 0xff).nth(1).ok_or_else(|| {
Error::bad_database("RoomTyping has invalid timestamp or delimiters.")
})?[0..mem::size_of::<u64>()],
)
.map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?,
))
})
.filter_map(|r| r.ok())
.take_while(|&(_, timestamp)| timestamp < current_timestamp)
{
// This is an outdated edu (time > timestamp)
self.typingid_userid.remove(&outdated_edu.0)?;
found_outdated = true;
}
if found_outdated {
self.roomid_lasttypingupdate
.insert(room_id.as_bytes(), &globals.next_count()?.to_be_bytes())?;
}
Ok(())
}
*/
/// Returns the count of the last typing update in this room.
#[tracing::instrument(skip(self, globals))]
pub fn last_typing_update(
&self,
room_id: &RoomId,
) -> Result<u64> {
self.db.last_typing_update(room_id)
}
/// Returns a new typing EDU.
pub fn typings_all(
&self,
room_id: &RoomId,
) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
let user_ids = self.db.typings_all(room_id)?;
Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent {
user_ids: user_ids.into_iter().collect(),
},
})
}
/// Adds a presence event which will be saved until a new event replaces it.
///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
pub fn update_presence(
&self,
user_id: &UserId,
room_id: &RoomId,
presence: PresenceEvent,
) -> Result<()> {
self.db.update_presence(user_id, room_id, presence)
}
/// Resets the presence timeout, so the user will stay in their current presence state.
pub fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.db.ping_presence(user_id)
}
pub fn get_last_presence_event(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<Option<PresenceEvent>> {
let last_update = match self.db.last_presence_update(user_id)? {
Some(last) => last,
None => return Ok(None),
};
self.db.get_presence_event(room_id, user_id, last_update)
}
/* TODO
/// Sets all users to offline who have been quiet for too long.
fn _presence_maintain(
&self,
rooms: &super::Rooms,
globals: &super::super::globals::Globals,
) -> Result<()> {
let current_timestamp = utils::millis_since_unix_epoch();
for (user_id_bytes, last_timestamp) in self
.userid_lastpresenceupdate
.iter()
.filter_map(|(k, bytes)| {
Some((
k,
utils::u64_from_bytes(&bytes)
.map_err(|_| {
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
})
.ok()?,
))
})
.take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000)
// 5 Minutes
{
// Send new presence events to set the user offline
let count = globals.next_count()?.to_be_bytes();
let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes)
.map_err(|_| {
Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.")
})?
.try_into()
.map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?;
for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) {
let mut presence_id = room_id.as_bytes().to_vec();
presence_id.push(0xff);
presence_id.extend_from_slice(&count);
presence_id.push(0xff);
presence_id.extend_from_slice(&user_id_bytes);
self.presenceid_presence.insert(
&presence_id,
&serde_json::to_vec(&PresenceEvent {
content: PresenceEventContent {
avatar_url: None,
currently_active: None,
displayname: None,
last_active_ago: Some(
last_timestamp.try_into().expect("time is valid"),
),
presence: PresenceState::Offline,
status_msg: None,
},
sender: user_id.to_owned(),
})
.expect("PresenceEvent can be serialized"),
)?;
}
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
}
Ok(())
}*/
/// Returns the most recent presence updates that happened after the event with id `since`.
#[tracing::instrument(skip(self, since, _rooms, _globals))]
pub fn presence_since(
&self,
room_id: &RoomId,
since: u64,
) -> Result<HashMap<Box<UserId>, PresenceEvent>> {
self.db.presence_since(room_id, since)
}
} }

@ -1,91 +1,14 @@
pub trait Data { pub trait Data {
/// Replaces the previous read receipt.
fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()>;
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
fn readreceipts_since(
&self,
room_id: &RoomId,
since: u64,
) -> impl Iterator<
Item = Result<(
Box<UserId>,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
>;
/// Sets a private read marker at `count`.
fn private_read_set(
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<()>;
/// Returns the private read marker.
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the count of the last typing update in this room.
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called. /// called.
fn typing_add( fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()>;
&self,
user_id: &UserId,
room_id: &RoomId,
timeout: u64,
) -> Result<()>;
/// Removes a user from typing before the timeout is reached. /// Removes a user from typing before the timeout is reached.
fn typing_remove( fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<()>;
/// Returns the count of the last typing update in this room. /// Returns the count of the last typing update in this room.
fn last_typing_update( fn last_typing_update(&self, room_id: &RoomId) -> Result<u64>;
&self,
room_id: &RoomId,
) -> Result<u64>;
/// Returns all user ids currently typing. /// Returns all user ids currently typing.
fn typings_all( fn typings_all(&self, room_id: &RoomId) -> Result<HashSet<UserId>>;
&self,
room_id: &RoomId,
) -> Result<HashSet<UserId>>;
/// Adds a presence event which will be saved until a new event replaces it.
///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
fn update_presence(
&self,
user_id: &UserId,
room_id: &RoomId,
presence: PresenceEvent,
) -> Result<()>;
/// Resets the presence timeout, so the user will stay in their current presence state.
fn ping_presence(&self, user_id: &UserId) -> Result<()>;
/// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the presence event with correct last_active_ago.
fn get_presence_event(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<Option<PresenceEvent>>;
/// Returns the most recent presence updates that happened after the event with id `since`.
fn presence_since(
&self,
room_id: &RoomId,
since: u64,
) -> Result<HashMap<Box<UserId>, PresenceEvent>>;
} }

@ -8,71 +8,14 @@ pub struct Service<D: Data> {
} }
impl Service<_> { impl Service<_> {
/// Replaces the previous read receipt.
pub fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()> {
self.db.readreceipt_update(user_id, room_id, event);
}
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
#[tracing::instrument(skip(self))]
pub fn readreceipts_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> impl Iterator<
Item = Result<(
Box<UserId>,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
> + 'a {
self.db.readreceipts_since(room_id, since)
}
/// Sets a private read marker at `count`.
#[tracing::instrument(skip(self, globals))]
pub fn private_read_set(
&self,
room_id: &RoomId,
user_id: &UserId,
count: u64,
) -> Result<()> {
self.db.private_read_set(room_id, user_id, count)
}
/// Returns the private read marker.
#[tracing::instrument(skip(self))]
pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
self.db.private_read_get(room_id, user_id)
}
/// Returns the count of the last typing update in this room.
pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
self.db.last_privateread_update(user_id, room_id)
}
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called. /// called.
pub fn typing_add( pub fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> {
&self,
user_id: &UserId,
room_id: &RoomId,
timeout: u64,
) -> Result<()> {
self.db.typing_add(user_id, room_id, timeout) self.db.typing_add(user_id, room_id, timeout)
} }
/// Removes a user from typing before the timeout is reached. /// Removes a user from typing before the timeout is reached.
pub fn typing_remove( pub fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<()> {
self.db.typing_remove(user_id, room_id) self.db.typing_remove(user_id, room_id)
} }
@ -124,10 +67,7 @@ impl Service<_> {
/// Returns the count of the last typing update in this room. /// Returns the count of the last typing update in this room.
#[tracing::instrument(skip(self, globals))] #[tracing::instrument(skip(self, globals))]
pub fn last_typing_update( pub fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
&self,
room_id: &RoomId,
) -> Result<u64> {
self.db.last_typing_update(room_id) self.db.last_typing_update(room_id)
} }
@ -144,113 +84,4 @@ impl Service<_> {
}, },
}) })
} }
/// Adds a presence event which will be saved until a new event replaces it.
///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
pub fn update_presence(
&self,
user_id: &UserId,
room_id: &RoomId,
presence: PresenceEvent,
) -> Result<()> {
self.db.update_presence(user_id, room_id, presence)
}
/// Resets the presence timeout, so the user will stay in their current presence state.
pub fn ping_presence(&self, user_id: &UserId) -> Result<()> {
self.db.ping_presence(user_id)
}
pub fn get_last_presence_event(
&self,
user_id: &UserId,
room_id: &RoomId,
) -> Result<Option<PresenceEvent>> {
let last_update = match self.db.last_presence_update(user_id)? {
Some(last) => last,
None => return Ok(None),
};
self.db.get_presence_event(room_id, user_id, last_update)
}
/* TODO
/// Sets all users to offline who have been quiet for too long.
fn _presence_maintain(
&self,
rooms: &super::Rooms,
globals: &super::super::globals::Globals,
) -> Result<()> {
let current_timestamp = utils::millis_since_unix_epoch();
for (user_id_bytes, last_timestamp) in self
.userid_lastpresenceupdate
.iter()
.filter_map(|(k, bytes)| {
Some((
k,
utils::u64_from_bytes(&bytes)
.map_err(|_| {
Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.")
})
.ok()?,
))
})
.take_while(|(_, timestamp)| current_timestamp.saturating_sub(*timestamp) > 5 * 60_000)
// 5 Minutes
{
// Send new presence events to set the user offline
let count = globals.next_count()?.to_be_bytes();
let user_id: Box<_> = utils::string_from_bytes(&user_id_bytes)
.map_err(|_| {
Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.")
})?
.try_into()
.map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?;
for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) {
let mut presence_id = room_id.as_bytes().to_vec();
presence_id.push(0xff);
presence_id.extend_from_slice(&count);
presence_id.push(0xff);
presence_id.extend_from_slice(&user_id_bytes);
self.presenceid_presence.insert(
&presence_id,
&serde_json::to_vec(&PresenceEvent {
content: PresenceEventContent {
avatar_url: None,
currently_active: None,
displayname: None,
last_active_ago: Some(
last_timestamp.try_into().expect("time is valid"),
),
presence: PresenceState::Offline,
status_msg: None,
},
sender: user_id.to_owned(),
})
.expect("PresenceEvent can be serialized"),
)?;
}
self.userid_lastpresenceupdate.insert(
user_id.as_bytes(),
&utils::millis_since_unix_epoch().to_be_bytes(),
)?;
}
Ok(())
}*/
/// Returns the most recent presence updates that happened after the event with id `since`.
#[tracing::instrument(skip(self, since, _rooms, _globals))]
pub fn presence_since(
&self,
room_id: &RoomId,
since: u64,
) -> Result<HashMap<Box<UserId>, PresenceEvent>> {
self.db.presence_since(room_id, since)
}
} }

Loading…
Cancel
Save