From acfe381dd3064512272f8f47ea4dd388c04f1c39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 31 Jul 2023 16:18:23 +0200 Subject: [PATCH] fix: threads get updated properly Workaround for element web while waiting for https://github.com/matrix-org/matrix-js-sdk/pull/3635 --- src/api/client_server/membership.rs | 2 +- src/api/client_server/room.rs | 3 ++ src/api/client_server/sync.rs | 3 +- src/api/server_server.rs | 4 +-- src/database/key_value/rooms/timeline.rs | 2 ++ src/main.rs | 6 ++-- src/service/pdu.rs | 13 ++++++++ src/service/rooms/event_handler/mod.rs | 42 ++++++++++++------------ src/utils/error.rs | 4 +-- 9 files changed, 50 insertions(+), 29 deletions(-) diff --git a/src/api/client_server/membership.rs b/src/api/client_server/membership.rs index c9357b20..4a1f3743 100644 --- a/src/api/client_server/membership.rs +++ b/src/api/client_server/membership.rs @@ -674,7 +674,7 @@ async fn join_room_by_id_helper( }; let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| { - warn!("{:?}: {}", value, e); + warn!("Invalid PDU in send_join response: {} {:?}", e, value); Error::BadServerResponse("Invalid PDU in send_join response.") })?; diff --git a/src/api/client_server/room.rs b/src/api/client_server/room.rs index 56bdf039..420dd507 100644 --- a/src/api/client_server/room.rs +++ b/src/api/client_server/room.rs @@ -445,6 +445,9 @@ pub async fn get_room_event_route( )); } + let mut event = (*event).clone(); + event.add_age()?; + Ok(get_room_event::v3::Response { event: event.to_room_event(), }) diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs index 527625a5..7c6002ec 100644 --- a/src/api/client_server/sync.rs +++ b/src/api/client_server/sync.rs @@ -20,8 +20,9 @@ use ruma::{ StateEventType, TimelineEventType, }, serde::Raw, - uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId, + uint, DeviceId, OwnedDeviceId, OwnedEventId, OwnedUserId, RoomId, UInt, UserId, }; +use serde::Deserialize; use std::{ collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet}, sync::Arc, diff --git a/src/api/server_server.rs b/src/api/server_server.rs index ca5b69d0..2220c4df 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -813,7 +813,7 @@ pub async fn send_transaction_message_route( .readreceipt_update(&user_id, &room_id, event)?; } else { // TODO fetch missing events - info!("No known event ids in read receipt: {:?}", user_updates); + debug!("No known event ids in read receipt: {:?}", user_updates); } } } @@ -1011,7 +1011,7 @@ pub async fn get_backfill_route( .as_ref() .expect("server is authenticated"); - info!("Got backfill request from: {}", sender_servername); + debug!("Got backfill request from: {}", sender_servername); if !services() .rooms diff --git a/src/database/key_value/rooms/timeline.rs b/src/database/key_value/rooms/timeline.rs index 74e3e5ce..5ce2136e 100644 --- a/src/database/key_value/rooms/timeline.rs +++ b/src/database/key_value/rooms/timeline.rs @@ -246,6 +246,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { if pdu.sender != user_id { pdu.remove_transaction_id()?; } + pdu.add_age()?; let count = pdu_count(&pdu_id)?; Ok((count, pdu)) }), @@ -272,6 +273,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase { if pdu.sender != user_id { pdu.remove_transaction_id()?; } + pdu.add_age()?; let count = pdu_count(&pdu_id)?; Ok((count, pdu)) }), diff --git a/src/main.rs b/src/main.rs index 9b7528c4..19750383 100644 --- a/src/main.rs +++ b/src/main.rs @@ -85,6 +85,8 @@ async fn main() { config.warn_deprecated(); + let log = format!("{},ruma_state_res=error,_=off,sled=off", config.log); + if config.allow_jaeger { opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); let tracer = opentelemetry_jaeger::new_agent_pipeline() @@ -94,7 +96,7 @@ async fn main() { .unwrap(); let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - let filter_layer = match EnvFilter::try_new(&config.log) { + let filter_layer = match EnvFilter::try_new(&log) { Ok(s) => s, Err(e) => { eprintln!( @@ -121,7 +123,7 @@ async fn main() { } else { let registry = tracing_subscriber::Registry::default(); let fmt_layer = tracing_subscriber::fmt::Layer::new(); - let filter_layer = match EnvFilter::try_new(&config.log) { + let filter_layer = match EnvFilter::try_new(&log) { Ok(s) => s, Err(e) => { eprintln!("It looks like your config is invalid. The following error occured while parsing it: {e}"); diff --git a/src/service/pdu.rs b/src/service/pdu.rs index d24e1746..4a170bc2 100644 --- a/src/service/pdu.rs +++ b/src/service/pdu.rs @@ -103,6 +103,19 @@ impl PduEvent { Ok(()) } + pub fn add_age(&mut self) -> crate::Result<()> { + let mut unsigned: BTreeMap> = self + .unsigned + .as_ref() + .map_or_else(|| Ok(BTreeMap::new()), |u| serde_json::from_str(u.get())) + .map_err(|_| Error::bad_database("Invalid unsigned in pdu event"))?; + + unsigned.insert("age".to_owned(), to_raw_value(&1).unwrap()); + self.unsigned = Some(to_raw_value(&unsigned).expect("unsigned is valid")); + + Ok(()) + } + #[tracing::instrument(skip(self))] pub fn to_sync_room_event(&self) -> Raw { let mut json = json!({ diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 89ac72ea..c6e433c8 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -357,7 +357,7 @@ impl Service { .await; // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events - info!( + debug!( "Auth check for {} based on auth events", incoming_pdu.event_id ); @@ -419,7 +419,7 @@ impl Service { )); } - info!("Validation successful."); + debug!("Validation successful."); // 7. Persist the event as an outlier. services() @@ -427,7 +427,7 @@ impl Service { .outlier .add_pdu_outlier(&incoming_pdu.event_id, &val)?; - info!("Added pdu as outlier."); + debug!("Added pdu as outlier."); Ok((Arc::new(incoming_pdu), val)) }) @@ -476,7 +476,7 @@ impl Service { // TODO: if we know the prev_events of the incoming event we can avoid the request and build // the state from a known point and resolve if > 1 prev_event - info!("Requesting state at event"); + debug!("Requesting state at event"); let mut state_at_incoming_event = None; if incoming_pdu.prev_events.len() == 1 { @@ -499,7 +499,7 @@ impl Service { }; if let Some(Ok(mut state)) = state { - info!("Using cached state"); + debug!("Using cached state"); let prev_pdu = services() .rooms .timeline @@ -523,7 +523,7 @@ impl Service { state_at_incoming_event = Some(state); } } else { - info!("Calculating state at event using state res"); + debug!("Calculating state at event using state res"); let mut extremity_sstatehashes = HashMap::new(); let mut okay = true; @@ -632,7 +632,7 @@ impl Service { } if state_at_incoming_event.is_none() { - info!("Calling /state_ids"); + debug!("Calling /state_ids"); // Call /state_ids to find out what the state at this pdu is. We trust the server's // response to some extend, but we still do a lot of checks on the events match services() @@ -647,7 +647,7 @@ impl Service { .await { Ok(res) => { - info!("Fetching state events at event."); + debug!("Fetching state events at event."); let state_vec = self .fetch_and_handle_outliers( origin, @@ -710,7 +710,7 @@ impl Service { let state_at_incoming_event = state_at_incoming_event.expect("we always set this to some above"); - info!("Starting auth check"); + debug!("Starting auth check"); // 11. Check the auth of the event passes based on the state of the event let check_result = state_res::event_auth::auth_check( &room_version, @@ -734,7 +734,7 @@ impl Service { "Event has failed auth check with state at the event.", )); } - info!("Auth check succeeded"); + debug!("Auth check succeeded"); // Soft fail check before doing state res let auth_events = services().rooms.state.get_auth_events( @@ -769,7 +769,7 @@ impl Service { // Now we calculate the set of extremities this room has after the incoming event has been // applied. We start with the previous extremities (aka leaves) - info!("Calculating extremities"); + debug!("Calculating extremities"); let mut extremities = services().rooms.state.get_forward_extremities(room_id)?; // Remove any forward extremities that are referenced by this incoming event's prev_events @@ -790,7 +790,7 @@ impl Service { ) }); - info!("Compressing state at event"); + debug!("Compressing state at event"); let state_ids_compressed = Arc::new( state_at_incoming_event .iter() @@ -804,7 +804,7 @@ impl Service { ); if incoming_pdu.state_key.is_some() { - info!("Preparing for stateres to derive new room state"); + debug!("Preparing for stateres to derive new room state"); // We also add state after incoming event to the fork states let mut state_after = state_at_incoming_event.clone(); @@ -822,7 +822,7 @@ impl Service { .await?; // Set the new room state to the resolved state - info!("Forcing new room state"); + debug!("Forcing new room state"); let (sstatehash, new, removed) = services() .rooms @@ -837,7 +837,7 @@ impl Service { } // 14. Check if the event passes auth based on the "current state" of the room, if not soft fail it - info!("Starting soft fail auth check"); + debug!("Starting soft fail auth check"); if soft_fail { services().rooms.timeline.append_incoming_pdu( @@ -861,7 +861,7 @@ impl Service { )); } - info!("Appending pdu to timeline"); + debug!("Appending pdu to timeline"); extremities.insert(incoming_pdu.event_id.clone()); // Now that the event has passed all auth it is added into the timeline. @@ -877,7 +877,7 @@ impl Service { &state_lock, )?; - info!("Appended incoming pdu"); + debug!("Appended incoming pdu"); // Event has passed all auth/stateres checks drop(state_lock); @@ -890,7 +890,7 @@ impl Service { room_version_id: &RoomVersionId, incoming_state: HashMap>, ) -> Result>> { - info!("Loading current room state ids"); + debug!("Loading current room state ids"); let current_sstatehash = services() .rooms .state @@ -917,7 +917,7 @@ impl Service { ); } - info!("Loading fork states"); + debug!("Loading fork states"); let fork_states: Vec<_> = fork_states .into_iter() @@ -935,7 +935,7 @@ impl Service { }) .collect(); - info!("Resolving state"); + debug!("Resolving state"); let lock = services().globals.stateres_mutex.lock(); let state = match state_res::resolve(room_version_id, &fork_states, auth_chain_sets, |id| { @@ -953,7 +953,7 @@ impl Service { drop(lock); - info!("State resolution done. Compressing state"); + debug!("State resolution done. Compressing state"); let new_room_state = state .into_iter() diff --git a/src/utils/error.rs b/src/utils/error.rs index 4f044ca2..5ffb38c8 100644 --- a/src/utils/error.rs +++ b/src/utils/error.rs @@ -9,7 +9,7 @@ use ruma::{ OwnedServerName, }; use thiserror::Error; -use tracing::{error, warn}; +use tracing::{error, info}; #[cfg(feature = "persy")] use persy::PersyError; @@ -131,7 +131,7 @@ impl Error { _ => (Unknown, StatusCode::INTERNAL_SERVER_ERROR), }; - warn!("{}: {}", status_code, message); + info!("Returning an error: {}: {}", status_code, message); RumaResponse(UiaaResponse::MatrixError(RumaError { body: ErrorBody::Standard { kind, message },