@ -14,17 +14,18 @@ use std::{
} ;
} ;
pub struct RoomEdus {
pub struct RoomEdus {
pub ( in super ::super ) roomuserid_lastread : sled ::Tree , // RoomUserId = Room + User
pub ( in super ::super ) readreceiptid_readreceipt : sled ::Tree , // ReadReceiptId = RoomId + Count + UserId
pub ( in super ::super ) roomlatestid_roomlatest : sled ::Tree , // Read Receipts, RoomLatestId = RoomId + Count + UserId
pub ( in super ::super ) roomuserid_privateread : sled ::Tree , // RoomUserId = Room + User, PrivateRead = Count
pub ( in super ::super ) roomactiveid_userid : sled ::Tree , // Typing, RoomActiveId = RoomId + TimeoutTime + Count
pub ( in super ::super ) roomuserid_lastprivatereadupdate : sled ::Tree , // LastPrivateReadUpdate = Count
pub ( in super ::super ) roomid_lastroomactiveupdate : sled ::Tree , // LastRoomActiveUpdate = Count
pub ( in super ::super ) typingid_userid : sled ::Tree , // TypingId = RoomId + TimeoutTime + Count
pub ( in super ::super ) roomid_lasttypingupdate : sled ::Tree , // LastRoomTypingUpdate = Count
pub ( in super ::super ) presenceid_presence : sled ::Tree , // PresenceId = RoomId + Count + UserId
pub ( in super ::super ) presenceid_presence : sled ::Tree , // PresenceId = RoomId + Count + UserId
pub ( in super ::super ) userid_lastpresenceupdate : sled ::Tree , // LastPresenceUpdate = Count
pub ( in super ::super ) userid_lastpresenceupdate : sled ::Tree , // LastPresenceUpdate = Count
}
}
impl RoomEdus {
impl RoomEdus {
/// Adds an event which will be saved until a new event replaces it (e.g. read receipt).
/// Adds an event which will be saved until a new event replaces it (e.g. read receipt).
pub fn r oomlates t_update(
pub fn r eadreceip t_update(
& self ,
& self ,
user_id : & UserId ,
user_id : & UserId ,
room_id : & RoomId ,
room_id : & RoomId ,
@ -36,7 +37,7 @@ impl RoomEdus {
// Remove old entry
// Remove old entry
if let Some ( old ) = self
if let Some ( old ) = self
. r oomlatestid_roomlates t
. r eadreceiptid_readreceip t
. scan_prefix ( & prefix )
. scan_prefix ( & prefix )
. keys ( )
. keys ( )
. rev ( )
. rev ( )
@ -50,7 +51,7 @@ impl RoomEdus {
} )
} )
{
{
// This is the old room_latest
// This is the old room_latest
self . r oomlatestid_roomlates t. remove ( old ) ? ;
self . r eadreceiptid_readreceip t. remove ( old ) ? ;
}
}
let mut room_latest_id = prefix ;
let mut room_latest_id = prefix ;
@ -58,7 +59,7 @@ impl RoomEdus {
room_latest_id . push ( 0xff ) ;
room_latest_id . push ( 0xff ) ;
room_latest_id . extend_from_slice ( & user_id . to_string ( ) . as_bytes ( ) ) ;
room_latest_id . extend_from_slice ( & user_id . to_string ( ) . as_bytes ( ) ) ;
self . r oomlatestid_roomlates t. insert (
self . r eadreceiptid_readreceip t. insert (
room_latest_id ,
room_latest_id ,
& * serde_json ::to_string ( & event ) . expect ( "EduEvent::to_string always works" ) ,
& * serde_json ::to_string ( & event ) . expect ( "EduEvent::to_string always works" ) ,
) ? ;
) ? ;
@ -67,7 +68,7 @@ impl RoomEdus {
}
}
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
pub fn r oomlates ts_since(
pub fn r eadreceip ts_since(
& self ,
& self ,
room_id : & RoomId ,
room_id : & RoomId ,
since : u64 ,
since : u64 ,
@ -79,7 +80,7 @@ impl RoomEdus {
first_possible_edu . extend_from_slice ( & ( since + 1 ) . to_be_bytes ( ) ) ; // +1 so we don't send the event at since
first_possible_edu . extend_from_slice ( & ( since + 1 ) . to_be_bytes ( ) ) ; // +1 so we don't send the event at since
Ok ( self
Ok ( self
. r oomlatestid_roomlates t
. r eadreceiptid_readreceip t
. range ( & * first_possible_edu .. )
. range ( & * first_possible_edu .. )
. filter_map ( | r | r . ok ( ) )
. filter_map ( | r | r . ok ( ) )
. take_while ( move | ( k , _ ) | k . starts_with ( & prefix ) )
. take_while ( move | ( k , _ ) | k . starts_with ( & prefix ) )
@ -90,9 +91,60 @@ impl RoomEdus {
} ) )
} ) )
}
}
/// Sets a user as typing until the timeout timestamp is reached or roomactive_remove is
/// Sets a private read marker at `count`.
pub fn private_read_set (
& self ,
room_id : & RoomId ,
user_id : & UserId ,
count : u64 ,
globals : & super ::super ::globals ::Globals ,
) -> Result < ( ) > {
let mut key = room_id . to_string ( ) . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( & user_id . to_string ( ) . as_bytes ( ) ) ;
self . roomuserid_privateread
. insert ( & key , & count . to_be_bytes ( ) ) ? ;
self . roomuserid_lastprivatereadupdate
. insert ( & key , & globals . next_count ( ) ? . to_be_bytes ( ) ) ? ;
Ok ( ( ) )
}
/// Returns the private read marker.
pub fn private_read_get ( & self , room_id : & RoomId , user_id : & UserId ) -> Result < Option < u64 > > {
let mut key = room_id . to_string ( ) . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( & user_id . to_string ( ) . as_bytes ( ) ) ;
self . roomuserid_privateread . get ( key ) ? . map_or ( Ok ( None ) , | v | {
Ok ( Some ( utils ::u64_from_bytes ( & v ) . map_err ( | _ | {
Error ::bad_database ( "Invalid private read marker bytes" )
} ) ? ) )
} )
}
/// Returns the count of the last typing update in this room.
pub fn last_privateread_update ( & self , user_id : & UserId , room_id : & RoomId ) -> Result < u64 > {
let mut key = room_id . to_string ( ) . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( & user_id . to_string ( ) . as_bytes ( ) ) ;
Ok ( self
. roomuserid_lastprivatereadupdate
. get ( & key ) ?
. map_or ( Ok ::< _ , Error > ( None ) , | bytes | {
Ok ( Some ( utils ::u64_from_bytes ( & bytes ) . map_err ( | _ | {
Error ::bad_database ( "Count in roomuserid_lastprivatereadupdate is invalid." )
} ) ? ) )
} ) ?
. unwrap_or ( 0 ) )
}
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called.
/// called.
pub fn roomactive_add (
pub fn typing _add(
& self ,
& self ,
user_id : & UserId ,
user_id : & UserId ,
room_id : & RoomId ,
room_id : & RoomId ,
@ -104,22 +156,22 @@ impl RoomEdus {
let count = globals . next_count ( ) ? . to_be_bytes ( ) ;
let count = globals . next_count ( ) ? . to_be_bytes ( ) ;
let mut room_ active _id = prefix ;
let mut room_ typing _id = prefix ;
room_ active _id. extend_from_slice ( & timeout . to_be_bytes ( ) ) ;
room_ typing _id. extend_from_slice ( & timeout . to_be_bytes ( ) ) ;
room_ active _id. push ( 0xff ) ;
room_ typing _id. push ( 0xff ) ;
room_ active _id. extend_from_slice ( & count ) ;
room_ typing _id. extend_from_slice ( & count ) ;
self . roomactive id_userid
self . typing id_userid
. insert ( & room_ active _id, & * user_id . to_string ( ) . as_bytes ( ) ) ? ;
. insert ( & room_ typing _id, & * user_id . to_string ( ) . as_bytes ( ) ) ? ;
self . roomid_last roomactive update
self . roomid_last typing update
. insert ( & room_id . to_string ( ) . as_bytes ( ) , & count ) ? ;
. insert ( & room_id . to_string ( ) . as_bytes ( ) , & count ) ? ;
Ok ( ( ) )
Ok ( ( ) )
}
}
/// Removes a user from typing before the timeout is reached.
/// Removes a user from typing before the timeout is reached.
pub fn roomactive _remove(
pub fn typing _remove(
& self ,
& self ,
user_id : & UserId ,
user_id : & UserId ,
room_id : & RoomId ,
room_id : & RoomId ,
@ -132,19 +184,19 @@ impl RoomEdus {
let mut found_outdated = false ;
let mut found_outdated = false ;
// Maybe there are multiple ones from calling room active _add multiple times
// Maybe there are multiple ones from calling room typing _add multiple times
for outdated_edu in self
for outdated_edu in self
. roomactive id_userid
. typing id_userid
. scan_prefix ( & prefix )
. scan_prefix ( & prefix )
. filter_map ( | r | r . ok ( ) )
. filter_map ( | r | r . ok ( ) )
. filter ( | ( _ , v ) | v = = user_id . as_bytes ( ) )
. filter ( | ( _ , v ) | v = = user_id . as_bytes ( ) )
{
{
self . roomactive id_userid. remove ( outdated_edu . 0 ) ? ;
self . typing id_userid. remove ( outdated_edu . 0 ) ? ;
found_outdated = true ;
found_outdated = true ;
}
}
if found_outdated {
if found_outdated {
self . roomid_last roomactive update. insert (
self . roomid_last typing update. insert (
& room_id . to_string ( ) . as_bytes ( ) ,
& room_id . to_string ( ) . as_bytes ( ) ,
& globals . next_count ( ) ? . to_be_bytes ( ) ,
& globals . next_count ( ) ? . to_be_bytes ( ) ,
) ? ;
) ? ;
@ -154,7 +206,7 @@ impl RoomEdus {
}
}
/// Makes sure that typing events with old timestamps get removed.
/// Makes sure that typing events with old timestamps get removed.
fn roomactive s_maintain(
fn typing s_maintain(
& self ,
& self ,
room_id : & RoomId ,
room_id : & RoomId ,
globals : & super ::super ::globals ::Globals ,
globals : & super ::super ::globals ::Globals ,
@ -168,7 +220,7 @@ impl RoomEdus {
// Find all outdated edus before inserting a new one
// Find all outdated edus before inserting a new one
for outdated_edu in self
for outdated_edu in self
. roomactive id_userid
. typing id_userid
. scan_prefix ( & prefix )
. scan_prefix ( & prefix )
. keys ( )
. keys ( )
. map ( | key | {
. map ( | key | {
@ -176,21 +228,21 @@ impl RoomEdus {
Ok ::< _ , Error > ( (
Ok ::< _ , Error > ( (
key . clone ( ) ,
key . clone ( ) ,
utils ::u64_from_bytes ( key . split ( | & b | b = = 0xff ) . nth ( 1 ) . ok_or_else ( | | {
utils ::u64_from_bytes ( key . split ( | & b | b = = 0xff ) . nth ( 1 ) . ok_or_else ( | | {
Error ::bad_database ( "Room Active has invalid timestamp or delimiters.")
Error ::bad_database ( "Room Typing has invalid timestamp or delimiters.")
} ) ? )
} ) ? )
. map_err ( | _ | Error ::bad_database ( "Room Active has invalid timestamp bytes.") ) ? ,
. map_err ( | _ | Error ::bad_database ( "Room Typing has invalid timestamp bytes.") ) ? ,
) )
) )
} )
} )
. filter_map ( | r | r . ok ( ) )
. filter_map ( | r | r . ok ( ) )
. take_while ( | & ( _ , timestamp ) | timestamp < current_timestamp )
. take_while ( | & ( _ , timestamp ) | timestamp < current_timestamp )
{
{
// This is an outdated edu (time > timestamp)
// This is an outdated edu (time > timestamp)
self . roomactive id_userid. remove ( outdated_edu . 0 ) ? ;
self . typing id_userid. remove ( outdated_edu . 0 ) ? ;
found_outdated = true ;
found_outdated = true ;
}
}
if found_outdated {
if found_outdated {
self . roomid_last roomactive update. insert (
self . roomid_last typing update. insert (
& room_id . to_string ( ) . as_bytes ( ) ,
& room_id . to_string ( ) . as_bytes ( ) ,
& globals . next_count ( ) ? . to_be_bytes ( ) ,
& globals . next_count ( ) ? . to_be_bytes ( ) ,
) ? ;
) ? ;
@ -199,16 +251,16 @@ impl RoomEdus {
Ok ( ( ) )
Ok ( ( ) )
}
}
/// Returns an iterator over all active events (e.g. typing notifications) .
/// Returns the count of the last typing update in this room .
pub fn last_ roomactive _update(
pub fn last_ typing _update(
& self ,
& self ,
room_id : & RoomId ,
room_id : & RoomId ,
globals : & super ::super ::globals ::Globals ,
globals : & super ::super ::globals ::Globals ,
) -> Result < u64 > {
) -> Result < u64 > {
self . roomactive s_maintain( room_id , globals ) ? ;
self . typing s_maintain( room_id , globals ) ? ;
Ok ( self
Ok ( self
. roomid_last roomactive update
. roomid_last typing update
. get ( & room_id . to_string ( ) . as_bytes ( ) ) ?
. get ( & room_id . to_string ( ) . as_bytes ( ) ) ?
. map_or ( Ok ::< _ , Error > ( None ) , | bytes | {
. map_or ( Ok ::< _ , Error > ( None ) , | bytes | {
Ok ( Some ( utils ::u64_from_bytes ( & bytes ) . map_err ( | _ | {
Ok ( Some ( utils ::u64_from_bytes ( & bytes ) . map_err ( | _ | {
@ -218,7 +270,7 @@ impl RoomEdus {
. unwrap_or ( 0 ) )
. unwrap_or ( 0 ) )
}
}
pub fn roomactive s_all(
pub fn typing s_all(
& self ,
& self ,
room_id : & RoomId ,
room_id : & RoomId ,
) -> Result < SyncEphemeralRoomEvent < ruma ::events ::typing ::TypingEventContent > > {
) -> Result < SyncEphemeralRoomEvent < ruma ::events ::typing ::TypingEventContent > > {
@ -228,17 +280,15 @@ impl RoomEdus {
let mut user_ids = Vec ::new ( ) ;
let mut user_ids = Vec ::new ( ) ;
for user_id in self
for user_id in self
. roomactive id_userid
. typing id_userid
. scan_prefix ( prefix )
. scan_prefix ( prefix )
. values ( )
. values ( )
. map ( | user_id | {
. map ( | user_id | {
Ok ::< _ , Error > (
Ok ::< _ , Error > (
UserId ::try_from ( utils ::string_from_bytes ( & user_id ? ) . map_err ( | _ | {
UserId ::try_from ( utils ::string_from_bytes ( & user_id ? ) . map_err ( | _ | {
Error ::bad_database ( "User ID in roomactive id_userid is invalid unicode.")
Error ::bad_database ( "User ID in typing id_userid is invalid unicode.")
} ) ? )
} ) ? )
. map_err ( | _ | {
. map_err ( | _ | Error ::bad_database ( "User ID in typingid_userid is invalid." ) ) ? ,
Error ::bad_database ( "User ID in roomactiveid_userid is invalid." )
} ) ? ,
)
)
} )
} )
{
{
@ -250,30 +300,6 @@ impl RoomEdus {
} )
} )
}
}
/// Sets a private read marker at `count`.
pub fn room_read_set ( & self , room_id : & RoomId , user_id : & UserId , count : u64 ) -> Result < ( ) > {
let mut key = room_id . to_string ( ) . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( & user_id . to_string ( ) . as_bytes ( ) ) ;
self . roomuserid_lastread . insert ( key , & count . to_be_bytes ( ) ) ? ;
Ok ( ( ) )
}
/// Returns the private read marker.
pub fn room_read_get ( & self , room_id : & RoomId , user_id : & UserId ) -> Result < Option < u64 > > {
let mut key = room_id . to_string ( ) . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( & user_id . to_string ( ) . as_bytes ( ) ) ;
self . roomuserid_lastread . get ( key ) ? . map_or ( Ok ( None ) , | v | {
Ok ( Some ( utils ::u64_from_bytes ( & v ) . map_err ( | _ | {
Error ::bad_database ( "Invalid private read marker bytes" )
} ) ? ) )
} )
}
/// Adds a presence event which will be saved until a new event replaces it.
/// Adds a presence event which will be saved until a new event replaces it.
///
///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// Note: This method takes a RoomId because presence updates are always bound to rooms to