diff --git a/Cargo.lock b/Cargo.lock index c4c6419a..ced79c6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -505,6 +505,7 @@ dependencies = [ "ruma-federation-api", "ruma-identifiers", "ruma-signatures", + "serde", "serde_json", "sled", ] diff --git a/Cargo.toml b/Cargo.toml index b76a9c94..d2597071 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,4 @@ js_int = "0.1.4" serde_json = "1.0.50" ruma-signatures = { git = "https://github.com/ruma/ruma-signatures.git" } ruma-federation-api = "0.0.1" +serde = "1.0.105" diff --git a/src/data.rs b/src/data.rs index 28b8d05f..f0917ff4 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,9 +1,12 @@ -use crate::{utils, Database}; +use crate::{utils, Database, PduEvent}; use log::debug; -use ruma_events::collections::all::Event; +use ruma_events::{room::message::MessageEvent, EventType}; use ruma_federation_api::RoomV3Pdu; use ruma_identifiers::{EventId, RoomId, UserId}; -use std::convert::{TryFrom, TryInto}; +use std::{ + collections::HashMap, + convert::{TryFrom, TryInto}, +}; pub struct Data { hostname: String, @@ -145,7 +148,7 @@ impl Data { } /// Add a persisted data unit from this homeserver - pub fn pdu_append(&self, event_id: &EventId, room_id: &RoomId, event: Event) { + pub fn pdu_append_message(&self, event_id: &EventId, room_id: &RoomId, event: MessageEvent) { // prev_events are the leaves of the current graph. This method removes all leaves from the // room and replaces them with our event let prev_events = self.pdu_leaves_replace(room_id, event_id); @@ -163,25 +166,25 @@ impl Data { .unwrap_or(0_u64) + 1; - let mut pdu_value = serde_json::to_value(&event).expect("message event can be serialized"); - let pdu = pdu_value.as_object_mut().unwrap(); - - pdu.insert( - "prev_events".to_owned(), - prev_events - .iter() - .map(|id| id.to_string()) - .collect::>() - .into(), - ); - pdu.insert("origin".to_owned(), self.hostname().into()); - pdu.insert("depth".to_owned(), depth.into()); - pdu.insert("auth_events".to_owned(), vec!["$auth_eventid"].into()); // TODO - pdu.insert( - "hashes".to_owned(), - "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(), - ); // TODO - pdu.insert("signatures".to_owned(), "signature".into()); // TODO + let pdu = PduEvent { + event_id: event_id.clone(), + room_id: room_id.clone(), + sender: event.sender, + origin: self.hostname.clone(), + origin_server_ts: event.origin_server_ts, + kind: EventType::RoomMessage, + content: serde_json::to_value(event.content).unwrap(), + state_key: None, + prev_events, + depth: depth.try_into().unwrap(), + auth_events: Vec::new(), + redacts: None, + unsigned: Default::default(), + hashes: ruma_federation_api::EventHash { + sha256: "aaa".to_owned(), + }, + signatures: HashMap::new(), + }; // The new value will need a new index. We store the last used index in 'n' + id let mut count_key: Vec = vec![b'n']; @@ -205,7 +208,7 @@ impl Data { self.db .pduid_pdus - .insert(&pdu_id, dbg!(&*serde_json::to_string(&pdu).unwrap())) + .insert(&pdu_id, &*serde_json::to_string(&pdu).unwrap()) .unwrap(); self.db @@ -215,7 +218,7 @@ impl Data { } /// Returns a vector of all PDUs. - pub fn pdus_all(&self) -> Vec { + pub fn pdus_all(&self) -> Vec { self.pdus_since( self.db .eventid_pduid @@ -229,7 +232,7 @@ impl Data { } /// Returns a vector of all events that happened after the event with id `since`. - pub fn pdus_since(&self, since: String) -> Vec { + pub fn pdus_since(&self, since: String) -> Vec { let mut pdus = Vec::new(); if let Some(room_id) = since.rsplitn(2, '#').nth(1) { diff --git a/src/main.rs b/src/main.rs index 8af476d0..44ff413b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,15 @@ #![feature(proc_macro_hygiene, decl_macro)] mod data; mod database; +mod pdu; mod ruma_wrapper; mod utils; pub use data::Data; pub use database::Database; +pub use pdu::PduEvent; -use log::debug; +use log::{debug, error}; use rocket::{get, options, post, put, routes, State}; use ruma_client_api::{ error::{Error, ErrorKind}, @@ -17,7 +19,7 @@ use ruma_client_api::{ }, unversioned::get_supported_versions, }; -use ruma_events::{collections::all::Event, room::message::MessageEvent}; +use ruma_events::{collections::all::RoomEvent, room::message::MessageEvent, EventResult}; use ruma_identifiers::{EventId, UserId}; use ruma_wrapper::{MatrixResult, Ruma}; use serde_json::map::Map; @@ -212,7 +214,7 @@ fn create_message_event_route( body: Ruma, ) -> MatrixResult { // Construct event - let mut event = Event::RoomMessage(MessageEvent { + let mut event = RoomEvent::RoomMessage(MessageEvent { content: body.data.clone().into_result().unwrap(), event_id: EventId::try_from("$thiswillbefilledinlater").unwrap(), origin_server_ts: utils::millis_since_unix_epoch(), @@ -230,13 +232,13 @@ fn create_message_event_route( .expect("ruma's reference hashes are correct"); // Insert event id - if let Event::RoomMessage(message) = &mut event { + if let RoomEvent::RoomMessage(message) = &mut event { message.event_id = event_id.clone(); + data.pdu_append_message(&event_id, &body.room_id, message.clone()); + } else { + error!("only roommessages are handled currently"); } - // Add PDU to the graph - data.pdu_append(&event_id, &body.room_id, event); - MatrixResult(Ok(create_message_event::Response { event_id })) } @@ -245,30 +247,38 @@ fn sync_route( data: State, body: Ruma, ) -> MatrixResult { - let pdus = data.pdus_all(); let mut joined_rooms = HashMap::new(); - joined_rooms.insert( - "!roomid:localhost".try_into().unwrap(), - sync_events::JoinedRoom { - account_data: sync_events::AccountData { events: Vec::new() }, - summary: sync_events::RoomSummary { - heroes: Vec::new(), - joined_member_count: None, - invited_member_count: None, - }, - unread_notifications: sync_events::UnreadNotificationsCount { - highlight_count: None, - notification_count: None, - }, - timeline: sync_events::Timeline { - limited: None, - prev_batch: None, - events: todo!(), + { + let pdus = data.pdus_all(); + let mut room_events = Vec::new(); + + for pdu in pdus { + room_events.push(pdu.to_room_event()); + } + + joined_rooms.insert( + "!roomid:localhost".try_into().unwrap(), + sync_events::JoinedRoom { + account_data: sync_events::AccountData { events: Vec::new() }, + summary: sync_events::RoomSummary { + heroes: Vec::new(), + joined_member_count: None, + invited_member_count: None, + }, + unread_notifications: sync_events::UnreadNotificationsCount { + highlight_count: None, + notification_count: None, + }, + timeline: sync_events::Timeline { + limited: None, + prev_batch: None, + events: room_events, + }, + state: sync_events::State { events: Vec::new() }, + ephemeral: sync_events::Ephemeral { events: Vec::new() }, }, - state: sync_events::State { events: Vec::new() }, - ephemeral: sync_events::Ephemeral { events: Vec::new() }, - }, - ); + ); + } MatrixResult(Ok(sync_events::Response { next_batch: String::new(), diff --git a/src/pdu.rs b/src/pdu.rs new file mode 100644 index 00000000..588242b7 --- /dev/null +++ b/src/pdu.rs @@ -0,0 +1,42 @@ +use js_int::UInt; +use ruma_events::{collections::all::RoomEvent, EventResult, EventType}; +use ruma_federation_api::EventHash; +use ruma_identifiers::{EventId, RoomId, UserId}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Deserialize, Serialize)] +pub struct PduEvent { + pub event_id: EventId, + pub room_id: RoomId, + pub sender: UserId, + pub origin: String, + pub origin_server_ts: UInt, + #[serde(rename = "type")] + pub kind: EventType, + pub content: serde_json::Value, + #[serde(skip_serializing_if = "Option::is_none")] + pub state_key: Option, + pub prev_events: Vec, + pub depth: UInt, + pub auth_events: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub redacts: Option, + #[serde(default, skip_serializing_if = "serde_json::Map::is_empty")] + pub unsigned: serde_json::Map, + pub hashes: EventHash, + pub signatures: HashMap>, +} + +impl PduEvent { + pub fn to_room_event(&self) -> RoomEvent { + // Can only fail in rare circumstances that won't ever happen here, see + // https://docs.rs/serde_json/1.0.50/serde_json/fn.to_string.html + let json = serde_json::to_string(&self).unwrap(); + // EventResult's deserialize implementation always returns `Ok(...)` + serde_json::from_str::>(&json) + .unwrap() + .into_result() + .unwrap() + } +}