diff --git a/src/client_server.rs b/src/client_server.rs index 09164db4..d740ab50 100644 --- a/src/client_server.rs +++ b/src/client_server.rs @@ -12,7 +12,6 @@ use ruma_client_api::{ account::{get_username_availability, register}, alias::get_alias, capabilities::get_capabilities, - to_device::send_event_to_device, config::{get_global_account_data, set_global_account_data}, directory::{self, get_public_rooms_filtered}, filter::{self, create_filter, get_filter}, @@ -34,13 +33,14 @@ use ruma_client_api::{ state::{create_state_event_for_empty_key, create_state_event_for_key}, sync::sync_events, thirdparty::get_protocols, + to_device::send_event_to_device, typing::create_typing_event, uiaa::{AuthFlow, UiaaInfo, UiaaResponse}, user_directory::search_users, }, unversioned::get_supported_versions, }; -use ruma_events::{collections::only::Event as EduEvent, EventType}; +use ruma_events::{collections::only::Event as EduEvent, EventJson, EventType}; use ruma_identifiers::{RoomId, UserId}; use serde_json::{json, value::RawValue}; @@ -165,45 +165,45 @@ pub fn register_route( let token = utils::random_string(TOKEN_LENGTH); // Add device - db - .users + db.users .create_device(&user_id, &device_id, &token) .unwrap(); // Initial data - db.account_data.update( - None, - &user_id, - EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent { - content: ruma_events::push_rules::PushRulesEventContent { - global: ruma_events::push_rules::Ruleset { - content: vec![], - override_rules: vec![], - room: vec![], - sender: vec![], - underride: vec![ruma_events::push_rules::ConditionalPushRule { - actions: vec![ - ruma_events::push_rules::Action::Notify, - ruma_events::push_rules::Action::SetTweak( - ruma_common::push::Tweak::Highlight(false), - ), - ], - default: true, - enabled: true, - rule_id: ".m.rule.message".to_owned(), - conditions: vec![ruma_events::push_rules::PushCondition::EventMatch( - ruma_events::push_rules::EventMatchCondition { - key: "type".to_owned(), - pattern: "m.room.message".to_owned(), - }, - )], - }], + db.account_data + .update( + None, + &user_id, + EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent { + content: ruma_events::push_rules::PushRulesEventContent { + global: ruma_events::push_rules::Ruleset { + content: vec![], + override_rules: vec![], + room: vec![], + sender: vec![], + underride: vec![ruma_events::push_rules::ConditionalPushRule { + actions: vec![ + ruma_events::push_rules::Action::Notify, + ruma_events::push_rules::Action::SetTweak( + ruma_common::push::Tweak::Highlight(false), + ), + ], + default: true, + enabled: true, + rule_id: ".m.rule.message".to_owned(), + conditions: vec![ruma_events::push_rules::PushCondition::EventMatch( + ruma_events::push_rules::EventMatchCondition { + key: "type".to_owned(), + pattern: "m.room.message".to_owned(), + }, + )], + }], + }, }, - }, - }), - &db.globals, - ) - .unwrap(); + }), + &db.globals, + ) + .unwrap(); MatrixResult(Ok(register::Response { access_token: Some(token), @@ -220,7 +220,10 @@ pub fn get_login_route() -> MatrixResult { } #[post("/_matrix/client/r0/login", data = "")] -pub fn login_route(db: State<'_, Database>, body: Ruma) -> MatrixResult { +pub fn login_route( + db: State<'_, Database>, + body: Ruma, +) -> MatrixResult { // Validate login method let user_id = if let (login::UserInfo::MatrixId(mut username), login::LoginInfo::Password { password }) = @@ -280,8 +283,7 @@ pub fn login_route(db: State<'_, Database>, body: Ruma) -> Matri let token = utils::random_string(TOKEN_LENGTH); // Add device - db - .users + db.users .create_device(&user_id, &device_id, &token) .unwrap(); @@ -318,7 +320,7 @@ pub fn get_pushrules_all_route() -> MatrixResult { vec![push::PushRule { actions: vec![ push::Action::Notify, - push::Action::SetTweak(ruma_common::push::Tweak::Highlight(false)) + push::Action::SetTweak(ruma_common::push::Tweak::Highlight(false)), ], default: true, enabled: true, @@ -346,39 +348,40 @@ pub fn set_pushrule_route( ) -> MatrixResult { // TODO let user_id = body.user_id.clone().expect("user is authenticated"); - db.account_data.update( - None, - &user_id, - EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent { - content: ruma_events::push_rules::PushRulesEventContent { - global: ruma_events::push_rules::Ruleset { - content: vec![], - override_rules: vec![], - room: vec![], - sender: vec![], - underride: vec![ruma_events::push_rules::ConditionalPushRule { - actions: vec![ - ruma_events::push_rules::Action::Notify, - ruma_events::push_rules::Action::SetTweak( - ruma_common::push::Tweak::Highlight(false), - ), - ], - default: true, - enabled: true, - rule_id: ".m.rule.message".to_owned(), - conditions: vec![ruma_events::push_rules::PushCondition::EventMatch( - ruma_events::push_rules::EventMatchCondition { - key: "type".to_owned(), - pattern: "m.room.message".to_owned(), - }, - )], - }], + db.account_data + .update( + None, + &user_id, + EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent { + content: ruma_events::push_rules::PushRulesEventContent { + global: ruma_events::push_rules::Ruleset { + content: vec![], + override_rules: vec![], + room: vec![], + sender: vec![], + underride: vec![ruma_events::push_rules::ConditionalPushRule { + actions: vec![ + ruma_events::push_rules::Action::Notify, + ruma_events::push_rules::Action::SetTweak( + ruma_common::push::Tweak::Highlight(false), + ), + ], + default: true, + enabled: true, + rule_id: ".m.rule.message".to_owned(), + conditions: vec![ruma_events::push_rules::PushCondition::EventMatch( + ruma_events::push_rules::EventMatchCondition { + key: "type".to_owned(), + pattern: "m.room.message".to_owned(), + }, + )], + }], + }, }, - }, - }), - &db.globals - ) - .unwrap(); + }), + &db.globals, + ) + .unwrap(); MatrixResult(Ok(set_pushrule::Response)) } @@ -393,9 +396,7 @@ pub fn set_pushrule_enabled_route( MatrixResult(Ok(set_pushrule_enabled::Response)) } -#[get( - "/_matrix/client/r0/user/<_user_id>/filter/<_filter_id>", -)] +#[get("/_matrix/client/r0/user/<_user_id>/filter/<_filter_id>")] pub fn get_filter_route( _user_id: String, _filter_id: String, @@ -413,18 +414,14 @@ pub fn get_filter_route( } #[post("/_matrix/client/r0/user/<_user_id>/filter")] -pub fn create_filter_route( - _user_id: String, -) -> MatrixResult { +pub fn create_filter_route(_user_id: String) -> MatrixResult { // TODO MatrixResult(Ok(create_filter::Response { filter_id: utils::random_string(10), })) } -#[put( - "/_matrix/client/r0/user/<_user_id>/account_data/<_type>", -)] +#[put("/_matrix/client/r0/user/<_user_id>/account_data/<_type>")] pub fn set_global_account_data_route( _user_id: String, _type: String, @@ -432,9 +429,7 @@ pub fn set_global_account_data_route( MatrixResult(Ok(set_global_account_data::Response)) } -#[get( - "/_matrix/client/r0/user/<_user_id>/account_data/<_type>", -)] +#[get("/_matrix/client/r0/user/<_user_id>/account_data/<_type>")] pub fn get_global_account_data_route( _user_id: String, _type: String, @@ -460,25 +455,44 @@ pub fn set_displayname_route( if displayname == "" { db.users.set_displayname(&user_id, None).unwrap(); } else { - db - .users + db.users .set_displayname(&user_id, Some(displayname.clone())) .unwrap(); } // Send a new membership event into all joined rooms for room_id in db.rooms.rooms_joined(&user_id) { - db.rooms.append_pdu( - room_id.unwrap(), - user_id.clone(), - EventType::RoomMember, - json!({"membership": "join", "displayname": displayname}), - None, - Some(user_id.to_string()), - &db.globals - ).unwrap(); + db.rooms + .append_pdu( + room_id.unwrap(), + user_id.clone(), + EventType::RoomMember, + json!({"membership": "join", "displayname": displayname}), + None, + Some(user_id.to_string()), + &db.globals, + ) + .unwrap(); } - // TODO: send a new m.presence event + + // Presence update + db.global_edus + .update_globallatest( + &user_id, + EduEvent::Presence(ruma_events::presence::PresenceEvent { + content: ruma_events::presence::PresenceEventContent { + avatar_url: db.users.avatar_url(&user_id).unwrap(), + currently_active: None, + displayname: db.users.displayname(&user_id).unwrap(), + last_active_ago: Some(utils::millis_since_unix_epoch().try_into().unwrap()), + presence: ruma_events::presence::PresenceState::Online, + status_msg: None, + }, + sender: user_id.clone(), + }), + &db.globals, + ) + .unwrap(); } else { // Send error on None // Synapse returns a parsing error but the spec doesn't require this @@ -542,8 +556,7 @@ pub fn set_avatar_url_route( if body.avatar_url == "" { db.users.set_avatar_url(&user_id, None).unwrap(); } else { - db - .users + db.users .set_avatar_url(&user_id, Some(body.avatar_url.clone())) .unwrap(); // TODO send a new m.room.member join event with the updated avatar_url @@ -605,11 +618,32 @@ pub fn get_profile_route( })) } -#[put("/_matrix/client/r0/presence/<_user_id>/status")] +#[put("/_matrix/client/r0/presence/<_user_id>/status", data = "")] pub fn set_presence_route( + db: State<'_, Database>, + body: Ruma, _user_id: String, ) -> MatrixResult { - // TODO + let user_id = body.user_id.clone().expect("user is authenticated"); + + db.global_edus + .update_globallatest( + &user_id, + EduEvent::Presence(ruma_events::presence::PresenceEvent { + content: ruma_events::presence::PresenceEventContent { + avatar_url: db.users.avatar_url(&user_id).unwrap(), + currently_active: None, + displayname: db.users.displayname(&user_id).unwrap(), + last_active_ago: Some(utils::millis_since_unix_epoch().try_into().unwrap()), + presence: body.presence, + status_msg: body.status_msg.clone(), + }, + sender: user_id.clone(), + }), + &db.globals, + ) + .unwrap(); + MatrixResult(Ok(set_presence::Response)) } @@ -637,28 +671,27 @@ pub fn set_read_marker_route( _room_id: String, ) -> MatrixResult { let user_id = body.user_id.clone().expect("user is authenticated"); - db.account_data.update( - Some(&body.room_id), - &user_id, - EduEvent::FullyRead(ruma_events::fully_read::FullyReadEvent { - content: ruma_events::fully_read::FullyReadEventContent { - event_id: body.fully_read.clone(), - }, - room_id: Some(body.room_id.clone()), - }), - &db.globals - ) - .unwrap(); + db.account_data + .update( + Some(&body.room_id), + &user_id, + EduEvent::FullyRead(ruma_events::fully_read::FullyReadEvent { + content: ruma_events::fully_read::FullyReadEventContent { + event_id: body.fully_read.clone(), + }, + room_id: Some(body.room_id.clone()), + }), + &db.globals, + ) + .unwrap(); if let Some(event) = &body.read_receipt { - db - .rooms + db.rooms .edus .room_read_set( &body.room_id, &user_id, - db - .rooms + db.rooms .get_pdu_count(event) .unwrap() .expect("TODO: what if a client specifies an invalid event"), @@ -680,8 +713,7 @@ pub fn set_read_marker_route( }, ); - db - .rooms + db.rooms .edus .roomlatest_update( &user_id, @@ -716,8 +748,7 @@ pub fn create_typing_event_route( }); if body.typing { - db - .rooms + db.rooms .edus .roomactive_add( edu, @@ -728,11 +759,7 @@ pub fn create_typing_event_route( ) .unwrap(); } else { - db - .rooms - .edus - .roomactive_remove(edu, &body.room_id) - .unwrap(); + db.rooms.edus.roomactive_remove(edu, &body.room_id).unwrap(); } MatrixResult(Ok(create_typing_event::Response)) @@ -747,8 +774,7 @@ pub fn create_room_route( let room_id = RoomId::try_from(db.globals.hostname()).expect("host is valid"); let user_id = body.user_id.clone().expect("user is authenticated"); - db - .rooms + db.rooms .append_pdu( room_id.clone(), user_id.clone(), @@ -760,8 +786,7 @@ pub fn create_room_route( ) .unwrap(); - db - .rooms + db.rooms .join( &room_id, &user_id, @@ -770,8 +795,7 @@ pub fn create_room_route( ) .unwrap(); - db - .rooms + db.rooms .append_pdu( room_id.clone(), user_id.clone(), @@ -793,8 +817,7 @@ pub fn create_room_route( .unwrap(); if let Some(name) = &body.name { - db - .rooms + db.rooms .append_pdu( room_id.clone(), user_id.clone(), @@ -808,8 +831,7 @@ pub fn create_room_route( } if let Some(topic) = &body.topic { - db - .rooms + db.rooms .append_pdu( room_id.clone(), user_id.clone(), @@ -823,8 +845,7 @@ pub fn create_room_route( } for user in &body.invite { - db - .rooms + db.rooms .invite(&user_id, &room_id, user, &db.globals) .unwrap(); } @@ -945,8 +966,7 @@ pub fn leave_room_route( _room_id: String, ) -> MatrixResult { let user_id = body.user_id.clone().expect("user is authenticated"); - db - .rooms + db.rooms .leave(&user_id, &body.room_id, &user_id, &db.globals) .unwrap(); MatrixResult(Ok(leave_room::Response)) @@ -970,8 +990,7 @@ pub fn invite_user_route( _room_id: String, ) -> MatrixResult { if let invite_user::InvitationRecipient::UserId { user_id } = &body.recipient { - db - .rooms + db.rooms .invite( &body.user_id.as_ref().expect("user is authenticated"), &body.room_id, @@ -1069,16 +1088,13 @@ pub fn search_users_route( } #[get("/_matrix/client/r0/rooms/<_room_id>/members")] -pub fn get_member_events_route( - _room_id: String, -) -> MatrixResult { +pub fn get_member_events_route(_room_id: String) -> MatrixResult { // TODO MatrixResult(Ok(get_member_events::Response { chunk: Vec::new() })) } #[get("/_matrix/client/r0/thirdparty/protocols")] -pub fn get_protocols_route( -) -> MatrixResult { +pub fn get_protocols_route() -> MatrixResult { // TODO MatrixResult(Ok(get_protocols::Response { protocols: BTreeMap::new(), @@ -1114,7 +1130,9 @@ pub fn create_message_event_route( ) .expect("message events are always okay"); - MatrixResult(Ok(create_message_event::Response { event_id: Some(event_id) })) + MatrixResult(Ok(create_message_event::Response { + event_id: Some(event_id), + })) } #[put( @@ -1144,7 +1162,9 @@ pub fn create_state_event_for_key_route( ) .unwrap(); - MatrixResult(Ok(create_state_event_for_key::Response { event_id: Some(event_id) })) + MatrixResult(Ok(create_state_event_for_key::Response { + event_id: Some(event_id), + })) } #[put( @@ -1173,7 +1193,9 @@ pub fn create_state_event_for_empty_key_route( ) .unwrap(); - MatrixResult(Ok(create_state_event_for_empty_key::Response { event_id: Some(event_id) })) + MatrixResult(Ok(create_state_event_for_empty_key::Response { + event_id: Some(event_id), + })) } #[get("/_matrix/client/r0/sync", data = "")] @@ -1197,7 +1219,8 @@ pub fn sync_route( let mut pdus = db .rooms - .pdus_since(&room_id, since).unwrap() + .pdus_since(&room_id, since) + .unwrap() .map(|r| r.unwrap()) .collect::>(); @@ -1213,16 +1236,12 @@ pub fn sync_route( } } - let notification_count = if let Some(last_read) = db - .rooms - .edus - .room_read_get(&room_id, &user_id) - .unwrap() - { - Some((db.rooms.pdus_since(&room_id, last_read).unwrap().count() as u32).into()) - } else { - None - }; + let notification_count = + if let Some(last_read) = db.rooms.edus.room_read_get(&room_id, &user_id).unwrap() { + Some((db.rooms.pdus_since(&room_id, last_read).unwrap().count() as u32).into()) + } else { + None + }; // They /sync response doesn't always return all messages, so we say the output is // limited unless there are enough events @@ -1262,10 +1281,10 @@ pub fn sync_route( } edus.extend( - db - .rooms + db.rooms .edus - .roomlatests_since(&room_id, since).unwrap() + .roomlatests_since(&room_id, since) + .unwrap() .map(|r| r.unwrap()), ); @@ -1273,8 +1292,10 @@ pub fn sync_route( room_id.clone().try_into().unwrap(), sync_events::JoinedRoom { account_data: Some(sync_events::AccountData { - events: db.account_data - .changes_since(Some(&room_id), &user_id, since).unwrap() + events: db + .account_data + .changes_since(Some(&room_id), &user_id, since) + .unwrap() .into_iter() .map(|(_, v)| v) .collect(), @@ -1304,8 +1325,7 @@ pub fn sync_route( // TODO: state before timeline state: sync_events::State { events: if send_full_state { - db - .rooms + db.rooms .room_state(&room_id) .unwrap() .into_iter() @@ -1332,17 +1352,12 @@ pub fn sync_route( let mut edus = db .rooms .edus - .roomlatests_since(&room_id, since).unwrap() + .roomlatests_since(&room_id, since) + .unwrap() .map(|r| r.unwrap()) .collect::>(); - edus.extend( - db - .rooms - .edus - .roomactives_all(&room_id) - .map(|r| r.unwrap()), - ); + edus.extend(db.rooms.edus.roomactives_all(&room_id).map(|r| r.unwrap())); left_rooms.insert( room_id.clone().try_into().unwrap(), @@ -1363,7 +1378,8 @@ pub fn sync_route( let room_id = room_id.unwrap(); let events = db .rooms - .pdus_since(&room_id, since).unwrap() + .pdus_since(&room_id, since) + .unwrap() .into_iter() .map(|pdu| pdu.unwrap().to_stripped_state_event()) .collect(); @@ -1383,10 +1399,23 @@ pub fn sync_route( join: joined_rooms, invite: invited_rooms, }, - presence: sync_events::Presence { events: Vec::new() }, + presence: sync_events::Presence { + events: db + .global_edus + .globallatests_since(since) + .unwrap() + .map(|edu| { + EventJson::::from( + edu.unwrap().json().to_owned(), + ) + }) + .collect(), + }, account_data: sync_events::AccountData { - events: db.account_data - .changes_since(None, &user_id, since).unwrap() + events: db + .account_data + .changes_since(None, &user_id, since) + .unwrap() .into_iter() .map(|(_, v)| v) .collect(), diff --git a/src/database.rs b/src/database.rs index 76109c99..3b8f9279 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,4 +1,5 @@ pub(self) mod account_data; +pub(self) mod global_edus; pub(self) mod globals; pub(self) mod rooms; pub(self) mod users; @@ -11,8 +12,7 @@ pub struct Database { pub users: users::Users, pub rooms: rooms::Rooms, pub account_data: account_data::AccountData, - //pub globalallid_globalall: sled::Tree, // ToDevice, GlobalAllId = UserId + Count - //pub globallatestid_globallatest: sled::Tree, // Presence, GlobalLatestId = Count + Type + UserId + pub global_edus: global_edus::GlobalEdus, pub _db: sled::Db, } @@ -66,8 +66,10 @@ impl Database { account_data: account_data::AccountData { roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata").unwrap(), }, - //globalallid_globalall: db.open_tree("globalallid_globalall").unwrap(), - //globallatestid_globallatest: db.open_tree("globallatestid_globallatest").unwrap(), + global_edus: global_edus::GlobalEdus { + //globalallid_globalall: db.open_tree("globalallid_globalall").unwrap(), + globallatestid_globallatest: db.open_tree("globallatestid_globallatest").unwrap(), // Presence + }, _db: db, } } diff --git a/src/database/global_edus.rs b/src/database/global_edus.rs new file mode 100644 index 00000000..db446749 --- /dev/null +++ b/src/database/global_edus.rs @@ -0,0 +1,68 @@ +use crate::Result; +use ruma_events::{collections::only::Event as EduEvent, EventJson}; +use ruma_identifiers::UserId; + +pub struct GlobalEdus { + pub(super) globallatestid_globallatest: sled::Tree, // Presence, GlobalLatestId = Count + UserId + //pub globalallid_globalall: sled::Tree, // ToDevice, GlobalAllId = UserId + Count +} + +impl GlobalEdus { + /// Adds a global event which will be saved until a new event replaces it (e.g. presence updates). + pub fn update_globallatest( + &self, + user_id: &UserId, + event: EduEvent, + globals: &super::globals::Globals, + ) -> Result<()> { + // Remove old entry + if let Some(old) = self + .globallatestid_globallatest + .iter() + .keys() + .rev() + .filter_map(|r| r.ok()) + .find(|key| { + key.rsplit(|&b| b == 0xff).next().unwrap() == user_id.to_string().as_bytes() + }) + { + // This is the old global_latest + self.globallatestid_globallatest.remove(old)?; + } + + let mut global_latest_id = globals.next_count()?.to_be_bytes().to_vec(); + global_latest_id.push(0xff); + global_latest_id.extend_from_slice(&user_id.to_string().as_bytes()); + + self.globallatestid_globallatest + .insert(global_latest_id, &*serde_json::to_string(&event)?)?; + + Ok(()) + } + + /// Returns an iterator over the most recent presence updates that happened after the event with id `since`. + pub fn globallatests_since( + &self, + since: u64, + ) -> Result>>> { + let first_possible_edu = since.to_be_bytes().to_vec(); + + Ok(self + .globallatestid_globallatest + .range(&*first_possible_edu..) + // Skip the first pdu if it's exactly at since, because we sent that last time + .skip( + if self + .globallatestid_globallatest + .get(first_possible_edu)? + .is_some() + { + 1 + } else { + 0 + }, + ) + .filter_map(|r| r.ok()) + .map(|(_, v)| Ok(serde_json::from_slice(&v)?))) + } +} diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index f2db5a44..a2ade552 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -79,14 +79,6 @@ impl RoomEdus { .map(|(_, v)| Ok(serde_json::from_slice(&v)?))) } - /// Returns a vector of the most recent read_receipts in a room that happened after the event with id `since`. - pub fn roomlatests_all( - &self, - room_id: &RoomId, - ) -> Result>>> { - self.roomlatests_since(room_id, 0) - } - /// Adds an event that will be saved until the `timeout` timestamp (e.g. typing notifications). pub fn roomactive_add( &self, diff --git a/src/server_server.rs b/src/server_server.rs index bb43957f..f77699e4 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -59,7 +59,12 @@ pub async fn send_request( request_map.insert("destination".to_owned(), destination.into()); let mut request_json = request_map.into(); - ruma_signatures::sign_json(db.globals.hostname(), db.globals.keypair(), &mut request_json).unwrap(); + ruma_signatures::sign_json( + db.globals.hostname(), + db.globals.keypair(), + &mut request_json, + ) + .unwrap(); let signatures = request_json["signatures"] .as_object() @@ -85,7 +90,11 @@ pub async fn send_request( ); } - let reqwest_response = db.globals.reqwest_client().execute(http_request.into()).await; + let reqwest_response = db + .globals + .reqwest_client() + .execute(http_request.into()) + .await; // Because reqwest::Response -> http::Response is complicated: match reqwest_response {