@ -2,17 +2,29 @@ use super::State;
use crate ::{ ConduitResult , Database , Error , Ruma } ;
use ruma ::{
api ::client ::r0 ::sync ::sync_events ,
events ::{ AnySyncEphemeralRoomEvent, EventType } ,
Raw ,
events ::{ room::member ::MembershipState , AnySyncEphemeralRoomEvent, EventType } ,
Raw , RoomId , UserId ,
} ;
#[ cfg(feature = " conduit_bin " ) ]
use rocket ::{ get , tokio } ;
use std ::{
collections ::{ hash_map , BTreeMap , HashMap , HashSet } ,
convert ::TryFrom ,
time ::Duration ,
} ;
/// # `GET /_matrix/client/r0/sync`
///
/// Synchronize the client's state with the latest state on the server.
///
/// - This endpoint takes a `since` parameter which should be the `next_batch` value from a
/// previous request.
/// - Calling this endpoint without a `since` parameter will return all recent events, the state
/// of all rooms and more data. This should only be called on the initial login of the device.
/// - To get incremental updates, you can call this endpoint with a `since` parameter. This will
/// return all recent events, state updates and more data that happened since the last /sync
/// request.
#[ cfg_attr(
feature = "conduit_bin" ,
get ( "/_matrix/client/r0/sync" , data = "<body>" )
@ -40,7 +52,9 @@ pub async fn sync_events_route(
. unwrap_or ( 0 ) ;
let mut presence_updates = HashMap ::new ( ) ;
let mut left_encrypted_users = HashSet ::new ( ) ; // Users that have left any encrypted rooms the sender was in
let mut device_list_updates = HashSet ::new ( ) ;
let mut device_list_left = HashSet ::new ( ) ;
// Look for device list updates of this account
device_list_updates . extend (
@ -67,46 +81,100 @@ pub async fn sync_events_route(
. rev ( )
. collect ::< Vec < _ > > ( ) ;
let send_notification_counts = ! timeline_pdus . is_empty ( ) ;
// They /sync response doesn't always return all messages, so we say the output is
// limited unless there are events in non_timeline_pdus
//let mut limited = false;
let mut limited = false ;
let mut state_pdus = Vec ::new ( ) ;
for pdu in non_timeline_pdus {
if pdu . state_key . is_some ( ) {
state_pdus . push ( pdu ) ;
}
limited = true ;
}
let encrypted_room = db
. rooms
. room_state_get ( & room_id , & EventType ::RoomEncryption , "" ) ?
. is_some ( ) ;
// TODO: optimize this?
let mut send_member_count = false ;
let mut joined_since_last_sync = false ;
let mut send_notification_counts = false ;
for pdu in db
let mut new_encrypted_room = false ;
for ( state_key , pdu ) in db
. rooms
. pdus_since ( & sender_id , & room_id , since ) ?
. filter_map ( | r | r . ok ( ) )
. filter_map ( | pdu | Some ( ( pdu . state_key . clone ( ) ? , pdu ) ) )
{
let pdu = pdu ? ;
send_notification_counts = true ;
if pdu . kind = = EventType ::RoomMember {
send_member_count = true ;
if ! joined_since_last_sync & & pdu . state_key = = Some ( sender_id . to_string ( ) ) {
let content = serde_json ::from_value ::<
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
> ( pdu . content . clone ( ) )
. expect ( "Raw::from_value always works" )
. deserialize ( )
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) ) ? ;
if content . membership = = ruma ::events ::room ::member ::MembershipState ::Join {
joined_since_last_sync = true ;
// Both send_member_count and joined_since_last_sync are set. There's
// nothing more to do
break ;
let content = serde_json ::from_value ::<
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
> ( pdu . content . clone ( ) )
. expect ( "Raw::from_value always works" )
. deserialize ( )
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) ) ? ;
if pdu . state_key = = Some ( sender_id . to_string ( ) )
& & content . membership = = MembershipState ::Join
{
joined_since_last_sync = true ;
} else if encrypted_room & & content . membership = = MembershipState ::Join {
// A new user joined an encrypted room
let user_id = UserId ::try_from ( state_key )
. map_err ( | _ | Error ::bad_database ( "Invalid UserId in member PDU." ) ) ? ;
// Add encryption update if we didn't share an encrypted room already
if ! share_encrypted_room ( & db , & sender_id , & user_id , & room_id ) {
device_list_updates . insert ( user_id ) ;
}
} else if encrypted_room & & content . membership = = MembershipState ::Leave {
// Write down users that have left encrypted rooms we are in
left_encrypted_users . insert (
UserId ::try_from ( state_key )
. map_err ( | _ | Error ::bad_database ( "Invalid UserId in member PDU." ) ) ? ,
) ;
}
} else if pdu . kind = = EventType ::RoomEncryption {
new_encrypted_room = true ;
}
}
let members = db . rooms . room_state_type ( & room_id , & EventType ::RoomMember ) ? ;
if joined_since_last_sync & & encrypted_room | | new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_updates . extend (
db . rooms
. room_members ( & room_id )
. filter_map ( | user_id | {
Some (
UserId ::try_from ( user_id . ok ( ) ? . clone ( ) )
. map_err ( | _ | {
Error ::bad_database ( "Invalid member event state key in db." )
} )
. ok ( ) ? ,
)
} )
. filter ( | user_id | {
// Don't send key updates from the sender to the sender
sender_id ! = user_id
} )
. filter ( | user_id | {
// Only send keys if the sender doesn't share an encrypted room with the target already
! share_encrypted_room ( & db , sender_id , user_id , & room_id )
} ) ,
) ;
}
// Look for device list updates in this room
device_list_updates . extend (
db . users
. keys_changed ( & room_id . to_string ( ) , since , None )
. filter_map ( | r | r . ok ( ) ) ,
) ;
let ( joined_member_count , invited_member_count , heroes ) = if send_member_count {
let joined_member_count = db . rooms . room_members ( & room_id ) . count ( ) ;
@ -133,35 +201,17 @@ pub async fn sync_events_route(
. map_err ( | _ | Error ::bad_database ( "Invalid member event in database." ) ) ? ;
if let Some ( state_key ) = & pdu . state_key {
let current_content = serde_json ::from_value ::<
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
> (
members
. get ( state_key )
. ok_or_else ( | | {
Error ::bad_database (
"A user that joined once has no member event anymore." ,
)
} ) ?
. content
. clone ( ) ,
)
. expect ( "Raw::from_value always works" )
. deserialize ( )
. map_err ( | _ | {
Error ::bad_database ( "Invalid member event in database." )
let user_id = UserId ::try_from ( state_key . clone ( ) ) . map_err ( | _ | {
Error ::bad_database ( "Invalid UserId in member PDU." )
} ) ? ;
// The membership was and still is invite or join
if matches! (
content . membership ,
ruma ::events ::room ::member ::MembershipState ::Join
| ruma ::events ::room ::member ::MembershipState ::Invite
) & & matches! (
current_content . membership ,
ruma ::events ::room ::member ::MembershipState ::Join
| ruma ::events ::room ::member ::MembershipState ::Invite
) {
MembershipState ::Join | MembershipState ::Invite
) & & ( db . rooms . is_joined ( & user_id , & room_id ) ?
| | db . rooms . is_invited ( & user_id , & room_id ) ? )
{
Ok ::< _ , Error > ( Some ( state_key . clone ( ) ) )
} else {
Ok ( None )
@ -274,7 +324,7 @@ pub async fn sync_events_route(
notification_count ,
} ,
timeline : sync_events ::Timeline {
limited : joined_since_last_sync,
limited : limited | | joined_since_last_sync,
prev_batch ,
events : room_events ,
} ,
@ -297,13 +347,6 @@ pub async fn sync_events_route(
joined_rooms . insert ( room_id . clone ( ) , joined_room ) ;
}
// Look for device list updates in this room
device_list_updates . extend (
db . users
. keys_changed ( & room_id . to_string ( ) , since , None )
. filter_map ( | r | r . ok ( ) ) ,
) ;
// Take presence updates from this room
for ( user_id , presence ) in
db . rooms
@ -348,31 +391,6 @@ pub async fn sync_events_route(
. map ( | pdu | pdu . to_sync_room_event ( ) )
. collect ( ) ;
// TODO: Only until leave point
let mut edus = db
. rooms
. edus
. roomlatests_since ( & room_id , since ) ?
. filter_map ( | r | r . ok ( ) ) // Filter out buggy events
. collect ::< Vec < _ > > ( ) ;
if db
. rooms
. edus
. last_roomactive_update ( & room_id , & db . globals ) ?
> since
{
edus . push (
serde_json ::from_str (
& serde_json ::to_string ( & AnySyncEphemeralRoomEvent ::Typing (
db . rooms . edus . roomactives_all ( & room_id ) ? ,
) )
. expect ( "event is valid, we just created it" ) ,
)
. expect ( "event is valid, we just created it" ) ,
) ;
}
let left_room = sync_events ::LeftRoom {
account_data : sync_events ::AccountData { events : Vec ::new ( ) } ,
timeline : sync_events ::Timeline {
@ -383,6 +401,49 @@ pub async fn sync_events_route(
state : sync_events ::State { events : Vec ::new ( ) } ,
} ;
let mut left_since_last_sync = false ;
for pdu in db . rooms . pdus_since ( & sender_id , & room_id , since ) ? {
let pdu = pdu ? ;
if pdu . kind = = EventType ::RoomMember & & pdu . state_key = = Some ( sender_id . to_string ( ) ) {
let content = serde_json ::from_value ::<
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
> ( pdu . content . clone ( ) )
. expect ( "Raw::from_value always works" )
. deserialize ( )
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) ) ? ;
if content . membership = = MembershipState ::Leave {
left_since_last_sync = true ;
break ;
}
}
}
if left_since_last_sync {
device_list_left . extend (
db . rooms
. room_members ( & room_id )
. filter_map ( | user_id | {
Some (
UserId ::try_from ( user_id . ok ( ) ? . clone ( ) )
. map_err ( | _ | {
Error ::bad_database ( "Invalid member event state key in db." )
} )
. ok ( ) ? ,
)
} )
. filter ( | user_id | {
// Don't send key updates from the sender to the sender
sender_id ! = user_id
} )
. filter ( | user_id | {
// Only send if the sender doesn't share any encrypted room with the target
// anymore
! share_encrypted_room ( & db , sender_id , user_id , & room_id )
} ) ,
) ;
}
if ! left_room . is_empty ( ) {
left_rooms . insert ( room_id . clone ( ) , left_room ) ;
}
@ -392,23 +453,19 @@ pub async fn sync_events_route(
for room_id in db . rooms . rooms_invited ( & sender_id ) {
let room_id = room_id ? ;
let mut invited_since_last_sync = false ;
for pdu in db
. rooms
. pdus_since ( & sender_id , & room_id , since ) ?
{
for pdu in db . rooms . pdus_since ( & sender_id , & room_id , since ) ? {
let pdu = pdu ? ;
if pdu . kind = = EventType ::RoomMember {
if pdu . state_key = = Some ( sender_id . to_string ( ) ) {
let content = serde_json ::from_value ::<
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
> ( pdu . content . clone ( ) )
. expect ( "Raw::from_value always works" )
. deserialize ( )
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) ) ? ;
if content . membership = = ruma ::events ::room ::member ::MembershipState ::Invite {
invited_since_last_sync = true ;
break ;
}
if pdu . kind = = EventType ::RoomMember & & pdu . state_key = = Some ( sender_id . to_string ( ) ) {
let content = serde_json ::from_value ::<
Raw < ruma ::events ::room ::member ::MemberEventContent > ,
> ( pdu . content . clone ( ) )
. expect ( "Raw::from_value always works" )
. deserialize ( )
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in database." ) ) ? ;
if content . membership = = MembershipState ::Invite {
invited_since_last_sync = true ;
break ;
}
}
}
@ -433,6 +490,27 @@ pub async fn sync_events_route(
}
}
for user_id in left_encrypted_users {
// If the user doesn't share an encrypted room with the target anymore, we need to tell
// them
if db
. rooms
. get_shared_rooms ( vec! [ sender_id . clone ( ) , user_id . clone ( ) ] )
. filter_map ( | r | r . ok ( ) )
. filter_map ( | other_room_id | {
Some (
db . rooms
. room_state_get ( & other_room_id , & EventType ::RoomEncryption , "" )
. ok ( ) ?
. is_some ( ) ,
)
} )
. all ( | encrypted | ! encrypted )
{
device_list_left . insert ( user_id ) ;
}
}
// Remove all to-device events the device received *last time*
db . users
. remove_to_device_events ( sender_id , device_id , since ) ? ;
@ -464,7 +542,7 @@ pub async fn sync_events_route(
} ,
device_lists : sync_events ::DeviceLists {
changed : device_list_updates . into_iter ( ) . collect ( ) ,
left : Vec ::new ( ) , // TODO
left : device_list_left . into_iter ( ) . collect ( ) ,
} ,
device_one_time_keys_count : if db . users . last_one_time_keys_update ( sender_id ) ? > since {
db . users . count_one_time_keys ( sender_id , device_id ) ?
@ -500,3 +578,24 @@ pub async fn sync_events_route(
Ok ( response . into ( ) )
}
fn share_encrypted_room (
db : & Database ,
sender_id : & UserId ,
user_id : & UserId ,
ignore_room : & RoomId ,
) -> bool {
db . rooms
. get_shared_rooms ( vec! [ sender_id . clone ( ) , user_id . clone ( ) ] )
. filter_map ( | r | r . ok ( ) )
. filter ( | room_id | room_id ! = ignore_room )
. filter_map ( | other_room_id | {
Some (
db . rooms
. room_state_get ( & other_room_id , & EventType ::RoomEncryption , "" )
. ok ( ) ?
. is_some ( ) ,
)
} )
. any ( | encrypted | encrypted )
}