@ -1,10 +1,7 @@
pub trait Data {
fn get_room_shortstatehash ( room_id : & RoomId ) ;
}
use crate ::service ;
/// Returns the last state hash key added to the db for the given room.
#[ tracing::instrument(skip(self)) ]
pub fn current_shortstatehash ( & self , room_id : & RoomId ) -> Result < Option < u64 > > {
impl service ::room ::state ::Data for KeyValueDatabase {
fn get_room_shortstatehash ( & self , room_id : & RoomId ) -> Result < Option < u64 > > {
self . roomid_shortstatehash
. get ( room_id . as_bytes ( ) ) ?
. map_or ( Ok ( None ) , | bytes | {
@ -14,77 +11,21 @@ pub trait Data {
} )
}
pub struct Service < D : Data > {
db : D ,
}
impl Service {
/// Set the room to the given statehash and update caches.
#[ tracing::instrument(skip(self, new_state_ids_compressed, db)) ]
pub fn force_state (
& self ,
room_id : & RoomId ,
shortstatehash : u64 ,
statediffnew :HashSet < CompressedStateEvent > ,
statediffremoved :HashSet < CompressedStateEvent > ,
db : & Database ,
) -> Result < ( ) > {
for event_id in statediffnew . into_iter ( ) . filter_map ( | new | {
self . parse_compressed_state_event ( new )
. ok ( )
. map ( | ( _ , id ) | id )
} ) {
let pdu = match self . get_pdu_json ( & event_id ) ? {
Some ( pdu ) = > pdu ,
None = > continue ,
} ;
if pdu . get ( "type" ) . and_then ( | val | val . as_str ( ) ) ! = Some ( "m.room.member" ) {
continue ;
}
let pdu : PduEvent = match serde_json ::from_str (
& serde_json ::to_string ( & pdu ) . expect ( "CanonicalJsonObj can be serialized to JSON" ) ,
) {
Ok ( pdu ) = > pdu ,
Err ( _ ) = > continue ,
} ;
#[ derive(Deserialize) ]
struct ExtractMembership {
membership : MembershipState ,
}
let membership = match serde_json ::from_str ::< ExtractMembership > ( pdu . content . get ( ) ) {
Ok ( e ) = > e . membership ,
Err ( _ ) = > continue ,
} ;
let state_key = match pdu . state_key {
Some ( k ) = > k ,
None = > continue ,
} ;
let user_id = match UserId ::parse ( state_key ) {
Ok ( id ) = > id ,
Err ( _ ) = > continue ,
} ;
self . update_membership ( room_id , & user_id , membership , & pdu . sender , None , db , false ) ? ;
}
self . update_joined_count ( room_id , db ) ? ;
fn set_room_state ( & self , room_id : & RoomId , new_shortstatehash : u64
_mutex_lock : & MutexGuard < ' _ , StateLock > , // Take mutex guard to make sure users get the room state mutex
) -> Result < ( ) > {
self . roomid_shortstatehash
. insert ( room_id . as_bytes ( ) , & new_shortstatehash . to_be_bytes ( ) ) ? ;
Ok ( ( ) )
}
fn set_event_state ( & self ) -> Result < ( ) > {
db . shorteventid_shortstatehash
. insert ( & shorteventid . to_be_bytes ( ) , & shortstatehash . to_be_bytes ( ) ) ? ;
Ok ( ( ) )
}
/// Returns the leaf pdus of a room.
#[ tracing::instrument(skip(self)) ]
pub fn get_pdu_leaves ( & self , room_id : & RoomId ) -> Result < HashSet < Arc < EventId > > > {
fn get_pdu_leaves ( & self , room_id : & RoomId ) -> Result < HashSet < Arc < EventId > > > {
let mut prefix = room_id . as_bytes ( ) . to_vec ( ) ;
prefix . push ( 0xff ) ;
@ -99,15 +40,11 @@ impl Service {
. collect ( )
}
/// Replace the leaves of a room.
///
/// The provided `event_ids` become the new leaves, this allows a room to have multiple
/// `prev_events`.
#[ tracing::instrument(skip(self)) ]
pub fn replace_pdu_leaves < ' a > (
fn set_forward_extremities (
& self ,
room_id : & RoomId ,
event_ids : impl IntoIterator < Item = & ' a EventId > + Debug ,
_mutex_lock : & MutexGuard < ' _ , StateLock > , // Take mutex guard to make sure users get the room state mutex
) -> Result < ( ) > {
let mut prefix = room_id . as_bytes ( ) . to_vec ( ) ;
prefix . push ( 0xff ) ;
@ -125,230 +62,48 @@ impl Service {
Ok ( ( ) )
}
/// Generates a new StateHash and associates it with the incoming event.
///
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
#[ tracing::instrument(skip(self, state_ids_compressed, globals)) ]
pub fn set_event_state (
}
impl service ::room ::alias ::Data for KeyValueDatabase {
fn set_alias (
& self ,
event_id : & EventId ,
room_id : & RoomId ,
state_ids_compressed : HashSet < CompressedStateEvent > ,
globals : & super ::globals ::Globals ,
alias : & RoomAliasId ,
room_id : Option < & RoomId >
) -> Result < ( ) > {
let shorteventid = self . get_or_create_shorteventid ( event_id , globals ) ? ;
let previous_shortstatehash = self . current_shortstatehash ( room_id ) ? ;
let state_hash = self . calculate_hash (
& state_ids_compressed
. iter ( )
. map ( | s | & s [ .. ] )
. collect ::< Vec < _ > > ( ) ,
) ;
let ( shortstatehash , already_existed ) =
self . get_or_create_shortstatehash ( & state_hash , globals ) ? ;
if ! already_existed {
let states_parents = previous_shortstatehash
. map_or_else ( | | Ok ( Vec ::new ( ) ) , | p | self . load_shortstatehash_info ( p ) ) ? ;
let ( statediffnew , statediffremoved ) =
if let Some ( parent_stateinfo ) = states_parents . last ( ) {
let statediffnew : HashSet < _ > = state_ids_compressed
. difference ( & parent_stateinfo . 1 )
. copied ( )
. collect ( ) ;
let statediffremoved : HashSet < _ > = parent_stateinfo
. 1
. difference ( & state_ids_compressed )
. copied ( )
. collect ( ) ;
( statediffnew , statediffremoved )
} else {
( state_ids_compressed , HashSet ::new ( ) )
} ;
self . save_state_from_diff (
shortstatehash ,
statediffnew ,
statediffremoved ,
1_000_000 , // high number because no state will be based on this one
states_parents ,
) ? ;
}
self . shorteventid_shortstatehash
. insert ( & shorteventid . to_be_bytes ( ) , & shortstatehash . to_be_bytes ( ) ) ? ;
self . alias_roomid
. insert ( alias . alias ( ) . as_bytes ( ) , room_id . as_bytes ( ) ) ? ;
let mut aliasid = room_id . as_bytes ( ) . to_vec ( ) ;
aliasid . push ( 0xff ) ;
aliasid . extend_from_slice ( & globals . next_count ( ) ? . to_be_bytes ( ) ) ;
self . aliasid_alias . insert ( & aliasid , & * alias . as_bytes ( ) ) ? ;
Ok ( ( ) )
}
/// Generates a new StateHash and associates it with the incoming event.
///
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
#[ tracing::instrument(skip(self, new_pdu, globals)) ]
pub fn append_to_state (
fn remove_alias (
& self ,
new_pdu : & PduEvent ,
globals : & super ::globals ::Globals ,
) -> Result < u64 > {
let shorteventid = self . get_or_create_shorteventid ( & new_pdu . event_id , globals ) ? ;
let previous_shortstatehash = self . current_shortstatehash ( & new_pdu . room_id ) ? ;
if let Some ( p ) = previous_shortstatehash {
self . shorteventid_shortstatehash
. insert ( & shorteventid . to_be_bytes ( ) , & p . to_be_bytes ( ) ) ? ;
}
if let Some ( state_key ) = & new_pdu . state_key {
let states_parents = previous_shortstatehash
. map_or_else ( | | Ok ( Vec ::new ( ) ) , | p | self . load_shortstatehash_info ( p ) ) ? ;
let shortstatekey = self . get_or_create_shortstatekey (
& new_pdu . kind . to_string ( ) . into ( ) ,
state_key ,
globals ,
) ? ;
let new = self . compress_state_event ( shortstatekey , & new_pdu . event_id , globals ) ? ;
let replaces = states_parents
. last ( )
. map ( | info | {
info . 1
. iter ( )
. find ( | bytes | bytes . starts_with ( & shortstatekey . to_be_bytes ( ) ) )
} )
. unwrap_or_default ( ) ;
if Some ( & new ) = = replaces {
return Ok ( previous_shortstatehash . expect ( "must exist" ) ) ;
}
// TODO: statehash with deterministic inputs
let shortstatehash = globals . next_count ( ) ? ;
let mut statediffnew = HashSet ::new ( ) ;
statediffnew . insert ( new ) ;
alias : & RoomAliasId ,
) -> Result < ( ) > {
if let Some ( room_id ) = self . alias_roomid . get ( alias . alias ( ) . as_bytes ( ) ) ? {
let mut prefix = room_id . to_vec ( ) ;
prefix . push ( 0xff ) ;
let mut statediffremoved = HashSet ::new ( ) ;
if let Some ( replaces ) = replaces {
statediffremoved . insert ( * replaces ) ;
for ( key , _ ) in self . aliasid_alias . scan_prefix ( prefix ) {
self . aliasid_alias . remove ( & key ) ? ;
}
self . save_state_from_diff (
shortstatehash ,
statediffnew ,
statediffremoved ,
2 ,
states_parents ,
) ? ;
Ok ( shortstatehash )
self . alias_roomid . remove ( alias . alias ( ) . as_bytes ( ) ) ? ;
} else {
Ok ( previous_shortstatehash . expect ( "first event in room must be a state event" ) )
}
}
#[ tracing::instrument(skip(self, invite_event)) ]
pub fn calculate_invite_state (
& self ,
invite_event : & PduEvent ,
) -> Result < Vec < Raw < AnyStrippedStateEvent > > > {
let mut state = Vec ::new ( ) ;
// Add recommended events
if let Some ( e ) =
self . room_state_get ( & invite_event . room_id , & StateEventType ::RoomCreate , "" ) ?
{
state . push ( e . to_stripped_state_event ( ) ) ;
}
if let Some ( e ) =
self . room_state_get ( & invite_event . room_id , & StateEventType ::RoomJoinRules , "" ) ?
{
state . push ( e . to_stripped_state_event ( ) ) ;
return Err ( Error ::BadRequest (
ErrorKind ::NotFound ,
"Alias does not exist." ,
) ) ;
}
if let Some ( e ) = self . room_state_get (
& invite_event . room_id ,
& StateEventType ::RoomCanonicalAlias ,
"" ,
) ? {
state . push ( e . to_stripped_state_event ( ) ) ;
}
if let Some ( e ) =
self . room_state_get ( & invite_event . room_id , & StateEventType ::RoomAvatar , "" ) ?
{
state . push ( e . to_stripped_state_event ( ) ) ;
}
if let Some ( e ) =
self . room_state_get ( & invite_event . room_id , & StateEventType ::RoomName , "" ) ?
{
state . push ( e . to_stripped_state_event ( ) ) ;
}
if let Some ( e ) = self . room_state_get (
& invite_event . room_id ,
& StateEventType ::RoomMember ,
invite_event . sender . as_str ( ) ,
) ? {
state . push ( e . to_stripped_state_event ( ) ) ;
}
state . push ( invite_event . to_stripped_state_event ( ) ) ;
Ok ( state )
}
#[ tracing::instrument(skip(self)) ]
pub fn set_room_state ( & self , room_id : & RoomId , shortstatehash : u64 ) -> Result < ( ) > {
self . roomid_shortstatehash
. insert ( room_id . as_bytes ( ) , & shortstatehash . to_be_bytes ( ) ) ? ;
Ok ( ( ) )
}
}
#[ tracing::instrument(skip(self, globals)) ]
pub fn set_alias (
fn resolve_local_alias (
& self ,
alias : & RoomAliasId ,
room_id : Option < & RoomId > ,
globals : & super ::globals ::Globals ,
alias : & RoomAliasId
) -> Result < ( ) > {
if let Some ( room_id ) = room_id {
// New alias
self . alias_roomid
. insert ( alias . alias ( ) . as_bytes ( ) , room_id . as_bytes ( ) ) ? ;
let mut aliasid = room_id . as_bytes ( ) . to_vec ( ) ;
aliasid . push ( 0xff ) ;
aliasid . extend_from_slice ( & globals . next_count ( ) ? . to_be_bytes ( ) ) ;
self . aliasid_alias . insert ( & aliasid , & * alias . as_bytes ( ) ) ? ;
} else {
// room_id=None means remove alias
if let Some ( room_id ) = self . alias_roomid . get ( alias . alias ( ) . as_bytes ( ) ) ? {
let mut prefix = room_id . to_vec ( ) ;
prefix . push ( 0xff ) ;
for ( key , _ ) in self . aliasid_alias . scan_prefix ( prefix ) {
self . aliasid_alias . remove ( & key ) ? ;
}
self . alias_roomid . remove ( alias . alias ( ) . as_bytes ( ) ) ? ;
} else {
return Err ( Error ::BadRequest (
ErrorKind ::NotFound ,
"Alias does not exist." ,
) ) ;
}
}
Ok ( ( ) )
}
#[ tracing::instrument(skip(self)) ]
pub fn id_from_alias ( & self , alias : & RoomAliasId ) -> Result < Option < Box < RoomId > > > {
self . alias_roomid
. get ( alias . alias ( ) . as_bytes ( ) ) ?
. map ( | bytes | {
@ -360,11 +115,10 @@ impl Service {
. transpose ( )
}
#[ tracing::instrument(skip(self)) ]
pub fn room_aliases < ' a > (
& ' a self ,
fn local_aliases_for_room (
& self ,
room_id : & RoomId ,
) -> impl Iterator < Item = Result < Box < RoomAliasId > > > + ' a {
) -> Result < ( ) > {
let mut prefix = room_id . as_bytes ( ) . to_vec ( ) ;
prefix . push ( 0xff ) ;
@ -375,26 +129,22 @@ impl Service {
. map_err ( | _ | Error ::bad_database ( "Invalid alias in aliasid_alias." ) )
} )
}
}
impl service ::room ::directory ::Data for KeyValueDatabase {
fn set_public ( & self , room_id : & RoomId ) -> Result < ( ) > {
self . publicroomids . insert ( room_id . as_bytes ( ) , & [ ] ) ? ;
}
#[ tracing::instrument(skip(self)) ]
pub fn set_public ( & self , room_id : & RoomId , public : bool ) -> Result < ( ) > {
if public {
self . publicroomids . insert ( room_id . as_bytes ( ) , & [ ] ) ? ;
} else {
self . publicroomids . remove ( room_id . as_bytes ( ) ) ? ;
}
Ok ( ( ) )
fn set_not_public ( & self , room_id : & RoomId ) -> Result < ( ) > {
self . publicroomids . remove ( room_id . as_bytes ( ) ) ? ;
}
#[ tracing::instrument(skip(self)) ]
pub fn is_public_room ( & self , room_id : & RoomId ) -> Result < bool > {
fn is_public_room ( & self , room_id : & RoomId ) -> Result < bool > {
Ok ( self . publicroomids . get ( room_id . as_bytes ( ) ) ? . is_some ( ) )
}
#[ tracing::instrument(skip(self)) ]
pub fn public_rooms ( & self ) -> impl Iterator < Item = Result < Box < RoomId > > > + ' _ {
fn public_rooms ( & self ) -> impl Iterator < Item = Result < Box < RoomId > > > + ' _ {
self . publicroomids . iter ( ) . map ( | ( bytes , _ ) | {
RoomId ::parse (
utils ::string_from_bytes ( & bytes ) . map_err ( | _ | {
@ -404,43 +154,14 @@ impl Service {
. map_err ( | _ | Error ::bad_database ( "Room ID in publicroomids is invalid." ) )
} )
}
use crate ::{ database ::abstraction ::Tree , utils , Error , Result } ;
use ruma ::{
events ::{
presence ::{ PresenceEvent , PresenceEventContent } ,
receipt ::ReceiptEvent ,
SyncEphemeralRoomEvent ,
} ,
presence ::PresenceState ,
serde ::Raw ,
signatures ::CanonicalJsonObject ,
RoomId , UInt , UserId ,
} ;
use std ::{
collections ::{ HashMap , HashSet } ,
mem ,
sync ::Arc ,
} ;
pub struct RoomEdus {
pub ( in super ::super ) readreceiptid_readreceipt : Arc < dyn Tree > , // ReadReceiptId = RoomId + Count + UserId
pub ( in super ::super ) roomuserid_privateread : Arc < dyn Tree > , // RoomUserId = Room + User, PrivateRead = Count
pub ( in super ::super ) roomuserid_lastprivatereadupdate : Arc < dyn Tree > , // LastPrivateReadUpdate = Count
pub ( in super ::super ) typingid_userid : Arc < dyn Tree > , // TypingId = RoomId + TimeoutTime + Count
pub ( in super ::super ) roomid_lasttypingupdate : Arc < dyn Tree > , // LastRoomTypingUpdate = Count
pub ( in super ::super ) presenceid_presence : Arc < dyn Tree > , // PresenceId = RoomId + Count + UserId
pub ( in super ::super ) userid_lastpresenceupdate : Arc < dyn Tree > , // LastPresenceUpdate = Count
}
impl RoomEdus {
/// Adds an event which will be saved until a new event replaces it (e.g. read receipt).
pub fn readreceipt_update (
impl service ::room ::edus ::Data for KeyValueDatabase {
fn readreceipt_update (
& self ,
user_id : & UserId ,
room_id : & RoomId ,
event : ReceiptEvent ,
globals : & super ::super ::globals ::Globals ,
) -> Result < ( ) > {
let mut prefix = room_id . as_bytes ( ) . to_vec ( ) ;
prefix . push ( 0xff ) ;
@ -477,8 +198,6 @@ impl RoomEdus {
Ok ( ( ) )
}
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
#[ tracing::instrument(skip(self)) ]
pub fn readreceipts_since < ' a > (
& ' a self ,
room_id : & RoomId ,
@ -527,14 +246,11 @@ impl RoomEdus {
} )
}
/// Sets a private read marker at `count`.
#[ tracing::instrument(skip(self, globals)) ]
pub fn private_read_set (
fn private_read_set (
& self ,
room_id : & RoomId ,
user_id : & UserId ,
count : u64 ,
globals : & super ::super ::globals ::Globals ,
) -> Result < ( ) > {
let mut key = room_id . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
@ -545,13 +261,9 @@ impl RoomEdus {
self . roomuserid_lastprivatereadupdate
. insert ( & key , & globals . next_count ( ) ? . to_be_bytes ( ) ) ? ;
Ok ( ( ) )
}
/// Returns the private read marker.
#[ tracing::instrument(skip(self)) ]
pub fn private_read_get ( & self , room_id : & RoomId , user_id : & UserId ) -> Result < Option < u64 > > {
fn private_read_get ( & self , room_id : & RoomId , user_id : & UserId ) -> Result < Option < u64 > > {
let mut key = room_id . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( user_id . as_bytes ( ) ) ;
@ -565,8 +277,7 @@ impl RoomEdus {
} )
}
/// Returns the count of the last typing update in this room.
pub fn last_privateread_update ( & self , user_id : & UserId , room_id : & RoomId ) -> Result < u64 > {
fn last_privateread_update ( & self , user_id : & UserId , room_id : & RoomId ) -> Result < u64 > {
let mut key = room_id . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( user_id . as_bytes ( ) ) ;
@ -583,9 +294,7 @@ impl RoomEdus {
. unwrap_or ( 0 ) )
}
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called.
pub fn typing_add (
fn typing_add (
& self ,
user_id : & UserId ,
room_id : & RoomId ,
@ -611,12 +320,10 @@ impl RoomEdus {
Ok ( ( ) )
}
/// Removes a user from typing before the timeout is reached.
pub fn typing_remove (
fn typing_remove (
& self ,
user_id : & UserId ,
room_id : & RoomId ,
globals : & super ::super ::globals ::Globals ,
) -> Result < ( ) > {
let mut prefix = room_id . as_bytes ( ) . to_vec ( ) ;
prefix . push ( 0xff ) ;
@ -643,59 +350,10 @@ impl RoomEdus {
Ok ( ( ) )
}
/// Makes sure that typing events with old timestamps get removed.
fn typings_maintain (
& self ,
room_id : & RoomId ,
globals : & super ::super ::globals ::Globals ,
) -> Result < ( ) > {
let mut prefix = room_id . as_bytes ( ) . to_vec ( ) ;
prefix . push ( 0xff ) ;
let current_timestamp = utils ::millis_since_unix_epoch ( ) ;
let mut found_outdated = false ;
// Find all outdated edus before inserting a new one
for outdated_edu in self
. typingid_userid
. scan_prefix ( prefix )
. map ( | ( key , _ ) | {
Ok ::< _ , Error > ( (
key . clone ( ) ,
utils ::u64_from_bytes (
& key . splitn ( 2 , | & b | b = = 0xff ) . nth ( 1 ) . ok_or_else ( | | {
Error ::bad_database ( "RoomTyping has invalid timestamp or delimiters." )
} ) ? [ 0 .. mem ::size_of ::< u64 > ( ) ] ,
)
. map_err ( | _ | Error ::bad_database ( "RoomTyping has invalid timestamp bytes." ) ) ? ,
) )
} )
. filter_map ( | r | r . ok ( ) )
. take_while ( | & ( _ , timestamp ) | timestamp < current_timestamp )
{
// This is an outdated edu (time > timestamp)
self . typingid_userid . remove ( & outdated_edu . 0 ) ? ;
found_outdated = true ;
}
if found_outdated {
self . roomid_lasttypingupdate
. insert ( room_id . as_bytes ( ) , & globals . next_count ( ) ? . to_be_bytes ( ) ) ? ;
}
Ok ( ( ) )
}
/// Returns the count of the last typing update in this room.
#[ tracing::instrument(skip(self, globals)) ]
pub fn last_typing_update (
fn last_typing_update (
& self ,
room_id : & RoomId ,
globals : & super ::super ::globals ::Globals ,
) -> Result < u64 > {
self . typings_maintain ( room_id , globals ) ? ;
Ok ( self
. roomid_lasttypingupdate
. get ( room_id . as_bytes ( ) ) ?
@ -708,10 +366,10 @@ impl RoomEdus {
. unwrap_or ( 0 ) )
}
pub fn typings_all (
fn typings_all (
& self ,
room_id : & RoomId ,
) -> Result < SyncEphemeralRoomEvent< ruma ::events ::typing ::TypingEventContent > > {
) -> Result < HashSet< UserId > > {
let mut prefix = room_id . as_bytes ( ) . to_vec ( ) ;
prefix . push ( 0xff ) ;
@ -726,23 +384,14 @@ impl RoomEdus {
user_ids . insert ( user_id ) ;
}
Ok ( SyncEphemeralRoomEvent {
content : ruma ::events ::typing ::TypingEventContent {
user_ids : user_ids . into_iter ( ) . collect ( ) ,
} ,
} )
Ok ( user_ids )
}
/// Adds a presence event which will be saved until a new event replaces it.
///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to
/// make sure users outside these rooms can't see them.
pub fn update_presence (
fn update_presence (
& self ,
user_id : & UserId ,
room_id : & RoomId ,
presence : PresenceEvent ,
globals : & super ::super ::globals ::Globals ,
) -> Result < ( ) > {
// TODO: Remove old entry? Or maybe just wipe completely from time to time?
@ -767,8 +416,6 @@ impl RoomEdus {
Ok ( ( ) )
}
/// Resets the presence timeout, so the user will stay in their current presence state.
#[ tracing::instrument(skip(self)) ]
pub fn ping_presence ( & self , user_id : & UserId ) -> Result < ( ) > {
self . userid_lastpresenceupdate . insert (
user_id . as_bytes ( ) ,
@ -778,8 +425,7 @@ impl RoomEdus {
Ok ( ( ) )
}
/// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
pub fn last_presence_update ( & self , user_id : & UserId ) -> Result < Option < u64 > > {
fn last_presence_update ( & self , user_id : & UserId ) -> Result < Option < u64 > > {
self . userid_lastpresenceupdate
. get ( user_id . as_bytes ( ) ) ?
. map ( | bytes | {
@ -790,125 +436,29 @@ impl RoomEdus {
. transpose ( )
}
pub fn ge t_las t_presence_event(
fn ge t_presence_event(
& self ,
user_id : & UserId ,
room_id : & RoomId ,
count : u64 ,
) -> Result < Option < PresenceEvent > > {
let last_update = match self . last_presence_update ( user_id ) ? {
Some ( last ) = > last ,
None = > return Ok ( None ) ,
} ;
let mut presence_id = room_id . as_bytes ( ) . to_vec ( ) ;
presence_id . push ( 0xff ) ;
presence_id . extend_from_slice ( & last_update . to_be_bytes ( ) ) ;
presence_id . extend_from_slice ( & count . to_be_bytes ( ) ) ;
presence_id . push ( 0xff ) ;
presence_id . extend_from_slice ( user_id . as_bytes ( ) ) ;
self . presenceid_presence
. get ( & presence_id ) ?
. map ( | value | {
let mut presence : PresenceEvent = serde_json ::from_slice ( & value )
. map_err ( | _ | Error ::bad_database ( "Invalid presence event in db." ) ) ? ;
let current_timestamp : UInt = utils ::millis_since_unix_epoch ( )
. try_into ( )
. expect ( "time is valid" ) ;
if presence . content . presence = = PresenceState ::Online {
// Don't set last_active_ago when the user is online
presence . content . last_active_ago = None ;
} else {
// Convert from timestamp to duration
presence . content . last_active_ago = presence
. content
. last_active_ago
. map ( | timestamp | current_timestamp - timestamp ) ;
}
Ok ( presence )
} )
. map ( | value | parse_presence_event ( & value ) )
. transpose ( )
}
/// Sets all users to offline who have been quiet for too long.
fn _presence_maintain (
& self ,
rooms : & super ::Rooms ,
globals : & super ::super ::globals ::Globals ,
) -> Result < ( ) > {
let current_timestamp = utils ::millis_since_unix_epoch ( ) ;
for ( user_id_bytes , last_timestamp ) in self
. userid_lastpresenceupdate
. iter ( )
. filter_map ( | ( k , bytes ) | {
Some ( (
k ,
utils ::u64_from_bytes ( & bytes )
. map_err ( | _ | {
Error ::bad_database ( "Invalid timestamp in userid_lastpresenceupdate." )
} )
. ok ( ) ? ,
) )
} )
. take_while ( | ( _ , timestamp ) | current_timestamp . saturating_sub ( * timestamp ) > 5 * 60_000 )
// 5 Minutes
{
// Send new presence events to set the user offline
let count = globals . next_count ( ) ? . to_be_bytes ( ) ;
let user_id : Box < _ > = utils ::string_from_bytes ( & user_id_bytes )
. map_err ( | _ | {
Error ::bad_database ( "Invalid UserId bytes in userid_lastpresenceupdate." )
} ) ?
. try_into ( )
. map_err ( | _ | Error ::bad_database ( "Invalid UserId in userid_lastpresenceupdate." ) ) ? ;
for room_id in rooms . rooms_joined ( & user_id ) . filter_map ( | r | r . ok ( ) ) {
let mut presence_id = room_id . as_bytes ( ) . to_vec ( ) ;
presence_id . push ( 0xff ) ;
presence_id . extend_from_slice ( & count ) ;
presence_id . push ( 0xff ) ;
presence_id . extend_from_slice ( & user_id_bytes ) ;
self . presenceid_presence . insert (
& presence_id ,
& serde_json ::to_vec ( & PresenceEvent {
content : PresenceEventContent {
avatar_url : None ,
currently_active : None ,
displayname : None ,
last_active_ago : Some (
last_timestamp . try_into ( ) . expect ( "time is valid" ) ,
) ,
presence : PresenceState ::Offline ,
status_msg : None ,
} ,
sender : user_id . to_owned ( ) ,
} )
. expect ( "PresenceEvent can be serialized" ) ,
) ? ;
}
self . userid_lastpresenceupdate . insert (
user_id . as_bytes ( ) ,
& utils ::millis_since_unix_epoch ( ) . to_be_bytes ( ) ,
) ? ;
}
Ok ( ( ) )
}
/// Returns an iterator over the most recent presence updates that happened after the event with id `since`.
#[ tracing::instrument(skip(self, since, _rooms, _globals)) ]
pub fn presence_since (
fn presence_since (
& self ,
room_id : & RoomId ,
since : u64 ,
_rooms : & super ::Rooms ,
_globals : & super ::super ::globals ::Globals ,
) -> Result < HashMap < Box < UserId > , PresenceEvent > > {
//self.presence_maintain(rooms, globals)?;
let mut prefix = room_id . as_bytes ( ) . to_vec ( ) ;
prefix . push ( 0xff ) ;
@ -931,23 +481,7 @@ impl RoomEdus {
)
. map_err ( | _ | Error ::bad_database ( "Invalid UserId in presenceid_presence." ) ) ? ;
let mut presence : PresenceEvent = serde_json ::from_slice ( & value )
. map_err ( | _ | Error ::bad_database ( "Invalid presence event in db." ) ) ? ;
let current_timestamp : UInt = utils ::millis_since_unix_epoch ( )
. try_into ( )
. expect ( "time is valid" ) ;
if presence . content . presence = = PresenceState ::Online {
// Don't set last_active_ago when the user is online
presence . content . last_active_ago = None ;
} else {
// Convert from timestamp to duration
presence . content . last_active_ago = presence
. content
. last_active_ago
. map ( | timestamp | current_timestamp - timestamp ) ;
}
let presence = parse_presence_event ( & value ) ? ;
hashmap . insert ( user_id , presence ) ;
}
@ -956,8 +490,28 @@ impl RoomEdus {
}
}
#[ tracing::instrument(skip(self)) ]
pub fn lazy_load_was_sent_before (
fn parse_presence_event ( bytes : & [ u8 ] ) -> Result < PresenceEvent > {
let mut presence : PresenceEvent = serde_json ::from_slice ( bytes )
. map_err ( | _ | Error ::bad_database ( "Invalid presence event in db." ) ) ? ;
let current_timestamp : UInt = utils ::millis_since_unix_epoch ( )
. try_into ( )
. expect ( "time is valid" ) ;
if presence . content . presence = = PresenceState ::Online {
// Don't set last_active_ago when the user is online
presence . content . last_active_ago = None ;
} else {
// Convert from timestamp to duration
presence . content . last_active_ago = presence
. content
. last_active_ago
. map ( | timestamp | current_timestamp - timestamp ) ;
}
}
impl service ::room ::lazy_load ::Data for KeyValueDatabase {
fn lazy_load_was_sent_before (
& self ,
user_id : & UserId ,
device_id : & DeviceId ,
@ -974,28 +528,7 @@ impl RoomEdus {
Ok ( self . lazyloadedids . get ( & key ) ? . is_some ( ) )
}
#[ tracing::instrument(skip(self)) ]
pub fn lazy_load_mark_sent (
& self ,
user_id : & UserId ,
device_id : & DeviceId ,
room_id : & RoomId ,
lazy_load : HashSet < Box < UserId > > ,
count : u64 ,
) {
self . lazy_load_waiting . lock ( ) . unwrap ( ) . insert (
(
user_id . to_owned ( ) ,
device_id . to_owned ( ) ,
room_id . to_owned ( ) ,
count ,
) ,
lazy_load ,
) ;
}
#[ tracing::instrument(skip(self)) ]
pub fn lazy_load_confirm_delivery (
fn lazy_load_confirm_delivery (
& self ,
user_id : & UserId ,
device_id : & DeviceId ,
@ -1025,8 +558,7 @@ impl RoomEdus {
Ok ( ( ) )
}
#[ tracing::instrument(skip(self)) ]
pub fn lazy_load_reset (
fn lazy_load_reset (
& self ,
user_id : & UserId ,
device_id : & DeviceId ,
@ -1045,10 +577,10 @@ impl RoomEdus {
Ok ( ( ) )
}
}
/// Checks if a room exists.
#[ tracing::instrument(skip(self)) ]
pub fn exists ( & self , room_id : & RoomId ) -> Result < bool > {
impl service ::room ::metadata ::Data for KeyValueDatabase {
fn exists ( & self , room_id : & RoomId ) -> Result < bool > {
let prefix = match self . get_shortroomid ( room_id ) ? {
Some ( b ) = > b . to_be_bytes ( ) . to_vec ( ) ,
None = > return Ok ( false ) ,
@ -1062,36 +594,10 @@ impl RoomEdus {
. filter ( | ( k , _ ) | k . starts_with ( & prefix ) )
. is_some ( ) )
}
}
pub fn get_shortroomid ( & self , room_id : & RoomId ) -> Result < Option < u64 > > {
self . roomid_shortroomid
. get ( room_id . as_bytes ( ) ) ?
. map ( | bytes | {
utils ::u64_from_bytes ( & bytes )
. map_err ( | _ | Error ::bad_database ( "Invalid shortroomid in db." ) )
} )
. transpose ( )
}
pub fn get_or_create_shortroomid (
& self ,
room_id : & RoomId ,
globals : & super ::globals ::Globals ,
) -> Result < u64 > {
Ok ( match self . roomid_shortroomid . get ( room_id . as_bytes ( ) ) ? {
Some ( short ) = > utils ::u64_from_bytes ( & short )
. map_err ( | _ | Error ::bad_database ( "Invalid shortroomid in db." ) ) ? ,
None = > {
let short = globals . next_count ( ) ? ;
self . roomid_shortroomid
. insert ( room_id . as_bytes ( ) , & short . to_be_bytes ( ) ) ? ;
short
}
} )
}
/// Returns the pdu from the outlier tree.
pub fn get_outlier_pdu_json ( & self , event_id : & EventId ) -> Result < Option < CanonicalJsonObject > > {
impl service ::room ::outlier ::Data for KeyValueDatabase {
fn get_outlier_pdu_json ( & self , event_id : & EventId ) -> Result < Option < CanonicalJsonObject > > {
self . eventid_outlierpdu
. get ( event_id . as_bytes ( ) ) ?
. map_or ( Ok ( None ) , | pdu | {
@ -1099,8 +605,7 @@ impl RoomEdus {
} )
}
/// Returns the pdu from the outlier tree.
pub fn get_pdu_outlier ( & self , event_id : & EventId ) -> Result < Option < PduEvent > > {
fn get_outlier_pdu ( & self , event_id : & EventId ) -> Result < Option < PduEvent > > {
self . eventid_outlierpdu
. get ( event_id . as_bytes ( ) ) ?
. map_or ( Ok ( None ) , | pdu | {
@ -1108,18 +613,16 @@ impl RoomEdus {
} )
}
/// Append the PDU as an outlier.
#[ tracing::instrument(skip(self, pdu)) ]
pub fn add_pdu_outlier ( & self , event_id : & EventId , pdu : & CanonicalJsonObject ) -> Result < ( ) > {
fn add_pdu_outlier ( & self , event_id : & EventId , pdu : & CanonicalJsonObject ) -> Result < ( ) > {
self . eventid_outlierpdu . insert (
event_id . as_bytes ( ) ,
& serde_json ::to_vec ( & pdu ) . expect ( "CanonicalJsonObject is valid" ) ,
)
}
}
#[ tracing::instrument(skip(self, room_id, event_ids)) ]
pub fn mark_as_referenced ( & self , room_id : & RoomId , event_ids : & [ Arc < EventId > ] ) -> Result < ( ) > {
impl service ::room ::pdu_metadata ::Data for KeyValueDatabase {
fn mark_as_referenced ( & self , room_id : & RoomId , event_ids : & [ Arc < EventId > ] ) -> Result < ( ) > {
for prev in event_ids {
let mut key = room_id . as_bytes ( ) . to_vec ( ) ;
key . extend_from_slice ( prev . as_bytes ( ) ) ;
@ -1129,22 +632,19 @@ impl RoomEdus {
Ok ( ( ) )
}
#[ tracing::instrument(skip(self)) ]
pub fn is_event_referenced ( & self , room_id : & RoomId , event_id : & EventId ) -> Result < bool > {
fn is_event_referenced ( & self , room_id : & RoomId , event_id : & EventId ) -> Result < bool > {
let mut key = room_id . as_bytes ( ) . to_vec ( ) ;
key . extend_from_slice ( event_id . as_bytes ( ) ) ;
Ok ( self . referencedevents . get ( & key ) ? . is_some ( ) )
}
#[ tracing::instrument(skip(self)) ]
pub fn mark_event_soft_failed ( & self , event_id : & EventId ) -> Result < ( ) > {
fn mark_event_soft_failed ( & self , event_id : & EventId ) -> Result < ( ) > {
self . softfailedeventids . insert ( event_id . as_bytes ( ) , & [ ] )
}
#[ tracing::instrument(skip(self)) ]
pub fn is_event_soft_failed ( & self , event_id : & EventId ) -> Result < bool > {
fn is_event_soft_failed ( & self , event_id : & EventId ) -> Result < bool > {
self . softfailedeventids
. get ( event_id . as_bytes ( ) )
. map ( | o | o . is_some ( ) )
}
}