diff --git a/src/client_server/account.rs b/src/client_server/account.rs index d4f103c7..b00882a6 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -733,7 +733,7 @@ pub async fn deactivate_route( pub async fn third_party_route( body: Ruma, ) -> ConduitResult { - let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + let _sender_user = body.sender_user.as_ref().expect("user is authenticated"); Ok(get_contacts::Response::new(Vec::new()).into()) } diff --git a/src/database.rs b/src/database.rs index e66ff04d..5ad2add8 100644 --- a/src/database.rs +++ b/src/database.rs @@ -278,6 +278,8 @@ impl Database { pdu_cache: Mutex::new(LruCache::new(100_000)), auth_chain_cache: Mutex::new(LruCache::new(100_000)), shorteventid_cache: Mutex::new(LruCache::new(1_000_000)), + eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), + statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), stateinfo_cache: Mutex::new(LruCache::new(1000)), }, account_data: account_data::AccountData { diff --git a/src/database/rooms.rs b/src/database/rooms.rs index e2415a41..600566c1 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -92,6 +92,8 @@ pub struct Rooms { pub(super) pdu_cache: Mutex>>, pub(super) auth_chain_cache: Mutex>>, pub(super) shorteventid_cache: Mutex>, + pub(super) eventidshort_cache: Mutex>, + pub(super) statekeyshort_cache: Mutex>, pub(super) stateinfo_cache: Mutex< LruCache< u64, @@ -665,7 +667,11 @@ impl Rooms { event_id: &EventId, globals: &super::globals::Globals, ) -> Result { - Ok(match self.eventid_shorteventid.get(event_id.as_bytes())? { + if let Some(short) = self.eventidshort_cache.lock().unwrap().get_mut(&event_id) { + return Ok(*short); + } + + let short = match self.eventid_shorteventid.get(event_id.as_bytes())? { Some(shorteventid) => utils::u64_from_bytes(&shorteventid) .map_err(|_| Error::bad_database("Invalid shorteventid in db."))?, None => { @@ -676,7 +682,14 @@ impl Rooms { .insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?; shorteventid } - }) + }; + + self.eventidshort_cache + .lock() + .unwrap() + .insert(event_id.clone(), short); + + Ok(short) } pub fn get_shortroomid(&self, room_id: &RoomId) -> Result> { @@ -694,17 +707,36 @@ impl Rooms { event_type: &EventType, state_key: &str, ) -> Result> { + if let Some(short) = self + .statekeyshort_cache + .lock() + .unwrap() + .get_mut(&(event_type.clone(), state_key.to_owned())) + { + return Ok(Some(*short)); + } + let mut statekey = event_type.as_ref().as_bytes().to_vec(); statekey.push(0xff); statekey.extend_from_slice(&state_key.as_bytes()); - self.statekey_shortstatekey + let short = self + .statekey_shortstatekey .get(&statekey)? .map(|shortstatekey| { utils::u64_from_bytes(&shortstatekey) .map_err(|_| Error::bad_database("Invalid shortstatekey in db.")) }) - .transpose() + .transpose()?; + + if let Some(s) = short { + self.statekeyshort_cache + .lock() + .unwrap() + .insert((event_type.clone(), state_key.to_owned()), s); + } + + Ok(short) } pub fn get_or_create_shortroomid( @@ -730,11 +762,20 @@ impl Rooms { state_key: &str, globals: &super::globals::Globals, ) -> Result { + if let Some(short) = self + .statekeyshort_cache + .lock() + .unwrap() + .get_mut(&(event_type.clone(), state_key.to_owned())) + { + return Ok(*short); + } + let mut statekey = event_type.as_ref().as_bytes().to_vec(); statekey.push(0xff); statekey.extend_from_slice(&state_key.as_bytes()); - Ok(match self.statekey_shortstatekey.get(&statekey)? { + let short = match self.statekey_shortstatekey.get(&statekey)? { Some(shortstatekey) => utils::u64_from_bytes(&shortstatekey) .map_err(|_| Error::bad_database("Invalid shortstatekey in db."))?, None => { @@ -743,7 +784,14 @@ impl Rooms { .insert(&statekey, &shortstatekey.to_be_bytes())?; shortstatekey } - }) + }; + + self.statekeyshort_cache + .lock() + .unwrap() + .insert((event_type.clone(), state_key.to_owned()), short); + + Ok(short) } pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result { @@ -2173,8 +2221,10 @@ impl Rooms { } } - self.roomserverids.insert(&roomserver_id, &[])?; - self.serverroomids.insert(&serverroom_id, &[])?; + if update_joined_count { + self.roomserverids.insert(&roomserver_id, &[])?; + self.serverroomids.insert(&serverroom_id, &[])?; + } self.userroomid_joined.insert(&userroom_id, &[])?; self.roomuserid_joined.insert(&roomuser_id, &[])?; self.userroomid_invitestate.remove(&userroom_id)?; @@ -2199,8 +2249,10 @@ impl Rooms { return Ok(()); } - self.roomserverids.insert(&roomserver_id, &[])?; - self.serverroomids.insert(&serverroom_id, &[])?; + if update_joined_count { + self.roomserverids.insert(&roomserver_id, &[])?; + self.serverroomids.insert(&serverroom_id, &[])?; + } self.userroomid_invitestate.insert( &userroom_id, &serde_json::to_vec(&last_state.unwrap_or_default()) @@ -2214,14 +2266,16 @@ impl Rooms { self.roomuserid_leftcount.remove(&roomuser_id)?; } member::MembershipState::Leave | member::MembershipState::Ban => { - if self - .room_members(room_id) - .chain(self.room_members_invited(room_id)) - .filter_map(|r| r.ok()) - .all(|u| u.server_name() != user_id.server_name()) - { - self.roomserverids.remove(&roomserver_id)?; - self.serverroomids.remove(&serverroom_id)?; + if update_joined_count { + if self + .room_members(room_id) + .chain(self.room_members_invited(room_id)) + .filter_map(|r| r.ok()) + .all(|u| u.server_name() != user_id.server_name()) + { + self.roomserverids.remove(&roomserver_id)?; + self.serverroomids.remove(&serverroom_id)?; + } } self.userroomid_leftstate.insert( &userroom_id, @@ -2245,10 +2299,52 @@ impl Rooms { } pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { - self.roomid_joinedcount.insert( - room_id.as_bytes(), - &(self.room_members(&room_id).count() as u64).to_be_bytes(), - ) + let mut joinedcount = 0_u64; + let mut joined_servers = HashSet::new(); + + for joined in self.room_members(&room_id).filter_map(|r| r.ok()) { + joined_servers.insert(joined.server_name().to_owned()); + joinedcount += 1; + } + + for invited in self.room_members_invited(&room_id).filter_map(|r| r.ok()) { + joined_servers.insert(invited.server_name().to_owned()); + } + + self.roomid_joinedcount + .insert(room_id.as_bytes(), &joinedcount.to_be_bytes())?; + + for old_joined_server in self.room_servers(room_id).filter_map(|r| r.ok()) { + if !joined_servers.remove(&old_joined_server) { + // Server not in room anymore + let mut roomserver_id = room_id.as_bytes().to_vec(); + roomserver_id.push(0xff); + roomserver_id.extend_from_slice(old_joined_server.as_bytes()); + + let mut serverroom_id = old_joined_server.as_bytes().to_vec(); + serverroom_id.push(0xff); + serverroom_id.extend_from_slice(room_id.as_bytes()); + + self.roomserverids.remove(&roomserver_id)?; + self.serverroomids.remove(&serverroom_id)?; + } + } + + // Now only new servers are in joined_servers anymore + for server in joined_servers { + let mut roomserver_id = room_id.as_bytes().to_vec(); + roomserver_id.push(0xff); + roomserver_id.extend_from_slice(server.as_bytes()); + + let mut serverroom_id = server.as_bytes().to_vec(); + serverroom_id.push(0xff); + serverroom_id.extend_from_slice(room_id.as_bytes()); + + self.roomserverids.insert(&roomserver_id, &[])?; + self.serverroomids.insert(&serverroom_id, &[])?; + } + + Ok(()) } pub async fn leave_room( diff --git a/src/server_server.rs b/src/server_server.rs index de3eef57..49f225f1 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -272,14 +272,20 @@ where if status == 200 { let response = T::IncomingResponse::try_from_http_response(http_response); response.map_err(|e| { - warn!("Invalid 200 response from {}: {}", &destination, e); + warn!( + "Invalid 200 response from {} on: {} {}", + &destination, url, e + ); Error::BadServerResponse("Server returned bad 200 response.") }) } else { Err(Error::FederationError( destination.to_owned(), RumaError::try_from_http_response(http_response).map_err(|e| { - warn!("Server returned bad error response: {}", e); + warn!( + "Invalid {} response from {} on: {} {}", + status, &destination, url, e + ); Error::BadServerResponse("Server returned bad error response.") })?, )) @@ -884,15 +890,10 @@ pub async fn handle_incoming_pdu<'a>( } // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events - let mut visited = HashSet::new(); + let mut graph = HashMap::new(); + let mut eventid_info = HashMap::new(); let mut todo_outlier_stack = incoming_pdu.prev_events.clone(); - let mut todo_timeline_stack = Vec::new(); while let Some(prev_event_id) = todo_outlier_stack.pop() { - if visited.contains(&prev_event_id) { - continue; - } - visited.insert(prev_event_id.clone()); - if let Some((pdu, json_opt)) = fetch_and_handle_outliers( db, origin, @@ -914,24 +915,58 @@ pub async fn handle_incoming_pdu<'a>( .expect("Room exists") .origin_server_ts { - todo_outlier_stack.extend(pdu.prev_events.iter().cloned()); - todo_timeline_stack.push((pdu, json)); + for prev_prev in &pdu.prev_events { + if !graph.contains_key(prev_prev) { + todo_outlier_stack.push(dbg!(prev_prev.clone())); + } + } + + graph.insert( + prev_event_id.clone(), + pdu.prev_events.iter().cloned().collect(), + ); + eventid_info.insert(prev_event_id.clone(), (pdu, json)); + } else { + graph.insert(prev_event_id.clone(), HashSet::new()); + eventid_info.insert(prev_event_id.clone(), (pdu, json)); } + } else { + graph.insert(prev_event_id.clone(), HashSet::new()); } } } - while let Some(prev) = todo_timeline_stack.pop() { - upgrade_outlier_to_timeline_pdu( - prev.0, - prev.1, - &create_event, - origin, - db, - room_id, - pub_key_map, - ) - .await?; + let sorted = + state_res::StateResolution::lexicographical_topological_sort(dbg!(&graph), |event_id| { + // This return value is the key used for sorting events, + // events are then sorted by power level, time, + // and lexically by event_id. + println!("{}", event_id); + Ok(( + 0, + MilliSecondsSinceUnixEpoch( + eventid_info + .get(event_id) + .map_or_else(|| uint!(0), |info| info.0.origin_server_ts.clone()), + ), + ruma::event_id!("$notimportant"), + )) + }) + .map_err(|_| "Error sorting prev events".to_owned())?; + + for prev_id in dbg!(sorted) { + if let Some((pdu, json)) = eventid_info.remove(&prev_id) { + upgrade_outlier_to_timeline_pdu( + pdu, + json, + &create_event, + origin, + db, + room_id, + pub_key_map, + ) + .await?; + } } upgrade_outlier_to_timeline_pdu( @@ -1872,8 +1907,7 @@ fn get_auth_chain( full_auth_chain.extend(cached.iter().cloned()); } else { drop(cache); - let mut auth_chain = HashSet::new(); - get_auth_chain_recursive(&event_id, &mut auth_chain, db)?; + let auth_chain = get_auth_chain_inner(&event_id, db)?; cache = db.rooms.auth_chain_cache(); cache.insert(sevent_id, auth_chain.clone()); full_auth_chain.extend(auth_chain); @@ -1887,33 +1921,34 @@ fn get_auth_chain( .filter_map(move |sid| db.rooms.get_eventid_from_short(sid).ok())) } -fn get_auth_chain_recursive( - event_id: &EventId, - found: &mut HashSet, - db: &Database, -) -> Result<()> { - let r = db.rooms.get_pdu(&event_id); - match r { - Ok(Some(pdu)) => { - for auth_event in &pdu.auth_events { - let sauthevent = db - .rooms - .get_or_create_shorteventid(auth_event, &db.globals)?; - if !found.contains(&sauthevent) { - found.insert(sauthevent); - get_auth_chain_recursive(&auth_event, found, db)?; +fn get_auth_chain_inner(event_id: &EventId, db: &Database) -> Result> { + let mut todo = vec![event_id.clone()]; + let mut found = HashSet::new(); + + while let Some(event_id) = todo.pop() { + match db.rooms.get_pdu(&event_id) { + Ok(Some(pdu)) => { + for auth_event in &pdu.auth_events { + let sauthevent = db + .rooms + .get_or_create_shorteventid(auth_event, &db.globals)?; + + if !found.contains(&sauthevent) { + found.insert(sauthevent); + todo.push(auth_event.clone()); + } } } - } - Ok(None) => { - warn!("Could not find pdu mentioned in auth events."); - } - Err(e) => { - warn!("Could not load event in auth chain: {}", e); + Ok(None) => { + warn!("Could not find pdu mentioned in auth events: {}", event_id); + } + Err(e) => { + warn!("Could not load event in auth chain: {} {}", event_id, e); + } } } - Ok(()) + Ok(found) } #[cfg_attr(