From caddc656fba15ef3e13f65f21a7e6f43eb42e786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sun, 23 Jul 2023 21:57:11 +0200 Subject: [PATCH 1/2] slightly better sliding sync --- src/api/client_server/sync.rs | 120 ++++++++++++++++++--- src/service/mod.rs | 7 +- src/service/rooms/state_accessor/mod.rs | 15 +++ src/service/users/mod.rs | 137 +++++++++++++++++++++++- 4 files changed, 260 insertions(+), 19 deletions(-) diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index fed4fb73..8883c162 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -23,7 +23,7 @@ use ruma::{ uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId, }; use std::{ - collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, + collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet}, sync::Arc, time::Duration, }; @@ -1174,8 +1174,7 @@ pub async fn sync_events_v4_route( ) -> Result> { let sender_user = body.sender_user.expect("user is authenticated"); let sender_device = body.sender_device.expect("user is authenticated"); - let body = dbg!(body.body); - + let mut body = dbg!(body.body); // Setup watchers, so if there's no response, we can wait for them let watcher = services().globals.watch(&sender_user, &sender_device); @@ -1188,7 +1187,21 @@ pub async fn sync_events_v4_route( .unwrap_or(0); let sincecount = PduCount::Normal(since); - let initial = since == 0; + if since == 0 { + if let Some(conn_id) = &body.conn_id { + services().users.forget_sync_request_connection( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + ) + } + } + + let known_rooms = services().users.update_sync_request_with_cache( + sender_user.clone(), + sender_device.clone(), + &mut body, + ); let all_joined_rooms = services() .rooms @@ -1205,8 +1218,10 @@ pub async fn sync_events_v4_route( continue; } + let mut new_known_rooms = BTreeMap::new(); + lists.insert( - list_id, + list_id.clone(), sync_events::v4::SyncList { ops: list .ranges @@ -1219,14 +1234,27 @@ pub async fn sync_events_v4_route( let room_ids = all_joined_rooms [(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)] .to_vec(); - todo_rooms.extend(room_ids.iter().cloned().map(|r| { + new_known_rooms.extend(room_ids.iter().cloned().map(|r| (r, true))); + for room_id in &room_ids { + let todo_room = todo_rooms.entry(room_id.clone()).or_insert(( + BTreeSet::new(), + 0, + true, + )); let limit = list .room_details .timeline_limit .map_or(10, u64::from) .min(100); - (r, (list.room_details.required_state.clone(), limit)) - })); + todo_room + .0 + .extend(list.room_details.required_state.iter().cloned()); + todo_room.1 = todo_room.1.min(limit); + if known_rooms.get(&list_id).and_then(|k| k.get(room_id)) != Some(&true) + { + todo_room.2 = false; + } + } sync_events::v4::SyncOp { op: SlidingOp::Sync, range: Some(r.clone()), @@ -1239,12 +1267,36 @@ pub async fn sync_events_v4_route( count: UInt::from(all_joined_rooms.len() as u32), }, ); + + if let Some(conn_id) = &body.conn_id { + services().users.update_sync_known_rooms( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + list_id, + new_known_rooms, + ); + } + } + + for (room_id, room) in body.room_subscriptions { + let todo_room = todo_rooms + .entry(room_id.clone()) + .or_insert((BTreeSet::new(), 0, true)); + let limit = room.timeline_limit.map_or(10, u64::from).min(100); + todo_room.0.extend(room.required_state.iter().cloned()); + todo_room.1 = todo_room.1.min(limit); + todo_room.2 = false; } let mut rooms = BTreeMap::new(); - for (room_id, (required_state_request, timeline_limit)) in todo_rooms { + for (room_id, (required_state_request, timeline_limit, known)) in &todo_rooms { let (timeline_pdus, limited) = - load_timeline(&sender_user, &room_id, sincecount, timeline_limit)?; + load_timeline(&sender_user, &room_id, sincecount, *timeline_limit)?; + + if *known && timeline_pdus.is_empty() { + continue; + } let prev_batch = timeline_pdus .first() @@ -1256,7 +1308,14 @@ pub async fn sync_events_v4_route( } PduCount::Normal(c) => c.to_string(), })) - })?; + })? + .or_else(|| { + if since != 0 { + Some(since.to_string()) + } else { + None + } + }); let room_events: Vec<_> = timeline_pdus .iter() @@ -1279,8 +1338,41 @@ pub async fn sync_events_v4_route( rooms.insert( room_id.clone(), sync_events::v4::SlidingSyncRoom { - name: services().rooms.state_accessor.get_name(&room_id)?, - initial: Some(initial), + name: services() + .rooms + .state_accessor + .get_name(&room_id)? + .or_else(|| { + // Heroes + let mut names = services() + .rooms + .state_cache + .room_members(&room_id) + .filter_map(|r| r.ok()) + .filter(|member| member != &sender_user) + .map(|member| { + Ok::<_, Error>( + services() + .rooms + .state_accessor + .get_member(&room_id, &member)? + .and_then(|memberevent| memberevent.displayname) + .unwrap_or(member.to_string()), + ) + }) + .filter_map(|r| r.ok()) + .take(5) + .collect::>(); + if names.len() > 1 { + let last = names.pop().unwrap(); + Some(names.join(", ") + " and " + &last) + } else if names.len() == 1 { + Some(names.pop().unwrap()) + } else { + None + } + }), + initial: Some(*known), is_dm: None, invite_state: None, unread_notifications: UnreadNotificationsCount { @@ -1326,7 +1418,7 @@ pub async fn sync_events_v4_route( } Ok(dbg!(sync_events::v4::Response { - initial: initial, + initial: since == 0, txn_id: body.txn_id.clone(), pos: next_batch.to_string(), lists, diff --git a/src/service/mod.rs b/src/service/mod.rs index 56aed7fa..f85da788 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::{Arc, Mutex}, }; @@ -105,7 +105,10 @@ impl Services { }, transaction_ids: transaction_ids::Service { db }, uiaa: uiaa::Service { db }, - users: users::Service { db }, + users: users::Service { + db, + connections: Mutex::new(BTreeMap::new()), + }, account_data: account_data::Service { db }, admin: admin::Service::build(), key_backups: key_backups::Service { db }, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 9d071a53..435f4dff 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -282,4 +282,19 @@ impl Service { .map_err(|_| Error::bad_database("Invalid room name event in database.")) }) } + + pub fn get_member( + &self, + room_id: &RoomId, + user_id: &UserId, + ) -> Result> { + services() + .rooms + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomMember, user_id.as_str())? + .map_or(Ok(None), |s| { + serde_json::from_str(s.content.get()) + .map_err(|_| Error::bad_database("Invalid room member event in database.")) + }) + } } diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 6be5c895..63ab9b7d 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -1,20 +1,36 @@ mod data; -use std::{collections::BTreeMap, mem}; +use std::{ + collections::BTreeMap, + mem, + sync::{Arc, Mutex}, +}; pub use data::Data; use ruma::{ - api::client::{device::Device, error::ErrorKind, filter::FilterDefinition}, + api::client::{ + device::Device, + error::ErrorKind, + filter::FilterDefinition, + sync::sync_events::{self, v4::SyncRequestList}, + }, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::AnyToDeviceEvent, serde::Raw, DeviceId, DeviceKeyAlgorithm, DeviceKeyId, OwnedDeviceId, OwnedDeviceKeyId, OwnedMxcUri, - OwnedUserId, RoomAliasId, UInt, UserId, + OwnedRoomId, OwnedUserId, RoomAliasId, UInt, UserId, }; use crate::{services, Error, Result}; +pub struct SlidingSyncCache { + lists: BTreeMap, + known_rooms: BTreeMap>, +} + pub struct Service { pub db: &'static dyn Data, + pub connections: + Mutex>>>, } impl Service { @@ -23,6 +39,121 @@ impl Service { self.db.exists(user_id) } + pub fn forget_sync_request_connection( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: String, + ) { + self.connections + .lock() + .unwrap() + .remove(&(user_id, device_id, conn_id)); + } + + pub fn update_sync_request_with_cache( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + request: &mut sync_events::v4::Request, + ) -> BTreeMap> { + let Some(conn_id) = request.conn_id.clone() else { return BTreeMap::new(); }; + + let cache = &mut self.connections.lock().unwrap(); + let cached = Arc::clone( + cache + .entry((user_id, device_id, conn_id)) + .or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + known_rooms: BTreeMap::new(), + })) + }), + ); + let cached = &mut cached.lock().unwrap(); + drop(cache); + + for (list_id, list) in &mut request.lists { + if let Some(cached_list) = cached.lists.remove(list_id) { + if list.sort.is_empty() { + list.sort = cached_list.sort; + }; + if list.room_details.required_state.is_empty() { + list.room_details.required_state = cached_list.room_details.required_state; + }; + list.room_details.timeline_limit = list + .room_details + .timeline_limit + .or(cached_list.room_details.timeline_limit); + list.include_old_rooms = list + .include_old_rooms + .clone() + .or(cached_list.include_old_rooms); + match (&mut list.filters, cached_list.filters) { + (Some(list_filters), Some(cached_filters)) => { + list_filters.is_dm = list_filters.is_dm.or(cached_filters.is_dm); + if list_filters.spaces.is_empty() { + list_filters.spaces = cached_filters.spaces; + } + list_filters.is_encrypted = + list_filters.is_encrypted.or(cached_filters.is_encrypted); + list_filters.is_invite = + list_filters.is_invite.or(cached_filters.is_invite); + if list_filters.room_types.is_empty() { + list_filters.room_types = cached_filters.room_types; + } + if list_filters.not_room_types.is_empty() { + list_filters.not_room_types = cached_filters.not_room_types; + } + list_filters.room_name_like = list_filters + .room_name_like + .clone() + .or(cached_filters.room_name_like); + if list_filters.tags.is_empty() { + list_filters.tags = cached_filters.tags; + } + if list_filters.not_tags.is_empty() { + list_filters.not_tags = cached_filters.not_tags; + } + } + (_, Some(cached_filters)) => list.filters = Some(cached_filters), + (_, _) => {} + } + if list.bump_event_types.is_empty() { + list.bump_event_types = cached_list.bump_event_types; + }; + } + cached.lists.insert(list_id.clone(), list.clone()); + } + + cached.known_rooms.clone() + } + + pub fn update_sync_known_rooms( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: String, + list_id: String, + new_cached_rooms: BTreeMap, + ) { + let cache = &mut self.connections.lock().unwrap(); + let cached = Arc::clone( + cache + .entry((user_id, device_id, conn_id)) + .or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + known_rooms: BTreeMap::new(), + })) + }), + ); + let cached = &mut cached.lock().unwrap(); + drop(cache); + + cached.known_rooms.insert(list_id, new_cached_rooms); + } + /// Check if account is deactivated pub fn is_deactivated(&self, user_id: &UserId) -> Result { self.db.is_deactivated(user_id) From d220641d6453b8be767197f58c0bb32f255e2a5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 24 Jul 2023 10:41:50 +0200 Subject: [PATCH 2/2] Sliding sync subscriptions, e2ee, to_device messages --- src/api/client_server/sync.rs | 294 ++++++++++++++++++++++++++++++++-- src/service/users/mod.rs | 89 +++++++++- 2 files changed, 361 insertions(+), 22 deletions(-) diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 8883c162..527625a5 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -463,7 +463,7 @@ async fn sync_helper( } for user_id in left_encrypted_users { - let still_share_encrypted_room = services() + let dont_share_encrypted_room = services() .rooms .user .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? @@ -481,7 +481,7 @@ async fn sync_helper( .all(|encrypted| !encrypted); // If the user doesn't share an encrypted room with the target anymore, we need to tell // them - if still_share_encrypted_room { + if dont_share_encrypted_room { device_list_left.insert(user_id); } } @@ -1197,6 +1197,7 @@ pub async fn sync_events_v4_route( } } + // Get sticky parameters from cache let known_rooms = services().users.update_sync_request_with_cache( sender_user.clone(), sender_device.clone(), @@ -1210,6 +1211,195 @@ pub async fn sync_events_v4_route( .filter_map(|r| r.ok()) .collect::>(); + if body.extensions.to_device.enabled.unwrap_or(false) { + services() + .users + .remove_to_device_events(&sender_user, &sender_device, since)?; + } + + let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in + let mut device_list_changes = HashSet::new(); + let mut device_list_left = HashSet::new(); + + if body.extensions.e2ee.enabled.unwrap_or(false) { + // Look for device list updates of this account + device_list_changes.extend( + services() + .users + .keys_changed(sender_user.as_ref(), since, None) + .filter_map(|r| r.ok()), + ); + + for room_id in &all_joined_rooms { + let current_shortstatehash = + if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? { + s + } else { + error!("Room {} has no state", room_id); + continue; + }; + + let since_shortstatehash = services() + .rooms + .user + .get_token_shortstatehash(&room_id, since)?; + + let since_sender_member: Option = since_shortstatehash + .and_then(|shortstatehash| { + services() + .rooms + .state_accessor + .state_get( + shortstatehash, + &StateEventType::RoomMember, + sender_user.as_str(), + ) + .transpose() + }) + .transpose()? + .and_then(|pdu| { + serde_json::from_str(pdu.content.get()) + .map_err(|_| Error::bad_database("Invalid PDU in database.")) + .ok() + }); + + let encrypted_room = services() + .rooms + .state_accessor + .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")? + .is_some(); + + if let Some(since_shortstatehash) = since_shortstatehash { + // Skip if there are only timeline changes + if since_shortstatehash == current_shortstatehash { + continue; + } + + let since_encryption = services().rooms.state_accessor.state_get( + since_shortstatehash, + &StateEventType::RoomEncryption, + "", + )?; + + let joined_since_last_sync = since_sender_member + .map_or(true, |member| member.membership != MembershipState::Join); + + let new_encrypted_room = encrypted_room && since_encryption.is_none(); + if encrypted_room { + let current_state_ids = services() + .rooms + .state_accessor + .state_full_ids(current_shortstatehash) + .await?; + let since_state_ids = services() + .rooms + .state_accessor + .state_full_ids(since_shortstatehash) + .await?; + + for (key, id) in current_state_ids { + if since_state_ids.get(&key) != Some(&id) { + let pdu = match services().rooms.timeline.get_pdu(&id)? { + Some(pdu) => pdu, + None => { + error!("Pdu in state not found: {}", id); + continue; + } + }; + if pdu.kind == TimelineEventType::RoomMember { + if let Some(state_key) = &pdu.state_key { + let user_id = + UserId::parse(state_key.clone()).map_err(|_| { + Error::bad_database("Invalid UserId in member PDU.") + })?; + + if user_id == sender_user { + continue; + } + + let new_membership = serde_json::from_str::< + RoomMemberEventContent, + >( + pdu.content.get() + ) + .map_err(|_| Error::bad_database("Invalid PDU in database."))? + .membership; + + match new_membership { + MembershipState::Join => { + // A new user joined an encrypted room + if !share_encrypted_room( + &sender_user, + &user_id, + &room_id, + )? { + device_list_changes.insert(user_id); + } + } + MembershipState::Leave => { + // Write down users that have left encrypted rooms we are in + left_encrypted_users.insert(user_id); + } + _ => {} + } + } + } + } + } + if joined_since_last_sync || new_encrypted_room { + // If the user is in a new encrypted room, give them all joined users + device_list_changes.extend( + services() + .rooms + .state_cache + .room_members(&room_id) + .flatten() + .filter(|user_id| { + // Don't send key updates from the sender to the sender + &sender_user != user_id + }) + .filter(|user_id| { + // Only send keys if the sender doesn't share an encrypted room with the target already + !share_encrypted_room(&sender_user, user_id, &room_id) + .unwrap_or(false) + }), + ); + } + } + } + // Look for device list updates in this room + device_list_changes.extend( + services() + .users + .keys_changed(room_id.as_ref(), since, None) + .filter_map(|r| r.ok()), + ); + } + for user_id in left_encrypted_users { + let dont_share_encrypted_room = services() + .rooms + .user + .get_shared_rooms(vec![sender_user.clone(), user_id.clone()])? + .filter_map(|r| r.ok()) + .filter_map(|other_room_id| { + Some( + services() + .rooms + .state_accessor + .room_state_get(&other_room_id, &StateEventType::RoomEncryption, "") + .ok()? + .is_some(), + ) + }) + .all(|encrypted| !encrypted); + // If the user doesn't share an encrypted room with the target anymore, we need to tell + // them + if dont_share_encrypted_room { + device_list_left.insert(user_id); + } + } + } + let mut lists = BTreeMap::new(); let mut todo_rooms = BTreeMap::new(); // and required state @@ -1249,7 +1439,7 @@ pub async fn sync_events_v4_route( todo_room .0 .extend(list.room_details.required_state.iter().cloned()); - todo_room.1 = todo_room.1.min(limit); + todo_room.1 = todo_room.1.max(limit); if known_rooms.get(&list_id).and_then(|k| k.get(room_id)) != Some(&true) { todo_room.2 = false; @@ -1279,18 +1469,51 @@ pub async fn sync_events_v4_route( } } - for (room_id, room) in body.room_subscriptions { + let mut known_subscription_rooms = BTreeMap::new(); + for (room_id, room) in dbg!(&body.room_subscriptions) { let todo_room = todo_rooms .entry(room_id.clone()) .or_insert((BTreeSet::new(), 0, true)); let limit = room.timeline_limit.map_or(10, u64::from).min(100); todo_room.0.extend(room.required_state.iter().cloned()); - todo_room.1 = todo_room.1.min(limit); - todo_room.2 = false; + todo_room.1 = todo_room.1.max(limit); + if known_rooms + .get("subscriptions") + .and_then(|k| k.get(room_id)) + != Some(&true) + { + todo_room.2 = false; + } + known_subscription_rooms.insert(room_id.clone(), true); + } + + for r in body.unsubscribe_rooms { + known_subscription_rooms.remove(&r); + body.room_subscriptions.remove(&r); + } + + if let Some(conn_id) = &body.conn_id { + services().users.update_sync_known_rooms( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + "subscriptions".to_owned(), + known_subscription_rooms, + ); + } + + if let Some(conn_id) = &body.conn_id { + services().users.update_sync_subscriptions( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + body.room_subscriptions, + ); } let mut rooms = BTreeMap::new(); for (room_id, (required_state_request, timeline_limit, known)) in &todo_rooms { + // TODO: per-room sync tokens let (timeline_pdus, limited) = load_timeline(&sender_user, &room_id, sincecount, *timeline_limit)?; @@ -1372,12 +1595,26 @@ pub async fn sync_events_v4_route( None } }), - initial: Some(*known), + initial: Some(!known), is_dm: None, invite_state: None, unread_notifications: UnreadNotificationsCount { - highlight_count: None, - notification_count: None, + highlight_count: Some( + services() + .rooms + .user + .highlight_count(&sender_user, &room_id)? + .try_into() + .expect("notification count can't go that high"), + ), + notification_count: Some( + services() + .rooms + .user + .notification_count(&sender_user, &room_id)? + .try_into() + .expect("notification count can't go that high"), + ), }, timeline: room_events, required_state, @@ -1399,7 +1636,7 @@ pub async fn sync_events_v4_route( .unwrap_or(0) as u32) .into(), ), - num_live: None, + num_live: None, // Count events in timeline greater than global sync counter }, ); } @@ -1424,17 +1661,44 @@ pub async fn sync_events_v4_route( lists, rooms, extensions: sync_events::v4::Extensions { - to_device: None, + to_device: if body.extensions.to_device.enabled.unwrap_or(false) { + Some(sync_events::v4::ToDevice { + events: services() + .users + .get_to_device_events(&sender_user, &sender_device)?, + next_batch: next_batch.to_string(), + }) + } else { + None + }, e2ee: sync_events::v4::E2EE { device_lists: DeviceLists { - changed: Vec::new(), - left: Vec::new(), + changed: device_list_changes.into_iter().collect(), + left: device_list_left.into_iter().collect(), }, - device_one_time_keys_count: BTreeMap::new(), + device_one_time_keys_count: services() + .users + .count_one_time_keys(&sender_user, &sender_device)?, + // Fallback keys are not yet supported device_unused_fallback_key_types: None, }, account_data: sync_events::v4::AccountData { - global: Vec::new(), + global: if body.extensions.account_data.enabled.unwrap_or(false) { + services() + .account_data + .changes_since(None, &sender_user, since)? + .into_iter() + .filter_map(|(_, v)| { + serde_json::from_str(v.json().get()) + .map_err(|_| { + Error::bad_database("Invalid account event in database.") + }) + .ok() + }) + .collect() + } else { + Vec::new() + }, rooms: BTreeMap::new(), }, receipts: sync_events::v4::Receipts { diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 63ab9b7d..786b42e7 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -11,7 +11,10 @@ use ruma::{ device::Device, error::ErrorKind, filter::FilterDefinition, - sync::sync_events::{self, v4::SyncRequestList}, + sync::sync_events::{ + self, + v4::{ExtensionsConfig, SyncRequestList}, + }, }, encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::AnyToDeviceEvent, @@ -24,7 +27,9 @@ use crate::{services, Error, Result}; pub struct SlidingSyncCache { lists: BTreeMap, + subscriptions: BTreeMap, known_rooms: BTreeMap>, + extensions: ExtensionsConfig, } pub struct Service { @@ -66,7 +71,9 @@ impl Service { .or_insert_with(|| { Arc::new(Mutex::new(SlidingSyncCache { lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), })) }), ); @@ -74,12 +81,13 @@ impl Service { drop(cache); for (list_id, list) in &mut request.lists { - if let Some(cached_list) = cached.lists.remove(list_id) { + if let Some(cached_list) = cached.lists.get(list_id) { if list.sort.is_empty() { - list.sort = cached_list.sort; + list.sort = cached_list.sort.clone(); }; if list.room_details.required_state.is_empty() { - list.room_details.required_state = cached_list.room_details.required_state; + list.room_details.required_state = + cached_list.room_details.required_state.clone(); }; list.room_details.timeline_limit = list .room_details @@ -88,8 +96,8 @@ impl Service { list.include_old_rooms = list .include_old_rooms .clone() - .or(cached_list.include_old_rooms); - match (&mut list.filters, cached_list.filters) { + .or(cached_list.include_old_rooms.clone()); + match (&mut list.filters, cached_list.filters.clone()) { (Some(list_filters), Some(cached_filters)) => { list_filters.is_dm = list_filters.is_dm.or(cached_filters.is_dm); if list_filters.spaces.is_empty() { @@ -120,15 +128,80 @@ impl Service { (_, _) => {} } if list.bump_event_types.is_empty() { - list.bump_event_types = cached_list.bump_event_types; + list.bump_event_types = cached_list.bump_event_types.clone(); }; } cached.lists.insert(list_id.clone(), list.clone()); } + cached + .subscriptions + .extend(request.room_subscriptions.clone().into_iter()); + request + .room_subscriptions + .extend(cached.subscriptions.clone().into_iter()); + + request.extensions.e2ee.enabled = request + .extensions + .e2ee + .enabled + .or(cached.extensions.e2ee.enabled); + + request.extensions.to_device.enabled = request + .extensions + .to_device + .enabled + .or(cached.extensions.to_device.enabled); + + request.extensions.account_data.enabled = request + .extensions + .account_data + .enabled + .or(cached.extensions.account_data.enabled); + request.extensions.account_data.lists = request + .extensions + .account_data + .lists + .clone() + .or(cached.extensions.account_data.lists.clone()); + request.extensions.account_data.rooms = request + .extensions + .account_data + .rooms + .clone() + .or(cached.extensions.account_data.rooms.clone()); + + cached.extensions = request.extensions.clone(); + cached.known_rooms.clone() } + pub fn update_sync_subscriptions( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: String, + subscriptions: BTreeMap, + ) { + let cache = &mut self.connections.lock().unwrap(); + let cached = Arc::clone( + cache + .entry((user_id, device_id, conn_id)) + .or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), + known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), + })) + }), + ); + let cached = &mut cached.lock().unwrap(); + drop(cache); + + cached.subscriptions = subscriptions; + } + pub fn update_sync_known_rooms( &self, user_id: OwnedUserId, @@ -144,7 +217,9 @@ impl Service { .or_insert_with(|| { Arc::new(Mutex::new(SlidingSyncCache { lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), })) }), );