From 3b9cadeec2ac139f1dfe489bafe10ed244526ca8 Mon Sep 17 00:00:00 2001 From: timokoesters Date: Sat, 11 Apr 2020 20:03:22 +0200 Subject: [PATCH] feat: read receipts --- src/client_server.rs | 49 ++++++++++++++++---- src/data.rs | 107 ++++++++++++++++++++++++++++++++++++++++--- src/database.rs | 57 +++++++++++++++++++++-- src/ruma_wrapper.rs | 3 +- 4 files changed, 195 insertions(+), 21 deletions(-) diff --git a/src/client_server.rs b/src/client_server.rs index 4d810669..3c36c7c5 100644 --- a/src/client_server.rs +++ b/src/client_server.rs @@ -30,7 +30,10 @@ use ruma_client_api::{ }, unversioned::get_supported_versions, }; -use ruma_events::{collections::only::Event, EventType}; +use ruma_events::{ + collections::only::{Event as EduEvent, Event}, + EventType, +}; use ruma_identifiers::{RoomId, RoomIdOrAliasId, UserId}; use serde_json::json; use std::{ @@ -458,6 +461,33 @@ pub fn set_read_marker_route( body: Ruma, _room_id: String, ) -> MatrixResult { + let user_id = body.user_id.clone().expect("user is authenticated"); + // TODO: Fully read + if let Some(event) = &body.read_receipt { + let mut user_receipts = HashMap::new(); + user_receipts.insert( + user_id.clone(), + ruma_events::receipt::Receipt { + ts: Some(utils::millis_since_unix_epoch()), + }, + ); + let mut receipt_content = HashMap::new(); + receipt_content.insert( + event.clone(), + ruma_events::receipt::Receipts { + read: Some(user_receipts), + }, + ); + + data.roomlatest_update( + &user_id, + &body.room_id, + EduEvent::Receipt(ruma_events::receipt::ReceiptEvent { + content: receipt_content, + room_id: None, // None because it can be inferred + }), + ); + } MatrixResult(Ok(set_read_marker::Response)) } @@ -707,16 +737,17 @@ pub fn sync_route( let mut joined_rooms = HashMap::new(); let joined_roomids = data.rooms_joined(body.user_id.as_ref().expect("user is authenticated")); + let since = body + .since + .clone() + .and_then(|string| string.parse().ok()) + .unwrap_or(0); for room_id in joined_roomids { - let pdus = if let Some(since) = body.since.clone().and_then(|string| string.parse().ok()) { - data.pdus_since(&room_id, since) - } else { - data.pdus_all(&room_id) - }; + let pdus = { data.pdus_since(&room_id, since) }; let room_events = pdus.into_iter().map(|pdu| pdu.to_room_event()).collect(); joined_rooms.insert( - room_id.try_into().unwrap(), + room_id.clone().try_into().unwrap(), sync_events::JoinedRoom { account_data: sync_events::AccountData { events: Vec::new() }, summary: sync_events::RoomSummary { @@ -734,7 +765,9 @@ pub fn sync_route( events: room_events, }, state: sync_events::State { events: Vec::new() }, - ephemeral: sync_events::Ephemeral { events: Vec::new() }, + ephemeral: sync_events::Ephemeral { + events: data.roomlatests_since(&room_id, since), + }, }, ); } diff --git a/src/data.rs b/src/data.rs index b26b899c..6d43278b 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,7 +1,8 @@ use crate::{utils, Database, PduEvent}; -use ruma_events::EventType; +use ruma_events::{collections::only::Event as EduEvent, EventResult, EventType}; use ruma_federation_api::RoomV3Pdu; use ruma_identifiers::{EventId, RoomId, UserId}; +use serde_json::json; use std::{ collections::HashMap, convert::{TryFrom, TryInto}, @@ -167,6 +168,15 @@ impl Data { user_id.to_string().as_bytes().into(), ); + self.pdu_append( + room_id.clone(), + user_id.clone(), + EventType::RoomMember, + json!({"membership": "join"}), + None, + Some(user_id.to_string()), + ); + true } @@ -187,7 +197,7 @@ impl Data { // Create the first part of the full pdu id let mut prefix = vec![b'd']; prefix.extend_from_slice(room_id.to_string().as_bytes()); - prefix.push(b'#'); // Add delimiter so we don't find rooms starting with the same id + prefix.push(0xff); // Add delimiter so we don't find rooms starting with the same id if let Some((key, _)) = self.db.pduid_pdu.get_gt(&prefix).unwrap() { if key.starts_with(&prefix) { @@ -334,14 +344,12 @@ impl Data { // The new value will need a new index. We store the last used index in 'n' // The count will go up regardless of the room_id // This is also the next_batch/since value - let count_key: Vec = vec![b'n']; - // Increment the last index and use that let index = utils::u64_from_bytes( &self .db .pduid_pdu - .update_and_fetch(&count_key, utils::increment) + .update_and_fetch(b"n", utils::increment) .unwrap() .unwrap(), ); @@ -349,7 +357,7 @@ impl Data { let mut pdu_id = vec![b'd']; pdu_id.extend_from_slice(room_id.to_string().as_bytes()); - pdu_id.push(b'#'); // Add delimiter so we don't find rooms starting with the same id + pdu_id.push(0xff); // Add delimiter so we don't find rooms starting with the same id pdu_id.extend_from_slice(&index.to_be_bytes()); self.db @@ -389,7 +397,7 @@ impl Data { // Create the first part of the full pdu id let mut prefix = vec![b'd']; prefix.extend_from_slice(room_id.to_string().as_bytes()); - prefix.push(b'#'); // Add delimiter so we don't find rooms starting with the same id + prefix.push(0xff); // Add delimiter so we don't find rooms starting with the same id let mut current = prefix.clone(); current.extend_from_slice(&since.to_be_bytes()); @@ -406,6 +414,91 @@ impl Data { pdus } + pub fn roomlatest_update(&self, user_id: &UserId, room_id: &RoomId, event: EduEvent) { + let mut prefix = vec![b'd']; + prefix.extend_from_slice(room_id.to_string().as_bytes()); + prefix.push(0xff); + + // Start with last + if let Some(mut current) = self + .db + .roomlatestid_roomlatest + .scan_prefix(&prefix) + .keys() + .next_back() + .map(|c| c.unwrap()) + { + // Remove old marker (There should at most one) + loop { + if !current.starts_with(&prefix) { + // We're in another room + break; + } + if current.rsplitn(2, |&b| b == 0xff).next().unwrap() + == user_id.to_string().as_bytes() + { + // This is the old room_latest + self.db.roomlatestid_roomlatest.remove(current).unwrap(); + break; + } + // Else, try the event before that + if let Some((k, _)) = self.db.roomlatestid_roomlatest.get_lt(current).unwrap() { + current = k; + } else { + break; + } + } + } + + // Increment the last index and use that + let index = utils::u64_from_bytes( + &self + .db + .pduid_pdu + .update_and_fetch(b"n", utils::increment) + .unwrap() + .unwrap(), + ); + + let mut room_latest_id = prefix; + room_latest_id.extend_from_slice(&index.to_be_bytes()); + room_latest_id.push(0xff); + room_latest_id.extend_from_slice(&user_id.to_string().as_bytes()); + + self.db + .roomlatestid_roomlatest + .insert(room_latest_id, &*serde_json::to_string(&event).unwrap()) + .unwrap(); + } + + /// Returns a vector of the most recent read_receipts in a room that happened after the event with id `since`. + pub fn roomlatests_since(&self, room_id: &RoomId, since: u64) -> Vec { + let mut room_latests = Vec::new(); + + let mut prefix = vec![b'd']; + prefix.extend_from_slice(room_id.to_string().as_bytes()); + prefix.push(0xff); + + let mut current = prefix.clone(); + current.extend_from_slice(&since.to_be_bytes()); + + while let Some((key, value)) = self.db.roomlatestid_roomlatest.get_gt(¤t).unwrap() { + if key.starts_with(&prefix) { + current = key.to_vec(); + room_latests.push( + serde_json::from_slice::>(&value) + .expect("room_latest in db is valid") + .into_result() + .expect("room_latest in db is valid"), + ); + } else { + break; + } + } + + room_latests + } + pub fn debug(&self) { self.db.debug(); } diff --git a/src/database.rs b/src/database.rs index 7a308614..18de2224 100644 --- a/src/database.rs +++ b/src/database.rs @@ -57,18 +57,23 @@ pub struct Database { pub userid_avatarurl: sled::Tree, pub deviceid_token: sled::Tree, pub token_userid: sled::Tree, - pub pduid_pdu: sled::Tree, // PduId = RoomId + Since + pub pduid_pdu: sled::Tree, // PduId = 'd' + RoomId + Since (global since counter is at 'n') pub eventid_pduid: sled::Tree, pub roomid_pduleaves: MultiValue, pub roomid_userids: MultiValue, pub userid_roomids: MultiValue, + // EDUs: + pub roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Since + UserId TODO: Types + pub timeofremoval_roomrelevants: MultiValue, // Typing + pub globalallid_globalall: sled::Tree, // ToDevice, GlobalAllId = UserId + Since + pub globallatestid_globallatest: sled::Tree, // Presence, GlobalLatestId = Since + Type + UserId _db: sled::Db, } impl Database { /// Tries to remove the old database but ignores all errors. pub fn try_remove(hostname: &str) { - let mut path = ProjectDirs::from("xyz", "koesters", "matrixserver") + let mut path = ProjectDirs::from("xyz", "koesters", "conduit") .unwrap() .data_dir() .to_path_buf(); @@ -78,7 +83,7 @@ impl Database { /// Load an existing database or create a new one. pub fn load_or_create(hostname: &str) -> Self { - let mut path = ProjectDirs::from("xyz", "koesters", "matrixserver") + let mut path = ProjectDirs::from("xyz", "koesters", "conduit") .unwrap() .data_dir() .to_path_buf(); @@ -97,6 +102,12 @@ impl Database { roomid_pduleaves: MultiValue(db.open_tree("roomid_pduleaves").unwrap()), roomid_userids: MultiValue(db.open_tree("roomid_userids").unwrap()), userid_roomids: MultiValue(db.open_tree("userid_roomids").unwrap()), + roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest").unwrap(), + timeofremoval_roomrelevants: MultiValue( + db.open_tree("timeofremoval_roomrelevants").unwrap(), + ), + globalallid_globalall: db.open_tree("globalallid_globalall").unwrap(), + globallatestid_globallatest: db.open_tree("globallatestid_globallatest").unwrap(), _db: db, } } @@ -118,7 +129,7 @@ impl Database { String::from_utf8_lossy(&v), ); } - println!("# UserId -> Displayname:"); + println!("\n# UserId -> Displayname:"); for (k, v) in self.userid_displayname.iter().map(|r| r.unwrap()) { println!( "{:?} -> {:?}", @@ -126,7 +137,7 @@ impl Database { String::from_utf8_lossy(&v), ); } - println!("# UserId -> AvatarURL:"); + println!("\n# UserId -> AvatarURL:"); for (k, v) in self.userid_avatarurl.iter().map(|r| r.unwrap()) { println!( "{:?} -> {:?}", @@ -190,5 +201,41 @@ impl Database { String::from_utf8_lossy(&v), ); } + println!("\n# RoomLatestId -> RoomLatest"); + for (k, v) in self.roomlatestid_roomlatest.iter().map(|r| r.unwrap()) { + println!( + "{:?} -> {:?}", + String::from_utf8_lossy(&k), + String::from_utf8_lossy(&v), + ); + } + println!("\n# TimeOfRemoval -> RoomRelevants Id:"); + for (k, v) in self + .timeofremoval_roomrelevants + .iter_all() + .map(|r| r.unwrap()) + { + println!( + "{:?} -> {:?}", + String::from_utf8_lossy(&k), + String::from_utf8_lossy(&v), + ); + } + println!("\n# GlobalAllId -> GlobalAll:"); + for (k, v) in self.globalallid_globalall.iter().map(|r| r.unwrap()) { + println!( + "{:?} -> {:?}", + String::from_utf8_lossy(&k), + String::from_utf8_lossy(&v), + ); + } + println!("\n# GlobalLatestId -> GlobalLatest:"); + for (k, v) in self.globallatestid_globallatest.iter().map(|r| r.unwrap()) { + println!( + "{:?} -> {:?}", + String::from_utf8_lossy(&k), + String::from_utf8_lossy(&v), + ); + } } } diff --git a/src/ruma_wrapper.rs b/src/ruma_wrapper.rs index f39ef752..81b7ea8e 100644 --- a/src/ruma_wrapper.rs +++ b/src/ruma_wrapper.rs @@ -1,3 +1,4 @@ +use log::warn; use rocket::{ data::{Data, FromData, FromDataFuture, Transform, TransformFuture, Transformed}, http::Status, @@ -106,7 +107,7 @@ where }, }), Err(e) => { - log::error!("{:?}", e); + warn!("{:?}", e); Failure((Status::InternalServerError, ())) } }