From 591769d5f3fde6314ba84130898dc6202b9b5c98 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Mon, 1 Feb 2021 17:02:56 -0500 Subject: [PATCH] Fiter PDU's before main incoming PDU loop --- src/database/rooms.rs | 11 +---- src/server_server.rs | 103 ++++++++++++++++++++++++++---------------- 2 files changed, 66 insertions(+), 48 deletions(-) diff --git a/src/database/rooms.rs b/src/database/rooms.rs index ee8f0ab0..6ee29a69 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -466,16 +466,7 @@ impl Rooms { /// Returns true if the event_id was previously inserted. pub fn append_pdu_outlier(&self, pdu: &PduEvent) -> Result { - log::info!( - "Number of outlier pdu's {:#?}", - self.eventid_outlierpdu - .iter() - .map(|pair| { - let (_k, v) = pair.unwrap(); - serde_json::from_slice::(&v).unwrap() - }) - .collect::>() - ); + log::info!("Number of outlier pdu's {}", self.eventid_outlierpdu.len()); let mut key = pdu.room_id().as_bytes().to_vec(); key.push(0xff); diff --git a/src/server_server.rs b/src/server_server.rs index 5177f965..2cfbc6e6 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -533,6 +533,54 @@ pub async fn send_transaction_message_route<'a>( } } + let mut resolved_map = BTreeMap::new(); + + let pdus_to_resolve = body + .pdus + .iter() + .filter_map(|pdu| { + // 1. Is a valid event, otherwise it is dropped. + // Ruma/PduEvent/StateEvent satisfies this + // We do not add the event_id field to the pdu here because of signature and hashes checks + let (event_id, value) = crate::pdu::gen_event_id_canonical_json(pdu); + + // If we have no idea about this room skip the PDU + let room_id = match value + .get("room_id") + .map(|id| match id { + CanonicalJsonValue::String(id) => RoomId::try_from(id.as_str()).ok(), + _ => None, + }) + .flatten() + { + Some(id) => id, + None => { + resolved_map.insert(event_id, Err("Event needs a valid RoomId".to_string())); + return None; + } + }; + + // 1. check the server is in the room (optional) + match db.rooms.exists(&room_id) { + Ok(true) => {} + _ => { + resolved_map + .insert(event_id, Err("Room is unknown to this server".to_string())); + return None; + } + } + + // If we know of this pdu we don't need to continue processing it + // + // This check is essentially + if let Ok(Some(_)) = db.rooms.get_pdu_id(&event_id) { + return None; + } + + Some((event_id, value)) + }) + .collect::>(); + // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere? // SPEC: // Servers MUST strictly enforce the JSON format specified in the appendices. @@ -540,35 +588,7 @@ pub async fn send_transaction_message_route<'a>( // events over federation. For example, the Federation API's /send endpoint would // discard the event whereas the Client Server API's /send/{eventType} endpoint // would return a M_BAD_JSON error. - let mut resolved_map = BTreeMap::new(); - 'main_pdu_loop: for pdu in &body.pdus { - // 1. Is a valid event, otherwise it is dropped. - // Ruma/PduEvent/StateEvent satisfies this - // We do not add the event_id field to the pdu here because of signature and hashes checks - let (event_id, value) = crate::pdu::gen_event_id_canonical_json(pdu); - - // If we have no idea about this room skip the PDU - let room_id = match value - .get("room_id") - .map(|id| match id { - CanonicalJsonValue::String(id) => RoomId::try_from(id.as_str()).ok(), - _ => None, - }) - .flatten() - { - Some(id) => id, - None => { - resolved_map.insert(event_id, Err("Event needs a valid RoomId".to_string())); - continue; - } - }; - - // 1. check the server is in the room (optional) - if !db.rooms.exists(&room_id)? { - resolved_map.insert(event_id, Err("Room is unknown to this server".to_string())); - continue; - } - + 'main_pdu_loop: for (event_id, value) in pdus_to_resolve { let server_name = &body.body.origin; let mut pub_key_map = BTreeMap::new(); @@ -921,13 +941,13 @@ pub async fn send_transaction_message_route<'a>( )?; // Event has passed all auth/stateres checks - resolved_map.insert(pdu.event_id().clone(), Ok(())); } - Ok(send_transaction_message::v1::Response { - pdus: dbg!(resolved_map), + if !resolved_map.is_empty() { + warn!("These PDU's failed {:?}", resolved_map); } - .into()) + + Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) } /// An async function that can recursively calls itself. @@ -1139,6 +1159,7 @@ pub(crate) async fn calculate_forward_extremities( // Make sure the incoming event is not already a forward extremity // FIXME: I think this could happen if different servers send us the same event?? if current_leaves.contains(pdu.event_id()) { + error!("The incoming event is already present in get_pdu_leaves BUG"); is_incoming_leaf = false; // Not sure what to do here } @@ -1147,11 +1168,12 @@ pub(crate) async fn calculate_forward_extremities( // then do nothing - it's not a candidate to be a new extremity if // it has been referenced. // - // We first check if know of the event and then don't include it as a forward - // extremity if it is a timeline event - if db.rooms.get_pdu_id(pdu.event_id())?.is_some() { - is_incoming_leaf = db.rooms.get_pdu_outlier(pdu.event_id())?.is_some(); - } + // We check this in the filter just before the main incoming PDU for loop + // so no already known event can make it this far. + // + // if db.rooms.get_pdu_id(pdu.event_id())?.is_some() { + // is_incoming_leaf = db.rooms.get_pdu_outlier(pdu.event_id())?.is_some(); + // } // TODO: // [dendrite] Checks if any other leaves have been referenced and removes them @@ -1219,7 +1241,12 @@ pub(crate) async fn build_forward_extremity_snapshots( } fork_states.insert(state_before); + } else if id == pdu.event_id() { + // We add this snapshot after `build_forward_extremity_snapshots` is + // called which we requested from the sending server } else { + error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind()); + let res = db .sending .send_federation_request(