diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 325a2e2f..665e3287 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -397,6 +397,24 @@ impl Rooms { Ok(events) } + /// Force an update to the leaves of a room. + pub fn force_pdu_leaves(&self, room_id: &RoomId, event_ids: &[EventId]) -> Result<()> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + for key in self.roomid_pduleaves.scan_prefix(&prefix).keys() { + self.roomid_pduleaves.remove(key?)?; + } + + for event_id in event_ids.iter() { + let mut key = prefix.to_owned(); + key.extend_from_slice(event_id.as_bytes()); + self.roomid_pduleaves.insert(&key, event_id.as_bytes())?; + } + + Ok(()) + } + /// Replace the leaves of a room with a new event. pub fn replace_pdu_leaves(&self, room_id: &RoomId, event_id: &EventId) -> Result<()> { let mut prefix = room_id.as_bytes().to_vec(); diff --git a/src/pdu.rs b/src/pdu.rs index 340ddee5..e38410fd 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -9,7 +9,7 @@ use ruma::{ }; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH}; +use std::{cmp::Ordering, collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH}; #[derive(Clone, Deserialize, Serialize, Debug)] pub struct PduEvent { @@ -284,6 +284,25 @@ impl state_res::Event for PduEvent { } } +// These impl's allow us to dedup state snapshots when resolving state +// for incoming events (federation/send/{txn}). +impl Eq for PduEvent {} +impl PartialEq for PduEvent { + fn eq(&self, other: &Self) -> bool { + self.event_id == other.event_id + } +} +impl PartialOrd for PduEvent { + fn partial_cmp(&self, other: &Self) -> Option { + self.event_id.partial_cmp(&other.event_id) + } +} +impl Ord for PduEvent { + fn cmp(&self, other: &Self) -> Ordering { + self.event_id.cmp(&other.event_id) + } +} + /// Generates a correct eventId for the incoming pdu. /// /// Returns a tuple of the new `EventId` and the PDU as a `BTreeMap`. diff --git a/src/server_server.rs b/src/server_server.rs index 0eb7d6f5..16a1a8e9 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -5,7 +5,6 @@ use log::{error, info, warn}; use rocket::{get, post, put, response::content::Json, State}; use ruma::{ api::{ - client::r0::state, federation::{ directory::{get_public_rooms, get_public_rooms_filtered}, discovery::{ @@ -25,7 +24,7 @@ use ruma::{ }; use state_res::{Event, EventMap, StateMap}; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashSet}, convert::TryFrom, fmt::Debug, future::Future, @@ -600,31 +599,21 @@ pub async fn send_transaction_message_route<'a>( let server_name = &body.body.origin; let mut pub_key_map = BTreeMap::new(); - if let Some(sig) = value.get("signatures") { - match sig { - CanonicalJsonValue::Object(entity) => { - for key in entity.keys() { - // TODO: save this in a DB maybe... - // fetch the public signing key - let origin = <&ServerName>::try_from(key.as_str()).unwrap(); - let keys = fetch_signing_keys(&db, origin).await?; - - pub_key_map.insert( - origin.to_string(), - keys.into_iter() - .map(|(k, v)| (k.to_string(), v.key)) - .collect(), - ); - } - } - _ => { - resolved_map.insert( - event_id, - Err("`signatures` is not a JSON object".to_string()), - ); - continue; - } - } + + if let Some(CanonicalJsonValue::String(sender)) = value.get("sender") { + let sender = + UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field"); + let origin = sender.server_name(); + + // TODO: this could fail or the server not respond... + let keys = fetch_signing_keys(&db, origin).await?; + + pub_key_map.insert( + origin.to_string(), + keys.into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect(), + ); } else { resolved_map.insert(event_id, Err("No field `signatures` in JSON".to_string())); continue; @@ -642,8 +631,9 @@ pub async fn send_transaction_message_route<'a>( // 4. reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" // 5. reject "due to auth events" if the event doesn't pass auth based on the auth events // 7. if not timeline event: stop - // 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events - let (pdu, previous) = match validate_event( + // TODO; 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events + // the events found in step 8 can be authed/resolved and appended to the DB + let (pdu, previous): (_, Vec>) = match validate_event( &db, value, event_id.clone(), @@ -670,6 +660,9 @@ pub async fn send_transaction_message_route<'a>( // 6. persist the event as an outlier. db.rooms.append_pdu_outlier(pdu.event_id(), &pdu)?; + // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all + // the checks in this list starting at 1. These are not timeline events. + // // Step 10. check the auth of the event passes based on the calculated state of the event let (state_at_event, incoming_auth_events): (StateMap>, Vec>) = match db @@ -771,8 +764,12 @@ pub async fn send_transaction_message_route<'a>( ); }; - // Gather the forward extremities and resolve - let fork_states = match forward_extremities( + // Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res + // where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote) + // + // calculate_forward_extremities takes care of adding the current state if not already in the state sets + // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree. + let (mut fork_states, fork_ids) = match calculate_forward_extremities( &db, &pdu, server_name, @@ -788,6 +785,12 @@ pub async fn send_transaction_message_route<'a>( } }; + // add the incoming events to the mix of state snapshots + // Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets + fork_states.insert(state_at_event.clone()); + + let fork_states = fork_states.into_iter().collect::>(); + // 13. start state-res with all previous forward extremities minus the ones that are in // the prev_events of this event plus the new one created by this event and use // the result as the new room state @@ -901,7 +904,9 @@ pub async fn send_transaction_message_route<'a>( Err("Event has been soft failed".into()), ); } else { - append_state(&db, &pdu)?; + // Add the event to the DB and update the forward extremities (via roomid_pduleaves). + append_state(&db, &pdu, &fork_ids)?; + // Event has passed all auth/stateres checks resolved_map.insert(pdu.event_id().clone(), Ok(())); } @@ -1106,25 +1111,52 @@ async fn fetch_signing_keys( /// Gather all state snapshots needed to resolve the current state of the room. /// /// Step 11. ensure that the state is derived from the previous current state (i.e. we calculated by doing state res -/// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote) -async fn forward_extremities( +/// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote). +/// +/// The state snapshot of the incoming event __needs__ to be added to the resulting list. +async fn calculate_forward_extremities( db: &Database, pdu: &PduEvent, origin: &ServerName, pub_key_map: &PublicKeyMap, auth_cache: &mut EventMap>, -) -> Result>>> { +) -> Result<(BTreeSet>>, Vec)> { let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; + let mut is_incoming_leaf = true; + // Make sure the incoming event is not already a forward extremity + // FIXME: I think this could happen if different servers send us the same event?? + if current_leaves.contains(pdu.event_id()) { + is_incoming_leaf = false; + // Not sure what to do here + } + + // If the incoming event is already referenced by an existing event + // then do nothing - it's not a candidate to be a new extremity if + // it has been referenced. + if already_referenced(db, pdu)? { + is_incoming_leaf = false; + // This event has been dealt with already?? + } + + // TODO: + // [dendrite] Checks if any other leaves have been referenced and removes them + // but as long as we update the pdu leaves here and for events on our server this + // should not be possible. + + // Remove any forward extremities that are referenced by this incoming events prev_events for incoming_leaf in &pdu.prev_events { - if !current_leaves.contains(incoming_leaf) { - current_leaves.push(incoming_leaf.clone()); + if current_leaves.contains(incoming_leaf) { + if let Some(pos) = current_leaves.iter().position(|x| *x == *incoming_leaf) { + current_leaves.remove(pos); + } } } let current_hash = db.rooms.current_state_hash(pdu.room_id())?; + let mut includes_current_state = false; - let mut fork_states = vec![]; + let mut fork_states = BTreeSet::new(); for id in ¤t_leaves { if let Some(id) = db.rooms.get_pdu_id(id)? { let state_hash = db @@ -1142,8 +1174,10 @@ async fn forward_extremities( .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) .collect(); - fork_states.push(state); + fork_states.insert(state); } else { + error!("Forward extremity not found... {}", id); + let res = db .sending .send_federation_request( @@ -1166,25 +1200,37 @@ async fn forward_extremities( .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) .collect(); - fork_states.push(state); + fork_states.insert(state); } } + // Add the incoming event only if it is a leaf, we do this after fetching all the + // state since we know we have already fetched the state of the incoming event so lets + // not do it again! + if is_incoming_leaf { + current_leaves.push(pdu.event_id().clone()); + } + // This guarantees that our current room state is included if !includes_current_state && current_hash.is_some() { - fork_states.push( + fork_states.insert( db.rooms .state_full(pdu.room_id(), current_hash.as_ref().unwrap())? .into_iter() .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) .collect(), - ) + ); } - Ok(fork_states) + Ok((fork_states, dbg!(current_leaves))) +} + +/// TODO: we need to know if the event is a prev_event (is this event already referenced in the DAG) +fn already_referenced(_db: &Database, _pdu: &PduEvent) -> Result { + Ok(false) } -fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> { +fn append_state(db: &Database, pdu: &PduEvent, new_room_leaves: &[EventId]) -> Result<()> { let count = db.globals.next_count()?; let mut pdu_id = pdu.room_id.as_bytes().to_vec(); pdu_id.push(0xff); @@ -1195,13 +1241,17 @@ fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> { let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; db.rooms.append_pdu( - &pdu, + pdu, utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"), count, pdu_id.clone().into(), &db, )?; + // If we update the room leaves after calling append_pdu it will stick since append_pdu + // calls replace_pdu_leaves with only the given event. + db.rooms.force_pdu_leaves(pdu.room_id(), new_room_leaves)?; + // We set the room state after inserting the pdu, so that we never have a moment in time // where events in the current room state do not exist db.rooms.set_room_state(&pdu.room_id, &statehashid)?;