diff --git a/src/database/sending.rs b/src/database/sending.rs index 8c487e17..dfb7fa9a 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -8,7 +8,7 @@ use std::{ use crate::{appservice_server, server_server, utils, Error, PduEvent, Result}; use federation::transactions::send_transaction_message; -use log::info; +use log::{info, warn}; use ring::digest; use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ @@ -36,6 +36,7 @@ impl Sending { ) { let servernamepduids = self.servernamepduids.clone(); let servercurrentpdus = self.servercurrentpdus.clone(); + let maximum_requests = self.maximum_requests.clone(); let rooms = rooms.clone(); let globals = globals.clone(); let appservice = appservice.clone(); @@ -44,23 +45,43 @@ impl Sending { let mut futures = FuturesUnordered::new(); // Retry requests we could not finish yet - let mut current_transactions = HashMap::new(); + let mut current_transactions = HashMap::<(Box, bool), Vec>::new(); - for (server, pdu, is_appservice) in servercurrentpdus + for (key, server, pdu, is_appservice) in servercurrentpdus .iter() .filter_map(|r| r.ok()) .filter_map(|(key, _)| Self::parse_servercurrentpdus(key).ok()) - .filter(|(_, pdu, _)| !pdu.is_empty()) // Skip reservation key - .take(50) - // This should not contain more than 50 anyway { - current_transactions + if pdu.is_empty() { + // Remove old reservation key + servercurrentpdus.remove(key).unwrap(); + continue; + } + + let entry = current_transactions .entry((server, is_appservice)) - .or_insert_with(Vec::new) - .push(pdu); + .or_insert_with(Vec::new); + + if entry.len() > 30 { + warn!("Dropping some current pdus because too many were queued. This should not happen."); + servercurrentpdus.remove(key).unwrap(); + continue; + } + + entry.push(pdu); } for ((server, is_appservice), pdus) in current_transactions { + // Create new reservation + let mut prefix = if is_appservice { + "+".as_bytes().to_vec() + } else { + Vec::new() + }; + prefix.extend_from_slice(server.as_bytes()); + prefix.push(0xff); + servercurrentpdus.insert(prefix, &[]).unwrap(); + futures.push(Self::handle_event( server, is_appservice, @@ -68,6 +89,7 @@ impl Sending { &globals, &rooms, &appservice, + &maximum_requests, )); } @@ -106,7 +128,7 @@ impl Sending { .map(|k| { k.subslice(prefix.len(), k.len() - prefix.len()) }) - .take(50) + .take(30) .collect::>(); if !new_pdus.is_empty() { @@ -117,7 +139,7 @@ impl Sending { servernamepduids.remove(¤t_key).unwrap(); } - futures.push(Self::handle_event(server, is_appservice, new_pdus, &globals, &rooms, &appservice)); + futures.push(Self::handle_event(server, is_appservice, new_pdus, &globals, &rooms, &appservice, &maximum_requests)); } else { servercurrentpdus.remove(&prefix).unwrap(); // servercurrentpdus with the prefix should be empty now @@ -194,15 +216,17 @@ impl Sending { prefix.extend_from_slice(server.as_bytes()); prefix.push(0xff); - servercurrentpdus + if servercurrentpdus .compare_and_swap(prefix, Option::<&[u8]>::None, Some(&[])) // Try to reserve - == Ok(Ok(())) + == Ok(Ok(())) { true } else { + false + } }) { servercurrentpdus.insert(&key, &[]).unwrap(); servernamepduids.remove(&key).unwrap(); - futures.push(Self::handle_event(server, is_appservice, vec![pdu_id.into()], &globals, &rooms, &appservice)); + futures.push(Self::handle_event(server, is_appservice, vec![pdu_id.into()], &globals, &rooms, &appservice, &maximum_requests)); } } } @@ -244,6 +268,7 @@ impl Sending { globals: &super::globals::Globals, rooms: &super::rooms::Rooms, appservice: &super::appservice::Appservice, + maximum_requests: &Semaphore, ) -> std::result::Result<(Box, bool), (Box, bool, Error)> { if is_appservice { let pdu_jsons = pdu_ids @@ -266,7 +291,9 @@ impl Sending { }) .filter_map(|r| r.ok()) .collect::>(); - appservice_server::send_request( + + let permit = maximum_requests.acquire().await; + let response = appservice_server::send_request( &globals, appservice .get_registration(server.as_str()) @@ -282,7 +309,11 @@ impl Sending { ) .await .map(|_response| (server.clone(), is_appservice)) - .map_err(|e| (server, is_appservice, e)) + .map_err(|e| (server, is_appservice, e)); + + drop(permit); + + response } else { let pdu_jsons = pdu_ids .iter() @@ -312,7 +343,8 @@ impl Sending { .filter_map(|r| r.ok()) .collect::>(); - server_server::send_request( + let permit = maximum_requests.acquire().await; + let response = server_server::send_request( &globals, server.clone(), send_transaction_message::v1::Request { @@ -328,12 +360,17 @@ impl Sending { ) .await .map(|_response| (server.clone(), is_appservice)) - .map_err(|e| (server, is_appservice, e)) + .map_err(|e| (server, is_appservice, e)); + + drop(permit); + + response } } - fn parse_servercurrentpdus(key: IVec) -> Result<(Box, IVec, bool)> { - let mut parts = key.splitn(2, |&b| b == 0xff); + fn parse_servercurrentpdus(key: IVec) -> Result<(IVec, Box, IVec, bool)> { + let key2 = key.clone(); + let mut parts = key2.splitn(2, |&b| b == 0xff); let server = parts.next().expect("splitn always returns one element"); let pdu = parts .next() @@ -351,6 +388,7 @@ impl Sending { }; Ok::<_, Error>(( + key, Box::::try_from(server).map_err(|_| { Error::bad_database("Invalid server string in server_currenttransaction") })?, diff --git a/src/ruma_wrapper.rs b/src/ruma_wrapper.rs index 45fcc7fb..898561f8 100644 --- a/src/ruma_wrapper.rs +++ b/src/ruma_wrapper.rs @@ -82,9 +82,7 @@ where registration .get("as_token") .and_then(|as_token| as_token.as_str()) - .map_or(false, |as_token| { - dbg!(token.as_deref()) == dbg!(Some(as_token)) - }) + .map_or(false, |as_token| token.as_deref() == Some(as_token)) }) { match T::METADATA.authentication { AuthScheme::AccessToken | AuthScheme::QueryOnlyAccessToken => {