From 23b18d71eef78544148746d30d8b9630998a3a22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 18 Feb 2023 13:20:20 +0100 Subject: [PATCH] feat: handle backfill requests Based on https://gitlab.com/famedly/conduit/-/merge_requests/421 --- src/api/server_server.rs | 83 ++++++++++++++++- src/main.rs | 1 + src/service/mod.rs | 7 +- src/service/rooms/state_accessor/mod.rs | 114 +++++++++++++++++++++++- 4 files changed, 199 insertions(+), 6 deletions(-) diff --git a/src/api/server_server.rs b/src/api/server_server.rs index fc3e2c0f..11a6cbf4 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -12,6 +12,7 @@ use ruma::{ client::error::{Error as RumaError, ErrorKind}, federation::{ authorization::get_event_authorization, + backfill::get_backfill, device::get_devices::{self, v1::UserDevice}, directory::{get_public_rooms, get_public_rooms_filtered}, discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey}, @@ -42,8 +43,9 @@ use ruma::{ }, serde::{Base64, JsonObject, Raw}, to_device::DeviceIdOrAllDevices, - CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, - OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, + uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, + OwnedEventId, OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, + ServerName, }; use serde_json::value::{to_raw_value, RawValue as RawJsonValue}; use std::{ @@ -950,6 +952,83 @@ pub async fn get_event_route( }) } +/// # `GET /_matrix/federation/v1/backfill/` +/// +/// Retrieves events from before the sender joined the room, if the room's +/// history visibility allows. +pub async fn get_backfill_route( + body: Ruma, +) -> Result { + if !services().globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + let sender_servername = body + .sender_servername + .as_ref() + .expect("server is authenticated"); + + info!("Got backfill request from: {}", sender_servername); + + if !services() + .rooms + .state_cache + .server_in_room(sender_servername, &body.room_id)? + { + return Err(Error::BadRequest( + ErrorKind::Forbidden, + "Server is not in room.", + )); + } + + services() + .rooms + .event_handler + .acl_check(sender_servername, &body.room_id)?; + + let until = body + .v + .iter() + .map(|eventid| services().rooms.timeline.get_pdu_count(eventid)) + .filter_map(|r| r.ok().flatten()) + .max() + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "No known eventid in v", + ))?; + + let limit = body.limit.min(uint!(100)); + + let all_events = services() + .rooms + .timeline + .pdus_until(&user_id!("@doesntmatter:conduit.rs"), &body.room_id, until)? + .take(limit.try_into().unwrap()); + + let events = all_events + .filter_map(|r| r.ok()) + .filter(|(_, e)| { + matches!( + services().rooms.state_accessor.server_can_see_event( + sender_servername, + &e.room_id, + &e.event_id, + ), + Ok(true), + ) + }) + .map(|(pdu_id, _)| services().rooms.timeline.get_pdu_json_from_id(&pdu_id)) + .filter_map(|r| r.ok().flatten()) + .map(|pdu| PduEvent::convert_to_outgoing_federation_event(pdu)) + .collect(); + + Ok(get_backfill::v1::Response { + origin: services().globals.server_name().to_owned(), + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), + pdus: events, + }) +} + /// # `POST /_matrix/federation/v1/get_missing_events/{roomId}` /// /// Retrieves events that the sender is missing. diff --git a/src/main.rs b/src/main.rs index da80507c..fe6cfc01 100644 --- a/src/main.rs +++ b/src/main.rs @@ -390,6 +390,7 @@ fn routes() -> Router { .ruma_route(server_server::get_public_rooms_filtered_route) .ruma_route(server_server::send_transaction_message_route) .ruma_route(server_server::get_event_route) + .ruma_route(server_server::get_backfill_route) .ruma_route(server_server::get_missing_events_route) .ruma_route(server_server::get_event_authorization_route) .ruma_route(server_server::get_room_state_route) diff --git a/src/service/mod.rs b/src/service/mod.rs index 385dcc69..07d80a15 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -77,7 +77,12 @@ impl Services { search: rooms::search::Service { db }, short: rooms::short::Service { db }, state: rooms::state::Service { db }, - state_accessor: rooms::state_accessor::Service { db }, + state_accessor: rooms::state_accessor::Service { + db, + server_visibility_cache: Mutex::new(LruCache::new( + (100.0 * config.conduit_cache_capacity_modifier) as usize, + )), + }, state_cache: rooms::state_cache::Service { db }, state_compressor: rooms::state_compressor::Service { db, diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs index 87d99368..e940ffa1 100644 --- a/src/service/rooms/state_accessor/mod.rs +++ b/src/service/rooms/state_accessor/mod.rs @@ -1,13 +1,28 @@ mod data; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; pub use data::Data; -use ruma::{events::StateEventType, EventId, RoomId}; +use lru_cache::LruCache; +use ruma::{ + events::{ + room::{ + history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, + member::{MembershipState, RoomMemberEventContent}, + }, + StateEventType, + }, + EventId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId, +}; +use tracing::error; -use crate::{PduEvent, Result}; +use crate::{services, Error, PduEvent, Result}; pub struct Service { pub db: &'static dyn Data, + pub server_visibility_cache: Mutex>, } impl Service { @@ -46,6 +61,99 @@ impl Service { self.db.state_get(shortstatehash, event_type, state_key) } + /// Get membership for given user in state + fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> Result { + self.state_get( + shortstatehash, + &StateEventType::RoomMember, + user_id.as_str(), + )? + .map_or(Ok(MembershipState::Leave), |s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomMemberEventContent| c.membership) + .map_err(|_| Error::bad_database("Invalid room membership event in database.")) + }) + } + + /// The user was a joined member at this state (potentially in the past) + fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> bool { + self.user_membership(shortstatehash, user_id) + .map(|s| s == MembershipState::Join) + .unwrap_or_default() // Return sensible default, i.e. false + } + + /// The user was an invited or joined room member at this state (potentially + /// in the past) + fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> bool { + self.user_membership(shortstatehash, user_id) + .map(|s| s == MembershipState::Join || s == MembershipState::Invite) + .unwrap_or_default() // Return sensible default, i.e. false + } + + /// Whether a server is allowed to see an event through federation, based on + /// the room's history_visibility at that event's state. + #[tracing::instrument(skip(self))] + pub fn server_can_see_event( + &self, + origin: &ServerName, + room_id: &RoomId, + event_id: &EventId, + ) -> Result { + let shortstatehash = match self.pdu_shortstatehash(event_id)? { + Some(shortstatehash) => shortstatehash, + None => return Ok(false), + }; + + if let Some(visibility) = self + .server_visibility_cache + .lock() + .unwrap() + .get_mut(&(origin.to_owned(), shortstatehash)) + { + return Ok(*visibility); + } + + let history_visibility = self + .state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")? + .map_or(Ok(HistoryVisibility::Shared), |s| { + serde_json::from_str(s.content.get()) + .map(|c: RoomHistoryVisibilityEventContent| c.history_visibility) + .map_err(|_| { + Error::bad_database("Invalid history visibility event in database.") + }) + })?; + + let mut current_server_members = services() + .rooms + .state_cache + .room_members(room_id) + .filter_map(|r| r.ok()) + .filter(|member| member.server_name() == origin); + + let visibility = match history_visibility { + HistoryVisibility::WorldReadable | HistoryVisibility::Shared => true, + HistoryVisibility::Invited => { + // Allow if any member on requesting server was AT LEAST invited, else deny + current_server_members.any(|member| self.user_was_invited(shortstatehash, &member)) + } + HistoryVisibility::Joined => { + // Allow if any member on requested server was joined, else deny + current_server_members.any(|member| self.user_was_joined(shortstatehash, &member)) + } + _ => { + error!("Unknown history visibility {history_visibility}"); + false + } + }; + + self.server_visibility_cache + .lock() + .unwrap() + .insert((origin.to_owned(), shortstatehash), visibility); + + Ok(visibility) + } + /// Returns the state hash for this pdu. pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result> { self.db.pdu_shortstatehash(event_id)