From 260db9fcc701f536625a60f1a23b4036c46f5f53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 9 Aug 2021 19:15:14 +0200 Subject: [PATCH] improvement: try to load missing prev events --- src/database/rooms.rs | 18 ++++++++++ src/server_server.rs | 80 ++++++++++++++++++++++++++++--------------- 2 files changed, 71 insertions(+), 27 deletions(-) diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 10a6215d..0f422350 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -280,6 +280,24 @@ impl Rooms { .is_some()) } + /// Checks if a room exists. + pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result>> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + // Look for PDUs in that room. + self.pduid_pdu + .iter_from(&prefix, false) + .filter(|(k, _)| k.starts_with(&prefix)) + .map(|(_, pdu)| { + serde_json::from_slice(&pdu) + .map_err(|_| Error::bad_database("Invalid first PDU in db.")) + .map(Arc::new) + }) + .next() + .transpose() + } + /// Force the creation of a new StateHash and insert it into the db. /// /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot. diff --git a/src/server_server.rs b/src/server_server.rs index 9a847c33..bf5e4f34 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -272,13 +272,14 @@ where if status == 200 { let response = T::IncomingResponse::try_from_http_response(http_response); response.map_err(|e| { - warn!("Invalid 200 response: {}", e); + warn!("Invalid 200 response from {}: {}", &destination, e); Error::BadServerResponse("Server returned bad 200 response.") }) } else { Err(Error::FederationError( destination.to_owned(), - RumaError::try_from_http_response(http_response).map_err(|_| { + RumaError::try_from_http_response(http_response).map_err(|e| { + warn!("Server returned bad error response: {}", e); Error::BadServerResponse("Server returned bad error response.") })?, )) @@ -811,7 +812,7 @@ pub async fn send_transaction_message_route( } /// An async function that can recursively call itself. -type AsyncRecursiveResult<'a, T, E> = Pin> + 'a + Send>>; +type AsyncRecursiveType<'a, T> = Pin + 'a + Send>>; /// When receiving an event one needs to: /// 0. Check the server is in the room @@ -836,7 +837,7 @@ type AsyncRecursiveResult<'a, T, E> = Pin( origin: &'a ServerName, @@ -846,7 +847,7 @@ pub fn handle_incoming_pdu<'a>( is_timeline_event: bool, db: &'a Database, pub_key_map: &'a RwLock>>, -) -> AsyncRecursiveResult<'a, Option>, String> { +) -> AsyncRecursiveType<'a, StdResult>, String>> { Box::pin(async move { // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json match db.rooms.exists(&room_id) { @@ -920,9 +921,15 @@ pub fn handle_incoming_pdu<'a>( // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" // EDIT: Step 5 is not applied anymore because it failed too often debug!("Fetching auth events for {}", incoming_pdu.event_id); - fetch_and_handle_events(db, origin, &incoming_pdu.auth_events, &room_id, pub_key_map) - .await - .map_err(|e| e.to_string())?; + fetch_and_handle_events( + db, + origin, + &incoming_pdu.auth_events, + &room_id, + pub_key_map, + false, + ) + .await; // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events debug!( @@ -1004,10 +1011,28 @@ pub fn handle_incoming_pdu<'a>( debug!("Added pdu as outlier."); // 8. if not timeline event: stop - if !is_timeline_event { + if !is_timeline_event + || incoming_pdu.origin_server_ts + < db.rooms + .first_pdu_in_room(&room_id) + .map_err(|_| "Error loading first room event.".to_owned())? + .expect("Room exists") + .origin_server_ts + { return Ok(None); } + // Load missing prev events first + fetch_and_handle_events( + db, + origin, + &incoming_pdu.prev_events, + &room_id, + pub_key_map, + true, + ) + .await; + // TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities @@ -1034,9 +1059,9 @@ pub fn handle_incoming_pdu<'a>( &state.into_iter().collect::>(), &room_id, pub_key_map, + false, ) .await - .map_err(|_| "Failed to fetch state events locally".to_owned())? .into_iter() .map(|pdu| { ( @@ -1081,18 +1106,15 @@ pub fn handle_incoming_pdu<'a>( { Ok(res) => { debug!("Fetching state events at event."); - let state_vec = match fetch_and_handle_events( + let state_vec = fetch_and_handle_events( &db, origin, &res.pdu_ids, &room_id, pub_key_map, + false, ) - .await - { - Ok(state) => state, - Err(_) => return Err("Failed to fetch state events.".to_owned()), - }; + .await; let mut state = HashMap::new(); for pdu in state_vec { @@ -1118,18 +1140,15 @@ pub fn handle_incoming_pdu<'a>( } debug!("Fetching auth chain events at event."); - match fetch_and_handle_events( + fetch_and_handle_events( &db, origin, &res.auth_chain_ids, &room_id, pub_key_map, + false, ) - .await - { - Ok(state) => state, - Err(_) => return Err("Failed to fetch auth chain.".to_owned()), - }; + .await; state_at_incoming_event = Some(state); } @@ -1381,7 +1400,8 @@ pub(crate) fn fetch_and_handle_events<'a>( events: &'a [EventId], room_id: &'a RoomId, pub_key_map: &'a RwLock>>, -) -> AsyncRecursiveResult<'a, Vec>, Error> { + are_timeline_events: bool, +) -> AsyncRecursiveType<'a, Vec>> { Box::pin(async move { let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { Entry::Vacant(e) => { @@ -1408,7 +1428,12 @@ pub(crate) fn fetch_and_handle_events<'a>( // a. Look in the main timeline (pduid_pdu tree) // b. Look at outlier pdu tree // (get_pdu checks both) - let pdu = match db.rooms.get_pdu(&id) { + let local_pdu = if are_timeline_events { + db.rooms.get_non_outlier_pdu(&id).map(|o| o.map(Arc::new)) + } else { + db.rooms.get_pdu(&id) + }; + let pdu = match local_pdu { Ok(Some(pdu)) => { trace!("Found {} in db", id); pdu @@ -1439,7 +1464,7 @@ pub(crate) fn fetch_and_handle_events<'a>( &event_id, &room_id, value.clone(), - false, + are_timeline_events, db, pub_key_map, ) @@ -1482,7 +1507,7 @@ pub(crate) fn fetch_and_handle_events<'a>( }; pdus.push(pdu); } - Ok(pdus) + pdus }) } @@ -2193,7 +2218,8 @@ pub async fn create_join_event_route( &pub_key_map, ) .await - .map_err(|_| { + .map_err(|e| { + warn!("Error while handling incoming send join PDU: {}", e); Error::BadRequest( ErrorKind::InvalidParam, "Error while handling incoming PDU.",