From 35c1904b37812b08576e8da84d9e4effd2f71fc8 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Sun, 24 Jan 2021 20:18:40 -0500 Subject: [PATCH] Finish forward extremity gathering, use resolved state as new snapshot --- src/server_server.rs | 147 +++++++++++++++++++++++-------------------- 1 file changed, 80 insertions(+), 67 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index f782ad5a..e733d248 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -18,6 +18,7 @@ use ruma::{ OutgoingRequest, }, directory::{IncomingFilter, IncomingRoomNetwork}, + events::EventType, serde::to_canonical_value, signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap}, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, @@ -483,34 +484,6 @@ pub async fn get_public_rooms_route( .into()) } -#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] -pub enum PrevEvents { - Sequential(T), - Fork(Vec), -} - -impl IntoIterator for PrevEvents { - type Item = T; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - match self { - Self::Sequential(item) => vec![item].into_iter(), - Self::Fork(list) => list.into_iter(), - } - } -} - -impl PrevEvents { - pub fn new(id: &[T]) -> Self { - match id { - [] => panic!("All events must have previous event"), - [single_id] => Self::Sequential(single_id.clone()), - rest => Self::Fork(rest.to_vec()), - } - } -} - #[cfg_attr( feature = "conduit_bin", put("/_matrix/federation/v1/send/<_>", data = "") @@ -605,8 +578,16 @@ pub async fn send_transaction_message_route<'a>( UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field"); let origin = sender.server_name(); - // TODO: this could fail or the server not respond... - let keys = fetch_signing_keys(&db, origin).await?; + let keys = match fetch_signing_keys(&db, origin).await { + Ok(keys) => keys, + Err(_) => { + resolved_map.insert( + event_id, + Err("Could not find signing keys for this server".to_string()), + ); + continue; + } + }; pub_key_map.insert( origin.to_string(), @@ -769,11 +750,12 @@ pub async fn send_transaction_message_route<'a>( // // calculate_forward_extremities takes care of adding the current state if not already in the state sets // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree. - let (mut fork_states, fork_ids) = match calculate_forward_extremities( + let (mut fork_states, extremities) = match calculate_forward_extremities( &db, &pdu, server_name, &pub_key_map, + current_state, &mut auth_cache, ) .await @@ -791,6 +773,7 @@ pub async fn send_transaction_message_route<'a>( let fork_states = fork_states.into_iter().collect::>(); + let mut update_state = false; // 13. start state-res with all previous forward extremities minus the ones that are in // the prev_events of this event plus the new one created by this event and use // the result as the new room state @@ -800,11 +783,12 @@ pub async fn send_transaction_message_route<'a>( } else if fork_states.len() == 1 { fork_states[0].clone() } else { + // We do need to force an update to this rooms state + update_state = true; + // TODO: remove this is for current debugging Jan, 15 2021 let mut number_fetches = 0_u32; let mut auth_events = vec![]; - // this keeps track if we error so we can break out of these inner loops - // to continue on with the incoming PDU's for map in &fork_states { let mut state_auth = vec![]; for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { @@ -821,14 +805,12 @@ pub async fn send_transaction_message_route<'a>( .await .map(|mut vec| { number_fetches += 1; - vec.remove(0) + vec.pop() }) { - Ok(aev) => aev, - Err(_) => { - resolved_map.insert( - event_id.clone(), - Err("Event has been soft failed".into()), - ); + Ok(Some(aev)) => aev, + _ => { + resolved_map + .insert(event_id.clone(), Err("Failed to fetch event".into())); continue 'main_pdu_loop; } }, @@ -839,20 +821,19 @@ pub async fn send_transaction_message_route<'a>( } info!("{} event's were not in the auth_cache", number_fetches); - let mut event_map = EventMap::new(); // Add everything we will need to event_map - event_map.extend( + auth_cache.extend( auth_events .iter() .map(|pdus| pdus.iter().map(|pdu| (pdu.event_id().clone(), pdu.clone()))) .flatten(), ); - event_map.extend( + auth_cache.extend( incoming_auth_events .into_iter() .map(|pdu| (pdu.event_id().clone(), pdu)), ); - event_map.extend( + auth_cache.extend( state_at_event .into_iter() .map(|(_, pdu)| (pdu.event_id().clone(), pdu)), @@ -873,7 +854,7 @@ pub async fn send_transaction_message_route<'a>( .into_iter() .map(|pdus| pdus.into_iter().map(|pdu| pdu.event_id().clone()).collect()) .collect(), - &mut event_map, + &mut auth_cache, ) { Ok(res) => res .into_iter() @@ -905,14 +886,23 @@ pub async fn send_transaction_message_route<'a>( ); } else { // Add the event to the DB and update the forward extremities (via roomid_pduleaves). - append_state(&db, &pdu, &fork_ids)?; + append_incoming_pdu( + &db, + &pdu, + &extremities, + if update_state { + Some(state_at_forks) + } else { + None + }, + )?; // Event has passed all auth/stateres checks resolved_map.insert(pdu.event_id().clone(), Ok(())); } } - Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) + Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) } /// An async function that can recursively calls itself. @@ -1029,6 +1019,7 @@ async fn fetch_check_auth_events( continue; } + // TODO: Batch these async calls so we can wait on multiple at once let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache) .await .map(|mut vec| { @@ -1119,6 +1110,7 @@ async fn calculate_forward_extremities( pdu: &PduEvent, origin: &ServerName, pub_key_map: &PublicKeyMap, + current_state: BTreeMap<(EventType, Option), Arc>, auth_cache: &mut EventMap>, ) -> Result<(BTreeSet>>, Vec)> { let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; @@ -1126,17 +1118,13 @@ async fn calculate_forward_extremities( let mut is_incoming_leaf = true; // Make sure the incoming event is not already a forward extremity // FIXME: I think this could happen if different servers send us the same event?? - if current_leaves.contains(pdu.event_id()) { - is_incoming_leaf = false; - // Not sure what to do here - } - + // // If the incoming event is already referenced by an existing event // then do nothing - it's not a candidate to be a new extremity if // it has been referenced. - if already_referenced(db, pdu)? { + if current_leaves.contains(pdu.event_id()) || db.rooms.get_pdu_id(pdu.event_id())?.is_some() { is_incoming_leaf = false; - // This event has been dealt with already?? + // Not sure what to do here } // TODO: @@ -1213,29 +1201,54 @@ async fn calculate_forward_extremities( // This guarantees that our current room state is included if !includes_current_state && current_hash.is_some() { - fork_states.insert( - db.rooms - .state_full(pdu.room_id(), current_hash.as_ref().unwrap())? - .into_iter() - .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) - .collect(), - ); + fork_states.insert(current_state); } Ok((fork_states, dbg!(current_leaves))) } -/// TODO: we need to know if the event is a prev_event (is this event already referenced in the DAG) -fn already_referenced(_db: &Database, _pdu: &PduEvent) -> Result { - Ok(false) -} - -fn append_state(db: &Database, pdu: &PduEvent, new_room_leaves: &[EventId]) -> Result<()> { +/// Update the room state to be the resolved state and add the fully auth'ed event +/// to the DB. +/// +/// TODO: If we force the state we need to validate all events in that state +/// any events we fetched from another server need to be fully verified? +fn append_incoming_pdu( + db: &Database, + pdu: &PduEvent, + new_room_leaves: &[EventId], + state: Option>>, +) -> Result<()> { let count = db.globals.next_count()?; let mut pdu_id = pdu.room_id.as_bytes().to_vec(); pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); + // Update the state of the room if needed + // We can tell if we need to do this based on wether state resolution took place or not + if let Some(state) = state { + let new = state + .into_iter() + .map(|((ev, k), pdu)| { + Ok(( + ( + ev, + k.ok_or_else(|| Error::Conflict("State contained non state event"))?, + ), + db.rooms + .get_pdu_id(pdu.event_id()) + .ok() + .flatten() + .ok_or_else(|| Error::Conflict("Resolved state contained unknown event"))? + .to_vec(), + )) + }) + .collect::>()?; + + info!("Force update of state for {:?}", pdu); + + db.rooms.force_state(pdu.room_id(), new, &db.globals)?; + } + // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;