|
|
|
@ -5,15 +5,16 @@ pub use data::Data;
|
|
|
|
|
use std::{
|
|
|
|
|
collections::{BTreeMap, HashMap, HashSet},
|
|
|
|
|
fmt::Debug,
|
|
|
|
|
iter,
|
|
|
|
|
sync::Arc,
|
|
|
|
|
time::{Duration, Instant}, iter,
|
|
|
|
|
time::{Duration, Instant},
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
use crate::{
|
|
|
|
|
api::{appservice_server, server_server},
|
|
|
|
|
services,
|
|
|
|
|
utils::{self, calculate_hash},
|
|
|
|
|
Error, PduEvent, Result, Config,
|
|
|
|
|
Config, Error, PduEvent, Result,
|
|
|
|
|
};
|
|
|
|
|
use federation::transactions::send_transaction_message;
|
|
|
|
|
use futures_util::{stream::FuturesUnordered, StreamExt};
|
|
|
|
@ -100,7 +101,11 @@ impl Service {
|
|
|
|
|
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
|
|
|
|
|
let (sender, receiver) = mpsc::unbounded_channel();
|
|
|
|
|
|
|
|
|
|
let self1 = Arc::new(Self { db, sender, maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)) });
|
|
|
|
|
let self1 = Arc::new(Self {
|
|
|
|
|
db,
|
|
|
|
|
sender,
|
|
|
|
|
maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)),
|
|
|
|
|
});
|
|
|
|
|
let self2 = Arc::clone(&self1);
|
|
|
|
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
@ -110,7 +115,10 @@ impl Service {
|
|
|
|
|
self1
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn start_handler(&self, mut receiver: mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>) -> Result<()> {
|
|
|
|
|
async fn start_handler(
|
|
|
|
|
&self,
|
|
|
|
|
mut receiver: mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>,
|
|
|
|
|
) -> Result<()> {
|
|
|
|
|
let mut futures = FuturesUnordered::new();
|
|
|
|
|
|
|
|
|
|
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
|
|
|
|
@ -118,8 +126,7 @@ impl Service {
|
|
|
|
|
// Retry requests we could not finish yet
|
|
|
|
|
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new();
|
|
|
|
|
|
|
|
|
|
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(|r| r.ok())
|
|
|
|
|
{
|
|
|
|
|
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(|r| r.ok()) {
|
|
|
|
|
let entry = initial_transactions
|
|
|
|
|
.entry(outgoing_kind.clone())
|
|
|
|
|
.or_insert_with(Vec::new);
|
|
|
|
@ -137,8 +144,7 @@ impl Service {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (outgoing_kind, events) in initial_transactions {
|
|
|
|
|
current_transaction_status
|
|
|
|
|
.insert(outgoing_kind.clone(), TransactionStatus::Running);
|
|
|
|
|
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
|
|
|
|
|
futures.push(Self::handle_events(outgoing_kind.clone(), events));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -235,7 +241,11 @@ impl Service {
|
|
|
|
|
|
|
|
|
|
if retry {
|
|
|
|
|
// We retry the previous transaction
|
|
|
|
|
for (_, e) in self.db.active_requests_for(outgoing_kind).filter_map(|r| r.ok()) {
|
|
|
|
|
for (_, e) in self
|
|
|
|
|
.db
|
|
|
|
|
.active_requests_for(outgoing_kind)
|
|
|
|
|
.filter_map(|r| r.ok())
|
|
|
|
|
{
|
|
|
|
|
events.push(e);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
@ -276,7 +286,12 @@ impl Service {
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Look for read receipts in this room
|
|
|
|
|
for r in services().rooms.edus.read_receipt.readreceipts_since(&room_id, since) {
|
|
|
|
|
for r in services()
|
|
|
|
|
.rooms
|
|
|
|
|
.edus
|
|
|
|
|
.read_receipt
|
|
|
|
|
.readreceipts_since(&room_id, since)
|
|
|
|
|
{
|
|
|
|
|
let (user_id, count, read_receipt) = r?;
|
|
|
|
|
|
|
|
|
|
if count > max_edu_count {
|
|
|
|
@ -359,7 +374,9 @@ impl Service {
|
|
|
|
|
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey);
|
|
|
|
|
let event = SendingEventType::Pdu(pdu_id.to_owned());
|
|
|
|
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
|
|
|
|
self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap();
|
|
|
|
|
self.sender
|
|
|
|
|
.send((outgoing_kind, event, keys.into_iter().next().unwrap()))
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
@ -370,10 +387,25 @@ impl Service {
|
|
|
|
|
servers: I,
|
|
|
|
|
pdu_id: &[u8],
|
|
|
|
|
) -> Result<()> {
|
|
|
|
|
let requests = servers.into_iter().map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()))).collect::<Vec<_>>();
|
|
|
|
|
let keys = self.db.queue_requests(&requests.iter().map(|(o, e)| (o, e.clone())).collect::<Vec<_>>())?;
|
|
|
|
|
let requests = servers
|
|
|
|
|
.into_iter()
|
|
|
|
|
.map(|server| {
|
|
|
|
|
(
|
|
|
|
|
OutgoingKind::Normal(server),
|
|
|
|
|
SendingEventType::Pdu(pdu_id.to_owned()),
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
let keys = self.db.queue_requests(
|
|
|
|
|
&requests
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|(o, e)| (o, e.clone()))
|
|
|
|
|
.collect::<Vec<_>>(),
|
|
|
|
|
)?;
|
|
|
|
|
for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) {
|
|
|
|
|
self.sender.send((outgoing_kind.to_owned(), event, key)).unwrap();
|
|
|
|
|
self.sender
|
|
|
|
|
.send((outgoing_kind.to_owned(), event, key))
|
|
|
|
|
.unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
@ -389,7 +421,9 @@ impl Service {
|
|
|
|
|
let outgoing_kind = OutgoingKind::Normal(server.to_owned());
|
|
|
|
|
let event = SendingEventType::Edu(serialized);
|
|
|
|
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
|
|
|
|
self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap();
|
|
|
|
|
self.sender
|
|
|
|
|
.send((outgoing_kind, event, keys.into_iter().next().unwrap()))
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
@ -399,7 +433,9 @@ impl Service {
|
|
|
|
|
let outgoing_kind = OutgoingKind::Appservice(appservice_id);
|
|
|
|
|
let event = SendingEventType::Pdu(pdu_id);
|
|
|
|
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
|
|
|
|
self.sender.send((outgoing_kind, event, keys.into_iter().next().unwrap())).unwrap();
|
|
|
|
|
self.sender
|
|
|
|
|
.send((outgoing_kind, event, keys.into_iter().next().unwrap()))
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
@ -409,7 +445,8 @@ impl Service {
|
|
|
|
|
///
|
|
|
|
|
#[tracing::instrument(skip(self))]
|
|
|
|
|
pub fn cleanup_events(&self, appservice_id: String) -> Result<()> {
|
|
|
|
|
self.db.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?;
|
|
|
|
|
self.db
|
|
|
|
|
.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?;
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
@ -638,7 +675,6 @@ impl Service {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[tracing::instrument(skip(self, destination, request))]
|
|
|
|
|
pub async fn send_federation_request<T: OutgoingRequest>(
|
|
|
|
|
&self,
|
|
|
|
|