Finish forward extremity gathering, use resolved state as new snapshot

merge-requests/44/merge
Devin Ragotzy 3 years ago
parent a119d858f3
commit 35c1904b37

@ -18,6 +18,7 @@ use ruma::{
OutgoingRequest, OutgoingRequest,
}, },
directory::{IncomingFilter, IncomingRoomNetwork}, directory::{IncomingFilter, IncomingRoomNetwork},
events::EventType,
serde::to_canonical_value, serde::to_canonical_value,
signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap}, signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap},
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
@ -483,34 +484,6 @@ pub async fn get_public_rooms_route(
.into()) .into())
} }
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum PrevEvents<T> {
Sequential(T),
Fork(Vec<T>),
}
impl<T> IntoIterator for PrevEvents<T> {
type Item = T;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
match self {
Self::Sequential(item) => vec![item].into_iter(),
Self::Fork(list) => list.into_iter(),
}
}
}
impl<T: Clone> PrevEvents<T> {
pub fn new(id: &[T]) -> Self {
match id {
[] => panic!("All events must have previous event"),
[single_id] => Self::Sequential(single_id.clone()),
rest => Self::Fork(rest.to_vec()),
}
}
}
#[cfg_attr( #[cfg_attr(
feature = "conduit_bin", feature = "conduit_bin",
put("/_matrix/federation/v1/send/<_>", data = "<body>") put("/_matrix/federation/v1/send/<_>", data = "<body>")
@ -605,8 +578,16 @@ pub async fn send_transaction_message_route<'a>(
UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field"); UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field");
let origin = sender.server_name(); let origin = sender.server_name();
// TODO: this could fail or the server not respond... let keys = match fetch_signing_keys(&db, origin).await {
let keys = fetch_signing_keys(&db, origin).await?; Ok(keys) => keys,
Err(_) => {
resolved_map.insert(
event_id,
Err("Could not find signing keys for this server".to_string()),
);
continue;
}
};
pub_key_map.insert( pub_key_map.insert(
origin.to_string(), origin.to_string(),
@ -769,11 +750,12 @@ pub async fn send_transaction_message_route<'a>(
// //
// calculate_forward_extremities takes care of adding the current state if not already in the state sets // calculate_forward_extremities takes care of adding the current state if not already in the state sets
// it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree. // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree.
let (mut fork_states, fork_ids) = match calculate_forward_extremities( let (mut fork_states, extremities) = match calculate_forward_extremities(
&db, &db,
&pdu, &pdu,
server_name, server_name,
&pub_key_map, &pub_key_map,
current_state,
&mut auth_cache, &mut auth_cache,
) )
.await .await
@ -791,6 +773,7 @@ pub async fn send_transaction_message_route<'a>(
let fork_states = fork_states.into_iter().collect::<Vec<_>>(); let fork_states = fork_states.into_iter().collect::<Vec<_>>();
let mut update_state = false;
// 13. start state-res with all previous forward extremities minus the ones that are in // 13. start state-res with all previous forward extremities minus the ones that are in
// the prev_events of this event plus the new one created by this event and use // the prev_events of this event plus the new one created by this event and use
// the result as the new room state // the result as the new room state
@ -800,11 +783,12 @@ pub async fn send_transaction_message_route<'a>(
} else if fork_states.len() == 1 { } else if fork_states.len() == 1 {
fork_states[0].clone() fork_states[0].clone()
} else { } else {
// We do need to force an update to this rooms state
update_state = true;
// TODO: remove this is for current debugging Jan, 15 2021 // TODO: remove this is for current debugging Jan, 15 2021
let mut number_fetches = 0_u32; let mut number_fetches = 0_u32;
let mut auth_events = vec![]; let mut auth_events = vec![];
// this keeps track if we error so we can break out of these inner loops
// to continue on with the incoming PDU's
for map in &fork_states { for map in &fork_states {
let mut state_auth = vec![]; let mut state_auth = vec![];
for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) {
@ -821,14 +805,12 @@ pub async fn send_transaction_message_route<'a>(
.await .await
.map(|mut vec| { .map(|mut vec| {
number_fetches += 1; number_fetches += 1;
vec.remove(0) vec.pop()
}) { }) {
Ok(aev) => aev, Ok(Some(aev)) => aev,
Err(_) => { _ => {
resolved_map.insert( resolved_map
event_id.clone(), .insert(event_id.clone(), Err("Failed to fetch event".into()));
Err("Event has been soft failed".into()),
);
continue 'main_pdu_loop; continue 'main_pdu_loop;
} }
}, },
@ -839,20 +821,19 @@ pub async fn send_transaction_message_route<'a>(
} }
info!("{} event's were not in the auth_cache", number_fetches); info!("{} event's were not in the auth_cache", number_fetches);
let mut event_map = EventMap::new();
// Add everything we will need to event_map // Add everything we will need to event_map
event_map.extend( auth_cache.extend(
auth_events auth_events
.iter() .iter()
.map(|pdus| pdus.iter().map(|pdu| (pdu.event_id().clone(), pdu.clone()))) .map(|pdus| pdus.iter().map(|pdu| (pdu.event_id().clone(), pdu.clone())))
.flatten(), .flatten(),
); );
event_map.extend( auth_cache.extend(
incoming_auth_events incoming_auth_events
.into_iter() .into_iter()
.map(|pdu| (pdu.event_id().clone(), pdu)), .map(|pdu| (pdu.event_id().clone(), pdu)),
); );
event_map.extend( auth_cache.extend(
state_at_event state_at_event
.into_iter() .into_iter()
.map(|(_, pdu)| (pdu.event_id().clone(), pdu)), .map(|(_, pdu)| (pdu.event_id().clone(), pdu)),
@ -873,7 +854,7 @@ pub async fn send_transaction_message_route<'a>(
.into_iter() .into_iter()
.map(|pdus| pdus.into_iter().map(|pdu| pdu.event_id().clone()).collect()) .map(|pdus| pdus.into_iter().map(|pdu| pdu.event_id().clone()).collect())
.collect(), .collect(),
&mut event_map, &mut auth_cache,
) { ) {
Ok(res) => res Ok(res) => res
.into_iter() .into_iter()
@ -905,14 +886,23 @@ pub async fn send_transaction_message_route<'a>(
); );
} else { } else {
// Add the event to the DB and update the forward extremities (via roomid_pduleaves). // Add the event to the DB and update the forward extremities (via roomid_pduleaves).
append_state(&db, &pdu, &fork_ids)?; append_incoming_pdu(
&db,
&pdu,
&extremities,
if update_state {
Some(state_at_forks)
} else {
None
},
)?;
// Event has passed all auth/stateres checks // Event has passed all auth/stateres checks
resolved_map.insert(pdu.event_id().clone(), Ok(())); resolved_map.insert(pdu.event_id().clone(), Ok(()));
} }
} }
Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())
} }
/// An async function that can recursively calls itself. /// An async function that can recursively calls itself.
@ -1029,6 +1019,7 @@ async fn fetch_check_auth_events(
continue; continue;
} }
// TODO: Batch these async calls so we can wait on multiple at once
let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache) let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache)
.await .await
.map(|mut vec| { .map(|mut vec| {
@ -1119,6 +1110,7 @@ async fn calculate_forward_extremities(
pdu: &PduEvent, pdu: &PduEvent,
origin: &ServerName, origin: &ServerName,
pub_key_map: &PublicKeyMap, pub_key_map: &PublicKeyMap,
current_state: BTreeMap<(EventType, Option<String>), Arc<PduEvent>>,
auth_cache: &mut EventMap<Arc<PduEvent>>, auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<(BTreeSet<StateMap<Arc<PduEvent>>>, Vec<EventId>)> { ) -> Result<(BTreeSet<StateMap<Arc<PduEvent>>>, Vec<EventId>)> {
let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
@ -1126,17 +1118,13 @@ async fn calculate_forward_extremities(
let mut is_incoming_leaf = true; let mut is_incoming_leaf = true;
// Make sure the incoming event is not already a forward extremity // Make sure the incoming event is not already a forward extremity
// FIXME: I think this could happen if different servers send us the same event?? // FIXME: I think this could happen if different servers send us the same event??
if current_leaves.contains(pdu.event_id()) { //
is_incoming_leaf = false;
// Not sure what to do here
}
// If the incoming event is already referenced by an existing event // If the incoming event is already referenced by an existing event
// then do nothing - it's not a candidate to be a new extremity if // then do nothing - it's not a candidate to be a new extremity if
// it has been referenced. // it has been referenced.
if already_referenced(db, pdu)? { if current_leaves.contains(pdu.event_id()) || db.rooms.get_pdu_id(pdu.event_id())?.is_some() {
is_incoming_leaf = false; is_incoming_leaf = false;
// This event has been dealt with already?? // Not sure what to do here
} }
// TODO: // TODO:
@ -1213,29 +1201,54 @@ async fn calculate_forward_extremities(
// This guarantees that our current room state is included // This guarantees that our current room state is included
if !includes_current_state && current_hash.is_some() { if !includes_current_state && current_hash.is_some() {
fork_states.insert( fork_states.insert(current_state);
db.rooms
.state_full(pdu.room_id(), current_hash.as_ref().unwrap())?
.into_iter()
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect(),
);
} }
Ok((fork_states, dbg!(current_leaves))) Ok((fork_states, dbg!(current_leaves)))
} }
/// TODO: we need to know if the event is a prev_event (is this event already referenced in the DAG) /// Update the room state to be the resolved state and add the fully auth'ed event
fn already_referenced(_db: &Database, _pdu: &PduEvent) -> Result<bool> { /// to the DB.
Ok(false) ///
} /// TODO: If we force the state we need to validate all events in that state
/// any events we fetched from another server need to be fully verified?
fn append_state(db: &Database, pdu: &PduEvent, new_room_leaves: &[EventId]) -> Result<()> { fn append_incoming_pdu(
db: &Database,
pdu: &PduEvent,
new_room_leaves: &[EventId],
state: Option<StateMap<Arc<PduEvent>>>,
) -> Result<()> {
let count = db.globals.next_count()?; let count = db.globals.next_count()?;
let mut pdu_id = pdu.room_id.as_bytes().to_vec(); let mut pdu_id = pdu.room_id.as_bytes().to_vec();
pdu_id.push(0xff); pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes()); pdu_id.extend_from_slice(&count.to_be_bytes());
// Update the state of the room if needed
// We can tell if we need to do this based on wether state resolution took place or not
if let Some(state) = state {
let new = state
.into_iter()
.map(|((ev, k), pdu)| {
Ok((
(
ev,
k.ok_or_else(|| Error::Conflict("State contained non state event"))?,
),
db.rooms
.get_pdu_id(pdu.event_id())
.ok()
.flatten()
.ok_or_else(|| Error::Conflict("Resolved state contained unknown event"))?
.to_vec(),
))
})
.collect::<Result<_>>()?;
info!("Force update of state for {:?}", pdu);
db.rooms.force_state(pdu.room_id(), new, &db.globals)?;
}
// We append to state before appending the pdu, so we don't have a moment in time with the // We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail. // pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;

Loading…
Cancel
Save