From ecd1e45a449036e8d3166e1c59b51a161bf31f12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 14 Aug 2021 21:56:15 +0200 Subject: [PATCH] fix: fetch more than one prev event --- src/server_server.rs | 721 ++++++++++++++++++++++--------------------- 1 file changed, 377 insertions(+), 344 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index 5b2acd00..e7221261 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -867,17 +867,19 @@ pub async fn handle_incoming_pdu<'a>( .map_err(|_| "Failed to ask database for event.".to_owned())? .ok_or_else(|| "Failed to find create event in db.".to_owned())?; - let (incoming_pdu, val) = handle_outlier_pdu(origin, &create_event, event_id, room_id, value, db, pub_key_map).await?; + let (incoming_pdu, val) = handle_outlier_pdu( + origin, + &create_event, + event_id, + room_id, + value, + db, + pub_key_map, + ) + .await?; // 8. if not timeline event: stop - if !is_timeline_event - || incoming_pdu.origin_server_ts - < db.rooms - .first_pdu_in_room(&room_id) - .map_err(|_| "Error loading first room event.".to_owned())? - .expect("Room exists") - .origin_server_ts - { + if !is_timeline_event { return Ok(None); } @@ -893,16 +895,45 @@ pub async fn handle_incoming_pdu<'a>( &room_id, pub_key_map, ) - .await.pop() { - todo_timeline_stack.push((pdu, json)); + .await + .pop() + { + if incoming_pdu.origin_server_ts + > db.rooms + .first_pdu_in_room(&room_id) + .map_err(|_| "Error loading first room event.".to_owned())? + .expect("Room exists") + .origin_server_ts + { + todo_outlier_stack.extend(pdu.prev_events.iter().cloned()); + todo_timeline_stack.push((pdu, json)); + } } } while let Some(prev) = todo_timeline_stack.pop() { - upgrade_outlier_to_timeline_pdu(prev.0, prev.1, &create_event, origin, db, room_id, pub_key_map).await?; + upgrade_outlier_to_timeline_pdu( + prev.0, + prev.1, + &create_event, + origin, + db, + room_id, + pub_key_map, + ) + .await?; } - upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, db, room_id, pub_key_map).await + upgrade_outlier_to_timeline_pdu( + incoming_pdu, + val, + &create_event, + origin, + db, + room_id, + pub_key_map, + ) + .await } fn handle_outlier_pdu<'a>( @@ -913,7 +944,8 @@ fn handle_outlier_pdu<'a>( value: BTreeMap, db: &'a Database, pub_key_map: &'a RwLock>>, -) -> AsyncRecursiveType<'a, StdResult<(Arc, BTreeMap), String>> { +) -> AsyncRecursiveType<'a, StdResult<(Arc, BTreeMap), String>> +{ Box::pin(async move { let start_time = Instant::now(); @@ -928,11 +960,11 @@ fn handle_outlier_pdu<'a>( // 2. Check signatures, otherwise drop // 3. check content hash, redact if doesn't match - let create_event_content = - serde_json::from_value::>(create_event.content.clone()) - .expect("Raw::from_value always works.") - .deserialize() - .map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; + let create_event_content = + serde_json::from_value::>(create_event.content.clone()) + .expect("Raw::from_value always works.") + .deserialize() + .map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; let room_version_id = &create_event_content.room_version; let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); @@ -1062,9 +1094,8 @@ fn handle_outlier_pdu<'a>( .map_err(|_| "Failed to add pdu as outlier.".to_owned())?; debug!("Added pdu as outlier."); - Ok((incoming_pdu,val)) + Ok((incoming_pdu, val)) }) - } async fn upgrade_outlier_to_timeline_pdu( @@ -1076,381 +1107,385 @@ async fn upgrade_outlier_to_timeline_pdu( room_id: &RoomId, pub_key_map: &RwLock>>, ) -> StdResult>, String> { - // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities - // doing all the checks in this list starting at 1. These are not timeline events. - - // TODO: if we know the prev_events of the incoming event we can avoid the request and build - // the state from a known point and resolve if > 1 prev_event + // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities + // doing all the checks in this list starting at 1. These are not timeline events. - debug!("Requesting state at event."); - let mut state_at_incoming_event = None; - - if incoming_pdu.prev_events.len() == 1 { - let prev_event = &incoming_pdu.prev_events[0]; - let prev_event_sstatehash = db - .rooms - .pdu_shortstatehash(prev_event) - .map_err(|_| "Failed talking to db".to_owned())?; + // TODO: if we know the prev_events of the incoming event we can avoid the request and build + // the state from a known point and resolve if > 1 prev_event - let state = - prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash)); + debug!("Requesting state at event."); + let mut state_at_incoming_event = None; - if let Some(Ok(state)) = state { - warn!("Using cached state"); - let mut state = fetch_and_handle_outliers( - db, - origin, - &state.into_iter().collect::>(), - &create_event, - &room_id, - pub_key_map, - ) - .await - .into_iter() - .map(|(pdu,_)| { + if incoming_pdu.prev_events.len() == 1 { + let prev_event = &incoming_pdu.prev_events[0]; + let prev_event_sstatehash = db + .rooms + .pdu_shortstatehash(prev_event) + .map_err(|_| "Failed talking to db".to_owned())?; + + let state = + prev_event_sstatehash.map(|shortstatehash| db.rooms.state_full_ids(shortstatehash)); + + if let Some(Ok(state)) = state { + warn!("Using cached state"); + let mut state = fetch_and_handle_outliers( + db, + origin, + &state.into_iter().collect::>(), + &create_event, + &room_id, + pub_key_map, + ) + .await + .into_iter() + .map(|(pdu, _)| { + ( ( - ( - pdu.kind.clone(), - pdu.state_key - .clone() - .expect("events from state_full_ids are state events"), - ), - pdu, - ) - }) - .collect::>(); + pdu.kind.clone(), + pdu.state_key + .clone() + .expect("events from state_full_ids are state events"), + ), + pdu, + ) + }) + .collect::>(); - let prev_pdu = db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { + let prev_pdu = + db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { "Could not find prev event, but we know the state.".to_owned() })?; - if let Some(state_key) = &prev_pdu.state_key { - state.insert((prev_pdu.kind.clone(), state_key.clone()), prev_pdu); - } - - state_at_incoming_event = Some(state); + if let Some(state_key) = &prev_pdu.state_key { + state.insert((prev_pdu.kind.clone(), state_key.clone()), prev_pdu); } - // TODO: set incoming_auth_events? + + state_at_incoming_event = Some(state); } + // TODO: set incoming_auth_events? + } - if state_at_incoming_event.is_none() { - warn!("Calling /state_ids"); - // Call /state_ids to find out what the state at this pdu is. We trust the server's - // response to some extend, but we still do a lot of checks on the events - match db - .sending - .send_federation_request( - &db.globals, + if state_at_incoming_event.is_none() { + warn!("Calling /state_ids"); + // Call /state_ids to find out what the state at this pdu is. We trust the server's + // response to some extend, but we still do a lot of checks on the events + match db + .sending + .send_federation_request( + &db.globals, + origin, + get_room_state_ids::v1::Request { + room_id: &room_id, + event_id: &incoming_pdu.event_id, + }, + ) + .await + { + Ok(res) => { + debug!("Fetching state events at event."); + let state_vec = fetch_and_handle_outliers( + &db, origin, - get_room_state_ids::v1::Request { - room_id: &room_id, - event_id: &incoming_pdu.event_id, - }, + &res.pdu_ids, + &create_event, + &room_id, + pub_key_map, ) - .await - { - Ok(res) => { - debug!("Fetching state events at event."); - let state_vec = fetch_and_handle_outliers( - &db, - origin, - &res.pdu_ids, - &create_event, - &room_id, - pub_key_map, - ) - .await; - - let mut state = HashMap::new(); - for (pdu, _) in state_vec { - match state.entry((pdu.kind.clone(), pdu.state_key.clone().ok_or_else(|| "Found non-state pdu in state events.".to_owned())?)) { - Entry::Vacant(v) => { - v.insert(pdu); - } - Entry::Occupied(_) => { - return Err( - "State event's type and state_key combination exists multiple times.".to_owned(), - ) - } + .await; + + let mut state = HashMap::new(); + for (pdu, _) in state_vec { + match state.entry(( + pdu.kind.clone(), + pdu.state_key + .clone() + .ok_or_else(|| "Found non-state pdu in state events.".to_owned())?, + )) { + Entry::Vacant(v) => { + v.insert(pdu); } + Entry::Occupied(_) => return Err( + "State event's type and state_key combination exists multiple times." + .to_owned(), + ), } + } - // The original create event must still be in the state - if state - .get(&(EventType::RoomCreate, "".to_owned())) - .map(|a| a.as_ref()) - != Some(&create_event) - { - return Err("Incoming event refers to wrong create event.".to_owned()); - } + // The original create event must still be in the state + if state + .get(&(EventType::RoomCreate, "".to_owned())) + .map(|a| a.as_ref()) + != Some(&create_event) + { + return Err("Incoming event refers to wrong create event.".to_owned()); + } - debug!("Fetching auth chain events at event."); - fetch_and_handle_outliers( - &db, - origin, - &res.auth_chain_ids, - &create_event, - &room_id, - pub_key_map, - ) - .await; + debug!("Fetching auth chain events at event."); + fetch_and_handle_outliers( + &db, + origin, + &res.auth_chain_ids, + &create_event, + &room_id, + pub_key_map, + ) + .await; - state_at_incoming_event = Some(state); - } - Err(_) => { - return Err("Fetching state for event failed".into()); - } - }; - } + state_at_incoming_event = Some(state); + } + Err(_) => { + return Err("Fetching state for event failed".into()); + } + }; + } - let state_at_incoming_event = - state_at_incoming_event.expect("we always set this to some above"); + let state_at_incoming_event = + state_at_incoming_event.expect("we always set this to some above"); - // 11. Check the auth of the event passes based on the state of the event + // 11. Check the auth of the event passes based on the state of the event let create_event_content = serde_json::from_value::>(create_event.content.clone()) .expect("Raw::from_value always works.") .deserialize() .map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; - let room_version_id = &create_event_content.room_version; - let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); - - // If the previous event was the create event special rules apply - let previous_create = if incoming_pdu.auth_events.len() == 1 - && incoming_pdu.prev_events == incoming_pdu.auth_events - { - db.rooms - .get_pdu(&incoming_pdu.auth_events[0]) - .map_err(|e| e.to_string())? - .filter(|maybe_create| **maybe_create == *create_event) - } else { - None - }; + let room_version_id = &create_event_content.room_version; + let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); + // If the previous event was the create event special rules apply + let previous_create = if incoming_pdu.auth_events.len() == 1 + && incoming_pdu.prev_events == incoming_pdu.auth_events + { + db.rooms + .get_pdu(&incoming_pdu.auth_events[0]) + .map_err(|e| e.to_string())? + .filter(|maybe_create| **maybe_create == *create_event) + } else { + None + }; - if !state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - previous_create.clone(), - &state_at_incoming_event, - None, // TODO: third party invite - ) - .map_err(|_e| "Auth check failed.".to_owned())? - { - return Err("Event has failed auth check with state at the event.".into()); - } - debug!("Auth check succeeded."); + if !state_res::event_auth::auth_check( + &room_version, + &incoming_pdu, + previous_create.clone(), + &state_at_incoming_event, + None, // TODO: third party invite + ) + .map_err(|_e| "Auth check failed.".to_owned())? + { + return Err("Event has failed auth check with state at the event.".into()); + } + debug!("Auth check succeeded."); - // We start looking at current room state now, so lets lock the room + // We start looking at current room state now, so lets lock the room - let mutex_state = Arc::clone( - db.globals - .roomid_mutex_state - .write() - .unwrap() - .entry(room_id.clone()) - .or_default(), - ); - let state_lock = mutex_state.lock().await; + let mutex_state = Arc::clone( + db.globals + .roomid_mutex_state + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let state_lock = mutex_state.lock().await; - // Now we calculate the set of extremities this room has after the incoming event has been - // applied. We start with the previous extremities (aka leaves) - let mut extremities = db - .rooms - .get_pdu_leaves(&room_id) - .map_err(|_| "Failed to load room leaves".to_owned())?; + // Now we calculate the set of extremities this room has after the incoming event has been + // applied. We start with the previous extremities (aka leaves) + let mut extremities = db + .rooms + .get_pdu_leaves(&room_id) + .map_err(|_| "Failed to load room leaves".to_owned())?; - // Remove any forward extremities that are referenced by this incoming event's prev_events - for prev_event in &incoming_pdu.prev_events { - if extremities.contains(prev_event) { - extremities.remove(prev_event); - } + // Remove any forward extremities that are referenced by this incoming event's prev_events + for prev_event in &incoming_pdu.prev_events { + if extremities.contains(prev_event) { + extremities.remove(prev_event); } + } - // Only keep those extremities were not referenced yet - extremities.retain(|id| !matches!(db.rooms.is_event_referenced(&room_id, id), Ok(true))); + // Only keep those extremities were not referenced yet + extremities.retain(|id| !matches!(db.rooms.is_event_referenced(&room_id, id), Ok(true))); - let mut extremity_statehashes = Vec::new(); + let mut extremity_statehashes = Vec::new(); - for id in &extremities { - match db - .rooms - .get_pdu(&id) - .map_err(|_| "Failed to ask db for pdu.".to_owned())? - { - Some(leaf_pdu) => { - extremity_statehashes.push(( - db.rooms - .pdu_shortstatehash(&leaf_pdu.event_id) - .map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? - .ok_or_else(|| { - error!( - "Found extremity pdu with no statehash in db: {:?}", - leaf_pdu - ); - "Found pdu with no statehash in db.".to_owned() - })?, - Some(leaf_pdu), - )); - } - _ => { - error!("Missing state snapshot for {:?}", id); - return Err("Missing state snapshot.".to_owned()); - } + for id in &extremities { + match db + .rooms + .get_pdu(&id) + .map_err(|_| "Failed to ask db for pdu.".to_owned())? + { + Some(leaf_pdu) => { + extremity_statehashes.push(( + db.rooms + .pdu_shortstatehash(&leaf_pdu.event_id) + .map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? + .ok_or_else(|| { + error!( + "Found extremity pdu with no statehash in db: {:?}", + leaf_pdu + ); + "Found pdu with no statehash in db.".to_owned() + })?, + Some(leaf_pdu), + )); + } + _ => { + error!("Missing state snapshot for {:?}", id); + return Err("Missing state snapshot.".to_owned()); } } + } - // 12. Ensure that the state is derived from the previous current state (i.e. we calculated - // by doing state res where one of the inputs was a previously trusted set of state, - // don't just trust a set of state we got from a remote). + // 12. Ensure that the state is derived from the previous current state (i.e. we calculated + // by doing state res where one of the inputs was a previously trusted set of state, + // don't just trust a set of state we got from a remote). - // We do this by adding the current state to the list of fork states - let current_statehash = db - .rooms - .current_shortstatehash(&room_id) - .map_err(|_| "Failed to load current state hash.".to_owned())? - .expect("every room has state"); + // We do this by adding the current state to the list of fork states + let current_statehash = db + .rooms + .current_shortstatehash(&room_id) + .map_err(|_| "Failed to load current state hash.".to_owned())? + .expect("every room has state"); - let current_state = db - .rooms - .state_full(current_statehash) - .map_err(|_| "Failed to load room state.")?; + let current_state = db + .rooms + .state_full(current_statehash) + .map_err(|_| "Failed to load room state.")?; - extremity_statehashes.push((current_statehash.clone(), None)); + extremity_statehashes.push((current_statehash.clone(), None)); - let mut fork_states = Vec::new(); - for (statehash, leaf_pdu) in extremity_statehashes { - let mut leaf_state = db - .rooms - .state_full(statehash) - .map_err(|_| "Failed to ask db for room state.".to_owned())?; - - if let Some(leaf_pdu) = leaf_pdu { - if let Some(state_key) = &leaf_pdu.state_key { - // Now it's the state after - let key = (leaf_pdu.kind.clone(), state_key.clone()); - leaf_state.insert(key, leaf_pdu); - } + let mut fork_states = Vec::new(); + for (statehash, leaf_pdu) in extremity_statehashes { + let mut leaf_state = db + .rooms + .state_full(statehash) + .map_err(|_| "Failed to ask db for room state.".to_owned())?; + + if let Some(leaf_pdu) = leaf_pdu { + if let Some(state_key) = &leaf_pdu.state_key { + // Now it's the state after + let key = (leaf_pdu.kind.clone(), state_key.clone()); + leaf_state.insert(key, leaf_pdu); } - - fork_states.push(leaf_state); } - // We also add state after incoming event to the fork states - extremities.insert(incoming_pdu.event_id.clone()); - let mut state_after = state_at_incoming_event.clone(); - if let Some(state_key) = &incoming_pdu.state_key { - state_after.insert( - (incoming_pdu.kind.clone(), state_key.clone()), - incoming_pdu.clone(), + fork_states.push(leaf_state); + } + + // We also add state after incoming event to the fork states + extremities.insert(incoming_pdu.event_id.clone()); + let mut state_after = state_at_incoming_event.clone(); + if let Some(state_key) = &incoming_pdu.state_key { + state_after.insert( + (incoming_pdu.kind.clone(), state_key.clone()), + incoming_pdu.clone(), + ); + } + fork_states.push(state_after.clone()); + + let mut update_state = false; + // 14. Use state resolution to find new room state + let new_room_state = if fork_states.is_empty() { + return Err("State is empty.".to_owned()); + } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { + // There was only one state, so it has to be the room's current state (because that is + // always included) + fork_states[0] + .iter() + .map(|(k, pdu)| (k.clone(), pdu.event_id.clone())) + .collect() + } else { + // We do need to force an update to this room's state + update_state = true; + + let fork_states = &fork_states + .into_iter() + .map(|map| { + map.into_iter() + .map(|(k, v)| (k, v.event_id.clone())) + .collect::>() + }) + .collect::>(); + + let mut auth_chain_sets = Vec::new(); + for state in fork_states { + auth_chain_sets.push( + get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db) + .map_err(|_| "Failed to load auth chain.".to_owned())? + .collect(), ); } - fork_states.push(state_after.clone()); - - let mut update_state = false; - // 14. Use state resolution to find new room state - let new_room_state = if fork_states.is_empty() { - return Err("State is empty.".to_owned()); - } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { - // There was only one state, so it has to be the room's current state (because that is - // always included) - fork_states[0] - .iter() - .map(|(k, pdu)| (k.clone(), pdu.event_id.clone())) - .collect() - } else { - // We do need to force an update to this room's state - update_state = true; - - let fork_states = &fork_states - .into_iter() - .map(|map| { - map.into_iter() - .map(|(k, v)| (k, v.event_id.clone())) - .collect::>() - }) - .collect::>(); - - let mut auth_chain_sets = Vec::new(); - for state in fork_states { - auth_chain_sets.push( - get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db) - .map_err(|_| "Failed to load auth chain.".to_owned())? - .collect(), - ); - } - let state = match state_res::StateResolution::resolve( - &room_id, - room_version_id, - fork_states, - auth_chain_sets, - |id| { - let res = db.rooms.get_pdu(id); - if let Err(e) = &res { - error!("LOOK AT ME Failed to fetch event: {}", e); - } - res.ok().flatten() - }, - ) { - Ok(new_state) => new_state, - Err(_) => { - return Err("State resolution failed, either an event could not be found or deserialization".into()); + let state = match state_res::StateResolution::resolve( + &room_id, + room_version_id, + fork_states, + auth_chain_sets, + |id| { + let res = db.rooms.get_pdu(id); + if let Err(e) = &res { + error!("LOOK AT ME Failed to fetch event: {}", e); } - }; - - state + res.ok().flatten() + }, + ) { + Ok(new_state) => new_state, + Err(_) => { + return Err("State resolution failed, either an event could not be found or deserialization".into()); + } }; - debug!("starting soft fail auth check"); - // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it - let soft_fail = !state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - previous_create, - ¤t_state, - None, - ) - .map_err(|_e| "Auth check failed.".to_owned())?; - - let mut pdu_id = None; - if !soft_fail { - // Now that the event has passed all auth it is added into the timeline. - // We use the `state_at_event` instead of `state_after` so we accurately - // represent the state for this event. - pdu_id = Some( - append_incoming_pdu( - &db, - &incoming_pdu, - val, - extremities, - &state_at_incoming_event, - &state_lock, - ) - .map_err(|_| "Failed to add pdu to db.".to_owned())?, - ); - debug!("Appended incoming pdu."); - } else { - warn!("Event was soft failed: {:?}", incoming_pdu); - } + state + }; - // Set the new room state to the resolved state - if update_state { - db.rooms - .force_state(&room_id, new_room_state, &db) - .map_err(|_| "Failed to set new room state.".to_owned())?; - } - debug!("Updated resolved state"); + debug!("starting soft fail auth check"); + // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it + let soft_fail = !state_res::event_auth::auth_check( + &room_version, + &incoming_pdu, + previous_create, + ¤t_state, + None, + ) + .map_err(|_e| "Auth check failed.".to_owned())?; + + let mut pdu_id = None; + if !soft_fail { + // Now that the event has passed all auth it is added into the timeline. + // We use the `state_at_event` instead of `state_after` so we accurately + // represent the state for this event. + pdu_id = Some( + append_incoming_pdu( + &db, + &incoming_pdu, + val, + extremities, + &state_at_incoming_event, + &state_lock, + ) + .map_err(|_| "Failed to add pdu to db.".to_owned())?, + ); + debug!("Appended incoming pdu."); + } else { + warn!("Event was soft failed: {:?}", incoming_pdu); + } - if soft_fail { - // Soft fail, we leave the event as an outlier but don't add it to the timeline - return Err("Event has been soft failed".into()); - } + // Set the new room state to the resolved state + if update_state { + db.rooms + .force_state(&room_id, new_room_state, &db) + .map_err(|_| "Failed to set new room state.".to_owned())?; + } + debug!("Updated resolved state"); + + if soft_fail { + // Soft fail, we leave the event as an outlier but don't add it to the timeline + return Err("Event has been soft failed".into()); + } - // Event has passed all auth/stateres checks - drop(state_lock); - Ok(pdu_id) + // Event has passed all auth/stateres checks + drop(state_lock); + Ok(pdu_id) } /// Find the event and auth it. Once the event is validated (steps 1 - 8) @@ -1535,9 +1570,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>( ) .await { - Ok((pdu, json)) => { - (pdu, Some(json)) - } + Ok((pdu, json)) => (pdu, Some(json)), Err(e) => { warn!("Authentication of event {} failed: {:?}", id, e); back_off(id.clone());