@ -4,6 +4,7 @@ pub use edus::RoomEdus;
use crate ::{ pdu ::PduBuilder , utils , Error , PduEvent , Result } ;
use log ::error ;
use regex ::Regex ;
use ring ::digest ;
use ruma ::{
api ::client ::error ::ErrorKind ,
@ -15,7 +16,8 @@ use ruma::{
} ,
EventType ,
} ,
EventId , Raw , RoomAliasId , RoomId , ServerName , UserId ,
serde ::{ to_canonical_value , CanonicalJsonObject , CanonicalJsonValue , Raw } ,
EventId , RoomAliasId , RoomId , RoomVersionId , ServerName , UserId ,
} ;
use sled ::IVec ;
use state_res ::{ event_auth , Error as StateError , Requester , StateEvent , StateMap , StateStore } ;
@ -61,7 +63,8 @@ pub struct Rooms {
/// Remember the state hash at events in the past.
pub ( super ) pduid_statehash : sled ::Tree ,
/// The state for a given state hash.
pub ( super ) stateid_pduid : sled ::Tree , // StateId = StateHash + EventType + StateKey
pub ( super ) statekey_short : sled ::Tree , // StateKey = EventType + StateKey, Short = Count
pub ( super ) stateid_pduid : sled ::Tree , // StateId = StateHash + Short, PduId = Count (without roomid)
}
impl StateStore for Rooms {
@ -74,7 +77,10 @@ impl StateStore for Rooms {
. get_pdu_id ( event_id )
. map_err ( StateError ::custom ) ?
. ok_or_else ( | | {
StateError ::NotFound ( "PDU via room_id and event_id not found in the db." . into ( ) )
StateError ::NotFound ( format! (
"PDU via room_id and event_id not found in the db: {}" ,
event_id . as_str ( )
) )
} ) ? ;
serde_json ::from_slice (
@ -88,7 +94,7 @@ impl StateStore for Rooms {
. and_then ( | pdu : StateEvent | {
// conduit's PDU's always contain a room_id but some
// of ruma's do not so this must be an Option
if pdu . room_id ( ) = = Some ( room_id ) {
if pdu . room_id ( ) = = room_id {
Ok ( Arc ::new ( pdu ) )
} else {
Err ( StateError ::NotFound (
@ -102,21 +108,28 @@ impl StateStore for Rooms {
impl Rooms {
/// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash.
pub fn state_full ( & self , state_hash : & StateHashId ) -> Result < StateMap < PduEvent > > {
pub fn state_full (
& self ,
room_id : & RoomId ,
state_hash : & StateHashId ,
) -> Result < StateMap < PduEvent > > {
self . stateid_pduid
. scan_prefix ( & state_hash )
. values ( )
. map ( | pduid | {
self . pduid_pdu . get ( & pduid ? ) ? . map_or_else (
| | Err ( Error ::bad_database ( "Failed to find StateMap." ) ) ,
. map ( | pduid_short | {
let mut pduid = room_id . as_bytes ( ) . to_vec ( ) ;
pduid . push ( 0xff ) ;
pduid . extend_from_slice ( & pduid_short ? ) ;
self . pduid_pdu . get ( & pduid ) ? . map_or_else (
| | Err ( Error ::bad_database ( "Failed to find PDU in state snapshot." ) ) ,
| b | {
serde_json ::from_slice ::< PduEvent > ( & b )
. map_err ( | _ | Error ::bad_database ( "Invalid PDU in db." ) )
} ,
)
} )
. filter_map ( | r | r . ok ( ) )
. map ( | pdu | {
let pdu = pdu ? ;
Ok ( (
(
pdu . kind . clone ( ) ,
@ -131,64 +144,45 @@ impl Rooms {
. collect ::< Result < StateMap < _ > > > ( )
}
/// Returns all state entries for this type.
pub fn state_type (
& self ,
state_hash : & StateHashId ,
event_type : & EventType ,
) -> Result < HashMap < String , PduEvent > > {
let mut prefix = state_hash . to_vec ( ) ;
prefix . push ( 0xff ) ;
prefix . extend_from_slice ( & event_type . to_string ( ) . as_bytes ( ) ) ;
prefix . push ( 0xff ) ;
let mut hashmap = HashMap ::new ( ) ;
for pdu in self
. stateid_pduid
. scan_prefix ( & prefix )
. values ( )
. map ( | pdu_id | {
Ok ::< _ , Error > (
serde_json ::from_slice ::< PduEvent > ( & self . pduid_pdu . get ( pdu_id ? ) ? . ok_or_else (
| | Error ::bad_database ( "PDU in state not found in database." ) ,
) ? )
. map_err ( | _ | Error ::bad_database ( "Invalid PDU bytes in room state." ) ) ? ,
)
} )
{
let pdu = pdu ? ;
let state_key = pdu . state_key . clone ( ) . ok_or_else ( | | {
Error ::bad_database ( "Room state contains event without state_key." )
} ) ? ;
hashmap . insert ( state_key , pdu ) ;
}
Ok ( hashmap )
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
pub fn state_get (
& self ,
room_id : & RoomId ,
state_hash : & StateHashId ,
event_type : & EventType ,
state_key : & str ,
) -> Result < Option < ( IVec , PduEvent ) > > {
let mut key = state_hash . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( & event_type . to_string ( ) . as_bytes ( ) ) ;
let mut key = event_type . to_string ( ) . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( & state_key . as_bytes ( ) ) ;
self . stateid_pduid . get ( & key ) ? . map_or ( Ok ( None ) , | pdu_id | {
Ok ::< _ , Error > ( Some ( (
pdu_id . clone ( ) ,
serde_json ::from_slice ::< PduEvent > (
& self . pduid_pdu . get ( & pdu_id ) ? . ok_or_else ( | | {
Error ::bad_database ( "PDU in state not found in database." )
} ) ? ,
)
. map_err ( | _ | Error ::bad_database ( "Invalid PDU bytes in room state." ) ) ? ,
) ) )
} )
let short = self . statekey_short . get ( & key ) ? ;
if let Some ( short ) = short {
let mut stateid = state_hash . to_vec ( ) ;
stateid . push ( 0xff ) ;
stateid . extend_from_slice ( & short ) ;
self . stateid_pduid
. get ( & stateid ) ?
. map_or ( Ok ( None ) , | pdu_id_short | {
let mut pdu_id = room_id . as_bytes ( ) . to_vec ( ) ;
pdu_id . push ( 0xff ) ;
pdu_id . extend_from_slice ( & pdu_id_short ) ;
Ok ::< _ , Error > ( Some ( (
pdu_id . clone ( ) . into ( ) ,
serde_json ::from_slice ::< PduEvent > (
& self . pduid_pdu . get ( & pdu_id ) ? . ok_or_else ( | | {
Error ::bad_database ( "PDU in state not found in database." )
} ) ? ,
)
. map_err ( | _ | Error ::bad_database ( "Invalid PDU bytes in room state." ) ) ? ,
) ) )
} )
} else {
return Ok ( None ) ;
}
}
/// Returns the last state hash key added to the db.
@ -196,7 +190,7 @@ impl Rooms {
Ok ( self . pduid_statehash . get ( pdu_id ) ? )
}
/// Returns the last state hash key added to the db .
/// Returns the last state hash key added to the db for the given room .
pub fn current_state_hash ( & self , room_id : & RoomId ) -> Result < Option < StateHashId > > {
Ok ( self . roomid_statehash . get ( room_id . as_bytes ( ) ) ? )
}
@ -249,11 +243,14 @@ impl Rooms {
. is_some ( ) )
}
/// Returns the full room state.
/// Force the creation of a new StateHash and insert it into the db.
///
/// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot.
pub fn force_state (
& self ,
room_id : & RoomId ,
state : HashMap < ( EventType , String ) , Vec < u8 > > ,
globals : & super ::globals ::Globals ,
) -> Result < ( ) > {
let state_hash =
self . calculate_hash ( & state . values ( ) . map ( | pdu_id | & * * pdu_id ) . collect ::< Vec < _ > > ( ) ) ? ;
@ -261,11 +258,29 @@ impl Rooms {
prefix . push ( 0xff ) ;
for ( ( event_type , state_key ) , pdu_id ) in state {
let mut statekey = event_type . as_ref ( ) . as_bytes ( ) . to_vec ( ) ;
statekey . push ( 0xff ) ;
statekey . extend_from_slice ( & state_key . as_bytes ( ) ) ;
let short = match self . statekey_short . get ( & statekey ) ? {
Some ( short ) = > utils ::u64_from_bytes ( & short )
. map_err ( | _ | Error ::bad_database ( "Invalid short bytes in statekey_short." ) ) ? ,
None = > {
let short = globals . next_count ( ) ? ;
self . statekey_short
. insert ( & statekey , & short . to_be_bytes ( ) ) ? ;
short
}
} ;
let pdu_id_short = pdu_id
. splitn ( 2 , | & b | b = = 0xff )
. nth ( 1 )
. ok_or_else ( | | Error ::bad_database ( "Invalid pduid in state." ) ) ? ;
let mut state_id = prefix . clone ( ) ;
state_id . extend_from_slice ( & event_type . as_str ( ) . as_bytes ( ) ) ;
state_id . push ( 0xff ) ;
state_id . extend_from_slice ( & state_key . as_bytes ( ) ) ;
self . stateid_pduid . insert ( state_id , pdu_id ) ? ;
state_id . extend_from_slice ( & short . to_be_bytes ( ) ) ;
self . stateid_pduid . insert ( state_id , pdu_id_short ) ? ;
}
self . roomid_statehash
@ -277,25 +292,12 @@ impl Rooms {
/// Returns the full room state.
pub fn room_state_full ( & self , room_id : & RoomId ) -> Result < StateMap < PduEvent > > {
if let Some ( current_state_hash ) = self . current_state_hash ( room_id ) ? {
self . state_full ( & current_state_hash)
self . state_full ( & room_id, & current_state_hash)
} else {
Ok ( BTreeMap ::new ( ) )
}
}
/// Returns all state entries for this type.
pub fn room_state_type (
& self ,
room_id : & RoomId ,
event_type : & EventType ,
) -> Result < HashMap < String , PduEvent > > {
if let Some ( current_state_hash ) = self . current_state_hash ( room_id ) ? {
self . state_type ( & current_state_hash , event_type )
} else {
Ok ( HashMap ::new ( ) )
}
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
pub fn room_state_get (
& self ,
@ -304,7 +306,7 @@ impl Rooms {
state_key : & str ,
) -> Result < Option < ( IVec , PduEvent ) > > {
if let Some ( current_state_hash ) = self . current_state_hash ( room_id ) ? {
self . state_get ( & current_state_hash, event_type , state_key )
self . state_get ( & room_id, & current_state_hash, event_type , state_key )
} else {
Ok ( None )
}
@ -369,8 +371,8 @@ impl Rooms {
} )
}
/// Returns the pdu .
pub fn get_pdu_json_from_id ( & self , pdu_id : & [ u8 ] ) -> Result < Option < serde_json::Value > > {
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>` .
pub fn get_pdu_json_from_id ( & self , pdu_id : & [ u8 ] ) -> Result < Option < CanonicalJsonObject > > {
self . pduid_pdu . get ( pdu_id ) ? . map_or ( Ok ( None ) , | pdu | {
Ok ( Some (
serde_json ::from_slice ( & pdu )
@ -437,16 +439,46 @@ impl Rooms {
}
/// Creates a new persisted data unit and adds it to a room.
///
/// By this point the incoming event should be fully authenticated, no auth happens
/// in `append_pdu`.
pub fn append_pdu (
& self ,
pdu : & PduEvent ,
pdu_json : & serde_json ::Value ,
mut pdu_json : CanonicalJsonObject ,
count : u64 ,
pdu_id : IVec ,
globals : & super ::globals ::Globals ,
account_data : & super ::account_data ::AccountData ,
admin : & super ::admin ::Admin ,
) -> Result < ( ) > {
// Make unsigned fields correct. This is not properly documented in the spec, but state
// events need to have previous content in the unsigned field, so clients can easily
// interpret things like membership changes
if let Some ( state_key ) = & pdu . state_key {
if let CanonicalJsonValue ::Object ( unsigned ) = pdu_json
. entry ( "unsigned" . to_owned ( ) )
. or_insert_with ( | | CanonicalJsonValue ::Object ( Default ::default ( ) ) )
{
if let Some ( prev_state_hash ) = self . pdu_state_hash ( & pdu_id ) . unwrap ( ) {
if let Some ( prev_state ) = self
. state_get ( & pdu . room_id , & prev_state_hash , & pdu . kind , & state_key )
. unwrap ( )
{
unsigned . insert (
"prev_content" . to_owned ( ) ,
CanonicalJsonValue ::Object (
utils ::to_canonical_object ( prev_state . 1. content )
. expect ( "event is valid, we just created it" ) ,
) ,
) ;
}
}
} else {
error ! ( "Invalid unsigned type in pdu." ) ;
}
}
self . replace_pdu_leaves ( & pdu . room_id , & pdu . event_id ) ? ;
// Mark as read first so the sending client doesn't get a notification even if appending
@ -454,7 +486,11 @@ impl Rooms {
self . edus
. private_read_set ( & pdu . room_id , & pdu . sender , count , & globals ) ? ;
self . pduid_pdu . insert ( & pdu_id , & * pdu_json . to_string ( ) ) ? ;
self . pduid_pdu . insert (
& pdu_id ,
& * serde_json ::to_string ( & pdu_json )
. expect ( "CanonicalJsonObject is always a valid String" ) ,
) ? ;
self . eventid_pduid
. insert ( pdu . event_id . as_bytes ( ) , & * pdu_id ) ? ;
@ -512,17 +548,59 @@ impl Rooms {
. as_ref ( )
= = Some ( & pdu . room_id )
{
let mut parts = body . split_whitespace ( ) . skip ( 1 ) ;
let mut lines = body . lines ( ) ;
let command_line = lines . next ( ) . expect ( "each string has at least one line" ) ;
let body = lines . collect ::< Vec < _ > > ( ) ;
let mut parts = command_line . split_whitespace ( ) . skip ( 1 ) ;
if let Some ( command ) = parts . next ( ) {
let args = parts . collect ::< Vec < _ > > ( ) ;
admin . send ( AdminCommand ::SendTextMessage (
message ::TextMessageEventContent {
body : format ! ( "Command: {}, Args: {:?}" , command , args ) ,
formatted : None ,
relates_to : None ,
} ,
) ) ;
match command {
"register_appservice" = > {
if body . len ( ) > 2
& & body [ 0 ] . trim ( ) = = "```"
& & body . last ( ) . unwrap ( ) . trim ( ) = = "```"
{
let appservice_config = body [ 1 .. body . len ( ) - 1 ] . join ( "\n" ) ;
let parsed_config = serde_yaml ::from_str ::< serde_yaml ::Value > (
& appservice_config ,
) ;
match parsed_config {
Ok ( yaml ) = > {
admin . send ( AdminCommand ::RegisterAppservice ( yaml ) ) ;
}
Err ( e ) = > {
admin . send ( AdminCommand ::SendMessage (
message ::MessageEventContent ::text_plain (
format! (
"Could not parse appservice config: {}" ,
e
) ,
) ,
) ) ;
}
}
} else {
admin . send ( AdminCommand ::SendMessage (
message ::MessageEventContent ::text_plain (
"Expected code block in command body." ,
) ,
) ) ;
}
}
"list_appservices" = > {
admin . send ( AdminCommand ::ListAppservices ) ;
}
_ = > {
admin . send ( AdminCommand ::SendMessage (
message ::MessageEventContent ::text_plain ( format! (
"Command: {}, Args: {:?}" ,
command , args
) ) ,
) ) ;
}
}
}
}
}
@ -538,7 +616,12 @@ impl Rooms {
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `pduid_statehash`.
/// The incoming event is the `pdu_id` passed to this method.
pub fn append_to_state ( & self , new_pdu_id : & [ u8 ] , new_pdu : & PduEvent ) -> Result < StateHashId > {
pub fn append_to_state (
& self ,
new_pdu_id : & [ u8 ] ,
new_pdu : & PduEvent ,
globals : & super ::globals ::Globals ,
) -> Result < StateHashId > {
let old_state =
if let Some ( old_state_hash ) = self . roomid_statehash . get ( new_pdu . room_id . as_bytes ( ) ) ? {
// Store state for event. The state does not include the event itself.
@ -553,6 +636,7 @@ impl Rooms {
self . stateid_pduid
. scan_prefix ( & prefix )
. filter_map ( | pdu | pdu . map_err ( | e | error ! ( "{}" , e ) ) . ok ( ) )
// Chop the old state_hash out leaving behind the short key (u64)
. map ( | ( k , v ) | ( k . subslice ( prefix . len ( ) , k . len ( ) - prefix . len ( ) ) , v ) )
. collect ::< HashMap < IVec , IVec > > ( )
} else {
@ -561,10 +645,26 @@ impl Rooms {
if let Some ( state_key ) = & new_pdu . state_key {
let mut new_state = old_state ;
let mut pdu_key = new_pdu . kind . as_ st r( ) . as_bytes ( ) . to_vec ( ) ;
let mut pdu_key = new_pdu . kind . as_ ref ( ) . as_bytes ( ) . to_vec ( ) ;
pdu_key . push ( 0xff ) ;
pdu_key . extend_from_slice ( state_key . as_bytes ( ) ) ;
new_state . insert ( pdu_key . into ( ) , new_pdu_id . into ( ) ) ;
let short = match self . statekey_short . get ( & pdu_key ) ? {
Some ( short ) = > utils ::u64_from_bytes ( & short )
. map_err ( | _ | Error ::bad_database ( "Invalid short bytes in statekey_short." ) ) ? ,
None = > {
let short = globals . next_count ( ) ? ;
self . statekey_short . insert ( & pdu_key , & short . to_be_bytes ( ) ) ? ;
short
}
} ;
let new_pdu_id_short = new_pdu_id
. splitn ( 2 , | & b | b = = 0xff )
. nth ( 1 )
. ok_or_else ( | | Error ::bad_database ( "Invalid pduid in state." ) ) ? ;
new_state . insert ( ( & short . to_be_bytes ( ) ) . into ( ) , new_pdu_id_short . into ( ) ) ;
let new_state_hash =
self . calculate_hash ( & new_state . values ( ) . map ( | b | & * * b ) . collect ::< Vec < _ > > ( ) ) ? ;
@ -572,17 +672,12 @@ impl Rooms {
let mut key = new_state_hash . to_vec ( ) ;
key . push ( 0xff ) ;
// TODO: we could avoid writing to the DB on every state event by keeping
// track of the delta and write that every so often
for ( key_without_prefix , pdu_id ) in new_state {
for ( short , short_pdu_id ) in new_state {
let mut state_id = key . clone ( ) ;
state_id . extend_from_slice ( & key_without_prefix ) ;
self . stateid_pduid . insert ( & state_id , & pdu_id) ? ;
state_id . extend_from_slice ( & short ) ;
self . stateid_pduid . insert ( & state_id , & short_ pdu_id) ? ;
}
self . roomid_statehash
. insert ( new_pdu . room_id . as_bytes ( ) , & * new_state_hash ) ? ;
Ok ( new_state_hash )
} else {
Err ( Error ::bad_database (
@ -591,7 +686,15 @@ impl Rooms {
}
}
pub fn set_room_state ( & self , room_id : & RoomId , state_hash : & StateHashId ) -> Result < ( ) > {
self . roomid_statehash
. insert ( room_id . as_bytes ( ) , state_hash ) ? ;
Ok ( ( ) )
}
/// Creates a new persisted data unit and adds it to a room.
#[ allow(clippy::too_many_arguments) ]
pub fn build_and_append_pdu (
& self ,
pdu_builder : PduBuilder ,
@ -601,6 +704,7 @@ impl Rooms {
sending : & super ::sending ::Sending ,
admin : & super ::admin ::Admin ,
account_data : & super ::account_data ::AccountData ,
appservice : & super ::appservice ::Appservice ,
) -> Result < EventId > {
let PduBuilder {
event_type ,
@ -682,12 +786,12 @@ impl Rooms {
#[ allow(clippy::blocks_in_if_conditions) ]
if ! match event_type {
EventType ::RoomEncryption = > {
// Don't allow encryption events when it's disabled
! globals . encryption_disabled ( )
// Only allow encryption events if it's allowed in the config
globals . allow_ encryption( )
}
EventType ::RoomMember = > {
let prev_event = self
. get_pdu ( prev_events . iter( ) . next ( ) . ok_or ( Error ::BadRequest (
. get_pdu ( prev_events . get( 0 ) . ok_or ( Error ::BadRequest (
ErrorKind ::Unknown ,
"Membership can't be the first event" ,
) ) ? ) ?
@ -703,7 +807,7 @@ impl Rooms {
sender : & sender ,
} ,
prev_event ,
None ,
None , // TODO: third party invite
& auth_events
. iter ( )
. map ( | ( ( ty , key ) , pdu ) | {
@ -761,7 +865,7 @@ impl Rooms {
}
let mut pdu = PduEvent {
event_id : EventId::try_from ( "$thiswillbefilledinlater" ) . expect ( "we know this is valid ") ,
event_id : ruma::event_id ! ( "$thiswillbefilledinlater ") ,
room_id : room_id . clone ( ) ,
sender : sender . clone ( ) ,
origin_server_ts : utils ::millis_since_unix_epoch ( )
@ -787,37 +891,38 @@ impl Rooms {
} ;
// Hash and sign
let mut pdu_json = serde_json ::to_value ( & pdu ) . expect ( "event is valid, we just created it" ) ;
pdu_json
. as_object_mut ( )
. expect ( "json is object" )
. remove ( "event_id" ) ;
let mut pdu_json =
utils ::to_canonical_object ( & pdu ) . expect ( "event is valid, we just created it" ) ;
pdu_json . remove ( "event_id" ) ;
// Add origin because synapse likes that (and it's required in the spec)
pdu_json
. as_object_mut ( )
. expect ( "json is object" )
. insert ( "origin" . to_owned ( ) , globals . server_name ( ) . as_str ( ) . into ( ) ) ;
pdu_json . insert (
"origin" . to_owned ( ) ,
to_canonical_value ( globals . server_name ( ) )
. expect ( "server name is a valid CanonicalJsonValue" ) ,
) ;
ruma ::signatures ::hash_and_sign_event (
globals . server_name ( ) . as_str ( ) ,
globals . keypair ( ) ,
& mut pdu_json ,
& RoomVersionId ::Version6 ,
)
. expect ( "event is valid, we just created it" ) ;
// Generate event id
pdu . event_id = EventId ::try_from ( & * format! (
"${}" ,
ruma ::signatures ::reference_hash ( & pdu_json )
ruma ::signatures ::reference_hash ( & pdu_json , & RoomVersionId ::Version6 )
. expect ( "ruma can calculate reference hashes" )
) )
. expect ( "ruma's reference hashes are valid event ids" ) ;
pdu_json
. as_object_mut ( )
. expect ( "json is object" )
. insert ( "event_id" . to_owned ( ) , pdu . event_id . to_string ( ) . into ( ) ) ;
pdu_json . insert (
"event_id" . to_owned ( ) ,
to_canonical_value ( & pdu . event_id ) . expect ( "EventId is a valid CanonicalJsonValue" ) ,
) ;
// Increment the last index and use that
// This is also the next_batch/since value
@ -828,11 +933,11 @@ impl Rooms {
// We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail.
self . append_to_state ( & pdu_id , & pdu ) ? ;
let statehashid = self . append_to_state ( & pdu_id , & pdu , & globals ) ? ;
self . append_pdu (
& pdu ,
& pdu_json ,
pdu_json ,
count ,
pdu_id . clone ( ) . into ( ) ,
globals ,
@ -840,12 +945,79 @@ impl Rooms {
admin ,
) ? ;
// We set the room state after inserting the pdu, so that we never have a moment in time
// where events in the current room state do not exist
self . set_room_state ( & room_id , & statehashid ) ? ;
for server in self
. room_servers ( room_id )
. filter_map ( | r | r . ok ( ) )
. filter ( | server | & * * server ! = globals . server_name ( ) )
{
sending . send_pdu ( server , & pdu_id ) ? ;
sending . send_pdu ( & server , & pdu_id ) ? ;
}
for appservice in appservice . iter_all ( ) . filter_map ( | r | r . ok ( ) ) {
if let Some ( namespaces ) = appservice . 1. get ( "namespaces" ) {
let users = namespaces
. get ( "users" )
. and_then ( | users | users . as_sequence ( ) )
. map_or_else (
| | Vec ::new ( ) ,
| users | {
users
. iter ( )
. map ( | users | {
users
. get ( "regex" )
. and_then ( | regex | regex . as_str ( ) )
. and_then ( | regex | Regex ::new ( regex ) . ok ( ) )
} )
. filter_map ( | o | o )
. collect ::< Vec < _ > > ( )
} ,
) ;
let aliases = namespaces
. get ( "aliases" )
. and_then ( | users | users . get ( "regex" ) )
. and_then ( | regex | regex . as_str ( ) )
. and_then ( | regex | Regex ::new ( regex ) . ok ( ) ) ;
let rooms = namespaces
. get ( "rooms" )
. and_then ( | rooms | rooms . as_sequence ( ) ) ;
let room_aliases = self . room_aliases ( & room_id ) ;
let bridge_user_id = appservice
. 1
. get ( "sender_localpart" )
. and_then ( | string | string . as_str ( ) )
. and_then ( | string | {
UserId ::parse_with_server_name ( string , globals . server_name ( ) ) . ok ( )
} ) ;
if bridge_user_id . map_or ( false , | bridge_user_id | {
self . is_joined ( & bridge_user_id , room_id ) . unwrap_or ( false )
} ) | | users . iter ( ) . any ( | users | {
users . is_match ( pdu . sender . as_str ( ) )
| | pdu . kind = = EventType ::RoomMember
& & pdu
. state_key
. as_ref ( )
. map_or ( false , | state_key | users . is_match ( & state_key ) )
} ) | | aliases . map_or ( false , | aliases | {
room_aliases
. filter_map ( | r | r . ok ( ) )
. any ( | room_alias | aliases . is_match ( room_alias . as_str ( ) ) )
} ) | | rooms . map_or ( false , | rooms | rooms . contains ( & room_id . as_str ( ) . into ( ) ) )
| | self
. room_members ( & room_id )
. filter_map ( | r | r . ok ( ) )
. any ( | member | users . iter ( ) . any ( | regex | regex . is_match ( member . as_str ( ) ) ) )
{
sending . send_pdu_appservice ( & appservice . 0 , & pdu_id ) ? ;
}
}
}
Ok ( pdu . event_id )