@ -28,11 +28,44 @@ pub enum OutgoingKind {
Normal ( Box < ServerName > ) ,
}
impl OutgoingKind {
pub fn get_prefix ( & self ) -> Vec < u8 > {
let mut prefix = match self {
OutgoingKind ::Appservice ( server ) = > {
let mut p = b" + " . to_vec ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
OutgoingKind ::Push ( user , pushkey ) = > {
let mut p = b" $ " . to_vec ( ) ;
p . extend_from_slice ( & user ) ;
p . push ( 0xff ) ;
p . extend_from_slice ( & pushkey ) ;
p
}
OutgoingKind ::Normal ( server ) = > {
let mut p = Vec ::new ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
} ;
prefix . push ( 0xff ) ;
prefix
}
}
#[ derive(Clone, Debug, PartialEq, Eq, Hash) ]
pub enum SendingEventType {
Pdu ( Vec < u8 > ) ,
Edu ( Vec < u8 > ) ,
}
#[ derive(Clone) ]
pub struct Sending {
/// The state for a given state hash.
pub ( super ) servernamepduids : sled ::Tree , // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId
pub ( super ) servercurrentpdus : sled ::Tree , // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId
pub ( super ) servercurrent events: sled ::Tree , // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / (*)EduEvent
pub ( super ) maximum_requests : Arc < Semaphore > ,
}
@ -45,7 +78,7 @@ enum TransactionStatus {
impl Sending {
pub fn start_handler ( & self , db : & Database ) {
let servernamepduids = self . servernamepduids . clone ( ) ;
let servercurrent pdus = self . servercurrentpdu s. clone ( ) ;
let servercurrent events = self . servercurrentevent s. clone ( ) ;
let db = db . clone ( ) ;
@ -56,14 +89,14 @@ impl Sending {
let mut subscriber = servernamepduids . watch_prefix ( b" " ) ;
let mut current_transaction_status = HashMap ::< Vec < u8 > , TransactionStatus > ::new ( ) ;
let mut initial_transactions = HashMap ::< OutgoingKind , Vec < Vec < u8 > > > ::new ( ) ;
for ( key , outgoing_kind , pdu) in servercurrentpdu s
let mut initial_transactions = HashMap ::< OutgoingKind , Vec < SendingEventType > > ::new ( ) ;
for ( key , outgoing_kind , event) in servercurrentevent s
. iter ( )
. filter_map ( | r | r . ok ( ) )
. filter_map ( | ( key , _ ) | {
Self ::parse_servercurrent pdus ( & key )
Self ::parse_servercurrent event ( & key )
. ok ( )
. map ( | ( k , p ) | ( key , k , p. to_v ec( ) ) )
. map ( | ( k , e ) | ( key , k , e) )
} )
{
let entry = initial_transactions
@ -72,39 +105,20 @@ impl Sending {
if entry . len ( ) > 30 {
warn ! (
"Dropping some current pdu : {:?} {:?} {:?}",
key , outgoing_kind , pdu
"Dropping some current events : {:?} {:?} {:?}",
key , outgoing_kind , event
) ;
servercurrent pdu s. remove ( key ) . unwrap ( ) ;
servercurrent event s. remove ( key ) . unwrap ( ) ;
continue ;
}
entry . push ( pdu ) ;
entry . push ( event ) ;
}
for ( outgoing_kind , pdus ) in initial_transactions {
let mut prefix = match & outgoing_kind {
OutgoingKind ::Appservice ( server ) = > {
let mut p = b" + " . to_vec ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
OutgoingKind ::Push ( user , pushkey ) = > {
let mut p = b" $ " . to_vec ( ) ;
p . extend_from_slice ( & user ) ;
p . push ( 0xff ) ;
p . extend_from_slice ( & pushkey ) ;
p
}
OutgoingKind ::Normal ( server ) = > {
let mut p = Vec ::new ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
} ;
prefix . push ( 0xff ) ;
current_transaction_status . insert ( prefix , TransactionStatus ::Running ) ;
futures . push ( Self ::handle_event ( outgoing_kind . clone ( ) , pdus , & db ) ) ;
for ( outgoing_kind , events ) in initial_transactions {
current_transaction_status
. insert ( outgoing_kind . get_prefix ( ) , TransactionStatus ::Running ) ;
futures . push ( Self ::handle_events ( outgoing_kind . clone ( ) , events , & db ) ) ;
}
loop {
@ -112,58 +126,46 @@ impl Sending {
Some ( response ) = futures . next ( ) = > {
match response {
Ok ( outgoing_kind ) = > {
let mut prefix = match & outgoing_kind {
OutgoingKind ::Appservice ( server ) = > {
let mut p = b" + " . to_vec ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
OutgoingKind ::Push ( user , pushkey ) = > {
let mut p = b" $ " . to_vec ( ) ;
p . extend_from_slice ( & user ) ;
p . push ( 0xff ) ;
p . extend_from_slice ( & pushkey ) ;
p
} ,
OutgoingKind ::Normal ( server ) = > {
let mut p = vec! [ ] ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
} ,
} ;
prefix . push ( 0xff ) ;
for key in servercurrentpdus
let prefix = outgoing_kind . get_prefix ( ) ;
for key in servercurrentevents
. scan_prefix ( & prefix )
. keys ( )
. filter_map ( | r | r . ok ( ) )
{
servercurrent pdu s. remove ( key ) . unwrap ( ) ;
servercurrentevents . remove ( key ) . unwrap ( ) ;
}
// Find events that have been added since starting the last request
let new_ pdu s = servernamepduids
let new_events = servernamepduids
. scan_prefix ( & prefix )
. keys ( )
. filter_map ( | r | r . ok ( ) )
. map ( | k | {
k[ prefix . len ( ) .. ] . to_vec ( )
SendingEventType ::Pdu ( k [ prefix . len ( ) .. ] . to_vec ( ) )
} )
. take ( 30 )
. collect ::< Vec < _ > > ( ) ;
if ! new_pdus . is_empty ( ) {
for pdu_id in & new_pdus {
// TODO: find edus
if ! new_events . is_empty ( ) {
// Insert pdus we found
for event in & new_events {
let mut current_key = prefix . clone ( ) ;
current_key . extend_from_slice ( pdu_id ) ;
servercurrentpdus . insert ( & current_key , & [ ] ) . unwrap ( ) ;
match event {
SendingEventType ::Pdu ( b ) |
SendingEventType ::Edu ( b ) = > {
current_key . extend_from_slice ( & b ) ;
servercurrentevents . insert ( & current_key , & [ ] ) . unwrap ( ) ;
servernamepduids . remove ( & current_key ) . unwrap ( ) ;
}
}
}
futures . push (
Self ::handle_event (
Self ::handle_event s (
outgoing_kind . clone ( ) ,
new_ pdu s,
new_ event s,
& db ,
)
) ;
@ -172,29 +174,7 @@ impl Sending {
}
}
Err ( ( outgoing_kind , _ ) ) = > {
let mut prefix = match & outgoing_kind {
OutgoingKind ::Appservice ( serv ) = > {
let mut p = b" + " . to_vec ( ) ;
p . extend_from_slice ( serv . as_bytes ( ) ) ;
p
} ,
OutgoingKind ::Push ( user , pushkey ) = > {
let mut p = b" $ " . to_vec ( ) ;
p . extend_from_slice ( & user ) ;
p . push ( 0xff ) ;
p . extend_from_slice ( & pushkey ) ;
p
} ,
OutgoingKind ::Normal ( serv ) = > {
let mut p = vec! [ ] ;
p . extend_from_slice ( serv . as_bytes ( ) ) ;
p
} ,
} ;
prefix . push ( 0xff ) ;
current_transaction_status . entry ( prefix ) . and_modify ( | e | * e = match e {
current_transaction_status . entry ( outgoing_kind . get_prefix ( ) ) . and_modify ( | e | * e = match e {
TransactionStatus ::Running = > TransactionStatus ::Failed ( 1 , Instant ::now ( ) ) ,
TransactionStatus ::Retrying ( n ) = > TransactionStatus ::Failed ( * n + 1 , Instant ::now ( ) ) ,
TransactionStatus ::Failed ( _ , _ ) = > {
@ -206,51 +186,43 @@ impl Sending {
} ;
} ,
Some ( event ) = & mut subscriber = > {
if let sled ::Event ::Insert { key , .. } = event {
// New sled version:
//for (_tree, key, value_opt) in &event {
// if value_opt.is_none() {
// continue;
// }
let servernamepduid = key . clone ( ) ;
if let sled ::Event ::Insert { key , .. } = event {
if let Ok ( ( outgoing_kind , event ) ) = Self ::parse_servercurrentevent ( & key ) {
if let Some ( events ) = Self ::select_events ( & outgoing_kind , vec! [ ( event , key ) ] , & mut current_transaction_status , & servercurrentevents , & servernamepduids ) {
futures . push ( Self ::handle_events ( outgoing_kind , events , & db ) ) ;
}
}
}
}
}
}
} ) ;
}
fn select_events (
outgoing_kind : & OutgoingKind ,
new_events : Vec < ( SendingEventType , IVec ) > , // Events we want to send: event and full key
current_transaction_status : & mut HashMap < Vec < u8 > , TransactionStatus > ,
servercurrentevents : & sled ::Tree ,
servernamepduids : & sled ::Tree ,
) -> Option < Vec < SendingEventType > > {
let mut retry = false ;
let mut allow = true ;
if let Some ( ( outgoing_kind , prefix , pdu_id ) ) = Self ::parse_servercurrentpdus ( & servernamepduid )
. ok ( )
. map ( | ( outgoing_kind , pdu_id ) | {
let mut prefix = match & outgoing_kind {
OutgoingKind ::Appservice ( serv ) = > {
let mut p = b" + " . to_vec ( ) ;
p . extend_from_slice ( serv . as_bytes ( ) ) ;
p
} ,
OutgoingKind ::Push ( user , pushkey ) = > {
let mut p = b" $ " . to_vec ( ) ;
p . extend_from_slice ( & user ) ;
p . push ( 0xff ) ;
p . extend_from_slice ( & pushkey ) ;
p
} ,
OutgoingKind ::Normal ( serv ) = > {
let mut p = vec! [ ] ;
p . extend_from_slice ( serv . as_bytes ( ) ) ;
p
} ,
} ;
prefix . push ( 0xff ) ;
( outgoing_kind , prefix , pdu_id )
} )
. filter ( | ( _ , prefix , _ ) | {
let prefix = outgoing_kind . get_prefix ( ) ;
let entry = current_transaction_status . entry ( prefix . clone ( ) ) ;
let mut allow = true ;
entry . and_modify ( | e | match e {
entry
. and_modify ( | e | match e {
TransactionStatus ::Running | TransactionStatus ::Retrying ( _ ) = > {
allow = false ; // already running
} ,
}
TransactionStatus ::Failed ( tries , time ) = > {
// Fail if a request has failed recently (exponential backoff)
let mut min_elapsed_duration = Duration ::from_secs ( 30 ) * ( * tries ) * ( * tries ) ;
@ -265,44 +237,39 @@ impl Sending {
* e = TransactionStatus ::Retrying ( * tries ) ;
}
}
} ) . or_insert ( TransactionStatus ::Running ) ;
allow
} )
{
let mut pdus = Vec ::new ( ) ;
. or_insert ( TransactionStatus ::Running ) ;
if ! allow {
return None ;
}
let mut events = Vec ::new ( ) ;
if retry {
// We retry the previous transaction
for pdu in servercurrentpdus
for key in servercurrentevent s
. scan_prefix ( & prefix )
. keys ( )
. filter_map ( | r | r . ok ( ) )
. filter_map ( | ( key , _ ) | {
Self ::parse_servercurrentpdus ( & key )
. ok ( )
. map ( | ( _ , p ) | p . to_vec ( ) )
} )
{
pdus . push ( pdu ) ;
}
} else {
servercurrentpdus . insert ( & key , & [ ] ) . unwrap ( ) ;
servernamepduids . remove ( & key ) . unwrap ( ) ;
pdus . push ( pdu_id . to_vec ( ) ) ;
}
futures . push (
Self ::handle_event (
outgoing_kind ,
pdus ,
& db ,
)
) ;
}
if let Ok ( ( _ , e ) ) = Self ::parse_servercurrentevent ( & key ) {
events . push ( e ) ;
}
}
} else {
for ( e , full_key ) in new_events {
servercurrentevents . insert ( & full_key , & [ ] ) . unwrap ( ) ;
// If it was a PDU we have to unqueue it
// TODO: don't try to unqueue EDUs
servernamepduids . remove ( & full_key ) . unwrap ( ) ;
events . push ( e ) ;
}
}
} ) ;
Some ( events )
}
#[ tracing::instrument(skip(self)) ]
@ -338,7 +305,7 @@ impl Sending {
}
#[ tracing::instrument ]
fn calculate_hash ( keys : & [ Vec < u8 > ] ) -> Vec < u8 > {
fn calculate_hash ( keys : & [ & [ u8 ] ] ) -> Vec < u8 > {
// We only hash the pdu's event ids, not the whole pdu
let bytes = keys . join ( & 0xff ) ;
let hash = digest ::digest ( & digest ::SHA256 , & bytes ) ;
@ -346,33 +313,37 @@ impl Sending {
}
#[ tracing::instrument(skip(db)) ]
async fn handle_event (
async fn handle_event s (
kind : OutgoingKind ,
pdu_ids: Vec < Vec < u8 > > ,
events: Vec < SendingEventType > ,
db : & Database ,
) -> std ::result ::Result < OutgoingKind , ( OutgoingKind , Error ) > {
match & kind {
OutgoingKind ::Appservice ( server ) = > {
let pdu_jsons = pdu_ids
. iter ( )
. map ( | pdu_id | {
Ok ::< _ , ( Box < ServerName > , Error ) > (
db . rooms
. get_pdu_from_id ( pdu_id )
. map_err ( | e | ( server . clone ( ) , e ) ) ?
let mut pdu_jsons = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
pdu_jsons . push ( db . rooms
. get_pdu_from_id ( & pdu_id )
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. ok_or_else ( | | {
(
server . clone ( ) ,
kind . clone ( ) ,
Error ::bad_database (
"[Appservice] Event in servernamepduids not found in db." ,
) ,
)
} ) ?
. to_any_event ( ) ,
)
} )
. filter_map ( | r | r . ok ( ) )
. collect ::< Vec < _ > > ( ) ;
. to_any_event ( ) )
}
SendingEventType ::Edu ( _ ) = > {
// Appservices don't need EDUs (?)
}
}
}
let permit = db . sending . maximum_requests . acquire ( ) . await ;
let response = appservice_server ::send_request (
@ -384,7 +355,14 @@ impl Sending {
appservice ::event ::push_events ::v1 ::Request {
events : & pdu_jsons ,
txn_id : & base64 ::encode_config (
Self ::calculate_hash ( & pdu_ids ) ,
Self ::calculate_hash (
& events
. iter ( )
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) = > & * * b ,
} )
. collect ::< Vec < _ > > ( ) ,
) ,
base64 ::URL_SAFE_NO_PAD ,
) ,
} ,
@ -398,25 +376,30 @@ impl Sending {
response
}
OutgoingKind ::Push ( user , pushkey ) = > {
let pdus = pdu_ids
. iter ( )
. map ( | pdu_id | {
Ok ::< _ , ( Vec < u8 > , Error ) > (
let mut pdus = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
pdus . push (
db . rooms
. get_pdu_from_id ( pdu_id )
. map_err ( | e | ( pushkey . clone ( ) , e ) ) ?
. get_pdu_from_id ( & pdu_id )
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. ok_or_else ( | | {
(
pushkey . clone ( ) ,
kind . clone ( ) ,
Error ::bad_database (
"[Push] Event in servernamepduids not found in db." ,
) ,
)
} ) ? ,
)
} )
. filter_map ( | r | r . ok ( ) )
. collect ::< Vec < _ > > ( ) ;
) ;
}
SendingEventType ::Edu ( _ ) = > {
// Push gateways don't need EDUs (?)
}
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
@ -427,13 +410,13 @@ impl Sending {
let userid =
UserId ::try_from ( utils ::string_from_bytes ( user ) . map_err ( | _ | {
(
OutgoingKind::Push ( user . clone ( ) , pushkey . clone ( ) ) ,
kind. clone ( ) ,
Error ::bad_database ( "Invalid push user string in db." ) ,
)
} ) ? )
. map_err ( | _ | {
(
OutgoingKind::Push ( user . clone ( ) , pushkey . clone ( ) ) ,
kind. clone ( ) ,
Error ::bad_database ( "Invalid push user id in db." ) ,
)
} ) ? ;
@ -484,15 +467,17 @@ impl Sending {
Ok ( OutgoingKind ::Push ( user . clone ( ) , pushkey . clone ( ) ) )
}
OutgoingKind ::Normal ( server ) = > {
let pdu_jsons = pdu_ids
. iter ( )
. map ( | pdu_id | {
Ok ::< _ , ( OutgoingKind , Error ) > (
let mut edu_jsons = Vec ::new ( ) ;
let mut pdu_jsons = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
// TODO: check room version and remove event_id if needed
serde_json ::from_str (
pdu_jsons. push ( serde_json::from_str (
PduEvent ::convert_to_outgoing_federation_event (
db . rooms
. get_pdu_json_from_id ( pdu_id )
. get_pdu_json_from_id ( & pdu_id )
. map_err ( | e | ( OutgoingKind ::Normal ( server . clone ( ) ) , e ) ) ?
. ok_or_else ( | | {
(
@ -506,11 +491,15 @@ impl Sending {
. json ( )
. get ( ) ,
)
. expect ( "Raw<..> is always valid" ) ,
)
} )
. filter_map ( | r | r . ok ( ) )
. collect ::< Vec < _ > > ( ) ;
. expect ( "Raw<..> is always valid" ) ) ;
}
SendingEventType ::Edu ( edu ) = > {
edu_jsons . push (
serde_json ::from_slice ( edu ) . expect ( "Raw<..> is always valid" ) ,
) ;
}
}
}
let permit = db . sending . maximum_requests . acquire ( ) . await ;
@ -520,10 +509,17 @@ impl Sending {
send_transaction_message ::v1 ::Request {
origin : db . globals . server_name ( ) ,
pdus : & pdu_jsons ,
edus : & [ ] ,
edus : & edu_jsons ,
origin_server_ts : SystemTime ::now ( ) ,
transaction_id : & base64 ::encode_config (
Self ::calculate_hash ( & pdu_ids ) ,
Self ::calculate_hash (
& events
. iter ( )
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) = > & * * b ,
} )
. collect ::< Vec < _ > > ( ) ,
) ,
base64 ::URL_SAFE_NO_PAD ,
) ,
} ,
@ -546,13 +542,13 @@ impl Sending {
}
}
fn parse_servercurrent pdus ( key : & IVec ) -> Result < ( OutgoingKind , IVec ) > {
fn parse_servercurrent event ( key : & IVec ) -> Result < ( OutgoingKind , SendingEventType ) > {
// Appservices start with a plus
Ok ::< _ , Error > ( if key . starts_with ( b" + " ) {
let mut parts = key [ 1 .. ] . splitn ( 2 , | & b | b = = 0xff ) ;
let server = parts . next ( ) . expect ( "splitn always returns one element" ) ;
let pdu = parts
let event = parts
. next ( )
. ok_or_else ( | | Error ::bad_database ( "Invalid bytes in servercurrentpdus." ) ) ? ;
let server = utils ::string_from_bytes ( & server ) . map_err ( | _ | {
@ -563,7 +559,11 @@ impl Sending {
OutgoingKind ::Appservice ( Box ::< ServerName > ::try_from ( server ) . map_err ( | _ | {
Error ::bad_database ( "Invalid server string in server_currenttransaction" )
} ) ? ) ,
IVec ::from ( pdu ) ,
if event . starts_with ( b" * " ) {
SendingEventType ::Edu ( event [ 1 .. ] . to_vec ( ) )
} else {
SendingEventType ::Pdu ( event . to_vec ( ) )
} ,
)
} else if key . starts_with ( b" $ " ) {
let mut parts = key [ 1 .. ] . splitn ( 3 , | & b | b = = 0xff ) ;
@ -572,18 +572,22 @@ impl Sending {
let pushkey = parts
. next ( )
. ok_or_else ( | | Error ::bad_database ( "Invalid bytes in servercurrentpdus." ) ) ? ;
let pdu = parts
let event = parts
. next ( )
. ok_or_else ( | | Error ::bad_database ( "Invalid bytes in servercurrentpdus." ) ) ? ;
(
OutgoingKind ::Push ( user . to_vec ( ) , pushkey . to_vec ( ) ) ,
IVec ::from ( pdu ) ,
if event . starts_with ( b" * " ) {
SendingEventType ::Edu ( event [ 1 .. ] . to_vec ( ) )
} else {
SendingEventType ::Pdu ( event . to_vec ( ) )
} ,
)
} else {
let mut parts = key . splitn ( 2 , | & b | b = = 0xff ) ;
let server = parts . next ( ) . expect ( "splitn always returns one element" ) ;
let pdu = parts
let event = parts
. next ( )
. ok_or_else ( | | Error ::bad_database ( "Invalid bytes in servercurrentpdus." ) ) ? ;
let server = utils ::string_from_bytes ( & server ) . map_err ( | _ | {
@ -594,7 +598,11 @@ impl Sending {
OutgoingKind ::Normal ( Box ::< ServerName > ::try_from ( server ) . map_err ( | _ | {
Error ::bad_database ( "Invalid server string in server_currenttransaction" )
} ) ? ) ,
IVec ::from ( pdu ) ,
if event . starts_with ( b" * " ) {
SendingEventType ::Edu ( event [ 1 .. ] . to_vec ( ) )
} else {
SendingEventType ::Pdu ( event . to_vec ( ) )
} ,
)
} )
}