@ -9,6 +9,7 @@ use std::{
} ;
use futures_util ::{ stream ::FuturesUnordered , Future , StreamExt } ;
use globals ::SigningKeys ;
use ruma ::{
api ::{
client ::error ::ErrorKind ,
@ -30,7 +31,6 @@ use ruma::{
StateEventType , TimelineEventType ,
} ,
int ,
serde ::Base64 ,
state_res ::{ self , RoomVersion , StateMap } ,
uint , CanonicalJsonObject , CanonicalJsonValue , EventId , MilliSecondsSinceUnixEpoch ,
OwnedServerName , OwnedServerSigningKeyId , RoomId , RoomVersionId , ServerName ,
@ -78,7 +78,7 @@ impl Service {
room_id : & ' a RoomId ,
value : BTreeMap < String , CanonicalJsonValue > ,
is_timeline_event : bool ,
pub_key_map : & ' a RwLock < BTreeMap < String , BTreeMap< String , Base64 > > > ,
pub_key_map : & ' a RwLock < BTreeMap < String , SigningKeys > > ,
) -> Result < Option < Vec < u8 > > > {
// 0. Check the server is in the room
if ! services ( ) . rooms . metadata . exists ( room_id ) ? {
@ -304,19 +304,12 @@ impl Service {
room_id : & ' a RoomId ,
mut value : BTreeMap < String , CanonicalJsonValue > ,
auth_events_known : bool ,
pub_key_map : & ' a RwLock < BTreeMap < String , BTreeMap< String , Base64 > > > ,
pub_key_map : & ' a RwLock < BTreeMap < String , SigningKeys > > ,
) -> AsyncRecursiveType < ' a , Result < ( Arc < PduEvent > , BTreeMap < String , CanonicalJsonValue > ) > > {
Box ::pin ( async move {
// 1.1. Remove unsigned field
value . remove ( "unsigned" ) ;
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
// We go through all the signatures we see on the value and fetch the corresponding signing
// keys
self . fetch_required_signing_keys ( & value , pub_key_map )
. await ? ;
// 2. Check signatures, otherwise drop
// 3. check content hash, redact if doesn't match
let create_event_content : RoomCreateEventContent =
@ -329,41 +322,80 @@ impl Service {
let room_version =
RoomVersion ::new ( room_version_id ) . expect ( "room version is supported" ) ;
let guard = pub_key_map . read ( ) . await ;
let mut val = match ruma ::signatures ::verify_event ( & guard , & value , room_version_id ) {
Err ( e ) = > {
// Drop
warn ! ( "Dropping bad event {}: {}" , event_id , e , ) ;
return Err ( Error ::BadRequest (
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
// We go through all the signatures we see on the value and fetch the corresponding signing
// keys
self . fetch_required_signing_keys ( & value , pub_key_map )
. await ? ;
let origin_server_ts = value . get ( "origin_server_ts" ) . ok_or_else ( | | {
error ! ( "Invalid PDU, no origin_server_ts field" ) ;
Error ::BadRequest (
ErrorKind ::MissingParam ,
"Invalid PDU, no origin_server_ts field" ,
)
} ) ? ;
let origin_server_ts : MilliSecondsSinceUnixEpoch = {
let ts = origin_server_ts . as_integer ( ) . ok_or_else ( | | {
Error ::BadRequest (
ErrorKind ::InvalidParam ,
"Signature verification failed" ,
) ) ;
}
Ok ( ruma ::signatures ::Verified ::Signatures ) = > {
// Redact
warn ! ( "Calculated hash does not match: {}" , event_id ) ;
let obj = match ruma ::canonical_json ::redact ( value , room_version_id , None ) {
Ok ( obj ) = > obj ,
Err ( _ ) = > {
return Err ( Error ::BadRequest (
ErrorKind ::InvalidParam ,
"Redaction failed" ,
) )
}
} ;
"origin_server_ts must be an integer" ,
)
} ) ? ;
MilliSecondsSinceUnixEpoch ( i64 ::from ( ts ) . try_into ( ) . map_err ( | _ | {
Error ::BadRequest ( ErrorKind ::InvalidParam , "Time must be after the unix epoch" )
} ) ? )
} ;
// Skip the PDU if it is redacted and we already have it as an outlier event
if services ( ) . rooms . timeline . get_pdu_json ( event_id ) ? . is_some ( ) {
let guard = pub_key_map . read ( ) . await ;
let pkey_map = ( * guard ) . clone ( ) ;
// Removing all the expired keys, unless the room version allows stale keys
let filtered_keys = services ( ) . globals . filter_keys_server_map (
pkey_map ,
origin_server_ts ,
room_version_id ,
) ;
let mut val =
match ruma ::signatures ::verify_event ( & filtered_keys , & value , room_version_id ) {
Err ( e ) = > {
// Drop
warn ! ( "Dropping bad event {}: {}" , event_id , e , ) ;
return Err ( Error ::BadRequest (
ErrorKind ::InvalidParam ,
"Event was redacted and we already knew about it" ,
" Signature verification failed ",
) ) ;
}
Ok ( ruma ::signatures ::Verified ::Signatures ) = > {
// Redact
warn ! ( "Calculated hash does not match: {}" , event_id ) ;
let obj = match ruma ::canonical_json ::redact ( value , room_version_id , None ) {
Ok ( obj ) = > obj ,
Err ( _ ) = > {
return Err ( Error ::BadRequest (
ErrorKind ::InvalidParam ,
"Redaction failed" ,
) )
}
} ;
obj
}
Ok ( ruma ::signatures ::Verified ::All ) = > value ,
} ;
// Skip the PDU if it is redacted and we already have it as an outlier event
if services ( ) . rooms . timeline . get_pdu_json ( event_id ) ? . is_some ( ) {
return Err ( Error ::BadRequest (
ErrorKind ::InvalidParam ,
"Event was redacted and we already knew about it" ,
) ) ;
}
obj
}
Ok ( ruma ::signatures ::Verified ::All ) = > value ,
} ;
drop ( guard ) ;
@ -487,7 +519,7 @@ impl Service {
create_event : & PduEvent ,
origin : & ServerName ,
room_id : & RoomId ,
pub_key_map : & RwLock < BTreeMap < String , BTreeMap< String , Base64 > > > ,
pub_key_map : & RwLock < BTreeMap < String , SigningKeys > > ,
) -> Result < Option < Vec < u8 > > > {
// Skip the PDU if we already have it as a timeline event
if let Ok ( Some ( pduid ) ) = services ( ) . rooms . timeline . get_pdu_id ( & incoming_pdu . event_id ) {
@ -1097,7 +1129,7 @@ impl Service {
create_event : & ' a PduEvent ,
room_id : & ' a RoomId ,
room_version_id : & ' a RoomVersionId ,
pub_key_map : & ' a RwLock < BTreeMap < String , BTreeMap< String , Base64 > > > ,
pub_key_map : & ' a RwLock < BTreeMap < String , SigningKeys > > ,
) -> AsyncRecursiveType < ' a , Vec < ( Arc < PduEvent > , Option < BTreeMap < String , CanonicalJsonValue > > ) > >
{
Box ::pin ( async move {
@ -1280,7 +1312,7 @@ impl Service {
create_event : & PduEvent ,
room_id : & RoomId ,
room_version_id : & RoomVersionId ,
pub_key_map : & RwLock < BTreeMap < String , BTreeMap< String , Base64 > > > ,
pub_key_map : & RwLock < BTreeMap < String , SigningKeys > > ,
initial_set : Vec < Arc < EventId > > ,
) -> Result < (
Vec < Arc < EventId > > ,
@ -1378,7 +1410,7 @@ impl Service {
pub ( crate ) async fn fetch_required_signing_keys (
& self ,
event : & BTreeMap < String , CanonicalJsonValue > ,
pub_key_map : & RwLock < BTreeMap < String , BTreeMap< String , Base64 > > > ,
pub_key_map : & RwLock < BTreeMap < String , SigningKeys > > ,
) -> Result < ( ) > {
let signatures = event
. get ( "signatures" )
@ -1407,6 +1439,7 @@ impl Service {
)
} ) ? ,
signature_ids ,
true ,
)
. await ;
@ -1434,7 +1467,7 @@ impl Service {
pdu : & RawJsonValue ,
servers : & mut BTreeMap < OwnedServerName , BTreeMap < OwnedServerSigningKeyId , QueryCriteria > > ,
room_version : & RoomVersionId ,
pub_key_map : & mut RwLockWriteGuard < ' _ , BTreeMap < String , BTreeMap< String , Base64 > > > ,
pub_key_map : & mut RwLockWriteGuard < ' _ , BTreeMap < String , SigningKeys > > ,
) -> Result < ( ) > {
let value : CanonicalJsonObject = serde_json ::from_str ( pdu . get ( ) ) . map_err ( | e | {
error ! ( "Invalid PDU in server response: {:?}: {:?}" , pdu , e ) ;
@ -1485,8 +1518,18 @@ impl Service {
let signature_ids = signature_object . keys ( ) . cloned ( ) . collect ::< Vec < _ > > ( ) ;
let contains_all_ids = | keys : & BTreeMap < String , Base64 > | {
signature_ids . iter ( ) . all ( | id | keys . contains_key ( id ) )
let contains_all_ids = | keys : & SigningKeys | {
signature_ids . iter ( ) . all ( | id | {
keys . verify_keys
. keys ( )
. map ( ToString ::to_string )
. any ( | key_id | id = = & key_id )
| | keys
. old_verify_keys
. keys ( )
. map ( ToString ::to_string )
. any ( | key_id | id = = & key_id )
} )
} ;
let origin = < & ServerName > ::try_from ( signature_server . as_str ( ) ) . map_err ( | _ | {
@ -1499,19 +1542,14 @@ impl Service {
trace ! ( "Loading signing keys for {}" , origin ) ;
let result : BTreeMap < _ , _ > = services ( )
. globals
. signing_keys_for ( origin ) ?
. into_iter ( )
. map ( | ( k , v ) | ( k . to_string ( ) , v . key ) )
. collect ( ) ;
if ! contains_all_ids ( & result ) {
trace ! ( "Signing key not loaded for {}" , origin ) ;
servers . insert ( origin . to_owned ( ) , BTreeMap ::new ( ) ) ;
}
if let Some ( result ) = services ( ) . globals . signing_keys_for ( origin ) ? {
if ! contains_all_ids ( & result ) {
trace ! ( "Signing key not loaded for {}" , origin ) ;
servers . insert ( origin . to_owned ( ) , BTreeMap ::new ( ) ) ;
}
pub_key_map . insert ( origin . to_string ( ) , result ) ;
pub_key_map . insert ( origin . to_string ( ) , result ) ;
}
}
Ok ( ( ) )
@ -1521,7 +1559,7 @@ impl Service {
& self ,
event : & create_join_event ::v2 ::Response ,
room_version : & RoomVersionId ,
pub_key_map : & RwLock < BTreeMap < String , BTreeMap< String , Base64 > > > ,
pub_key_map : & RwLock < BTreeMap < String , SigningKeys > > ,
) -> Result < ( ) > {
let mut servers : BTreeMap <
OwnedServerName ,
@ -1584,10 +1622,7 @@ impl Service {
let result = services ( )
. globals
. add_signing_key ( & k . server_name , k . clone ( ) ) ?
. into_iter ( )
. map ( | ( k , v ) | ( k . to_string ( ) , v . key ) )
. collect ::< BTreeMap < _ , _ > > ( ) ;
. add_signing_key_from_trusted_server ( & k . server_name , k . clone ( ) ) ? ;
pkm . insert ( k . server_name . to_string ( ) , result ) ;
}
@ -1618,12 +1653,9 @@ impl Service {
if let ( Ok ( get_keys_response ) , origin ) = result {
info ! ( "Result is from {origin}" ) ;
if let Ok ( key ) = get_keys_response . server_key . deserialize ( ) {
let result : BTreeMap < _ , _ > = services ( )
let result = services ( )
. globals
. add_signing_key ( & origin , key ) ?
. into_iter ( )
. map ( | ( k , v ) | ( k . to_string ( ) , v . key ) )
. collect ( ) ;
. add_signing_key_from_origin ( & origin , key ) ? ;
pub_key_map . write ( ) . await . insert ( origin . to_string ( ) , result ) ;
}
}
@ -1681,9 +1713,23 @@ impl Service {
& self ,
origin : & ServerName ,
signature_ids : Vec < String > ,
) -> Result < BTreeMap < String , Base64 > > {
let contains_all_ids =
| keys : & BTreeMap < String , Base64 > | signature_ids . iter ( ) . all ( | id | keys . contains_key ( id ) ) ;
// Whether to ask for keys from trusted servers. Should be false when getting
// keys for validating requests, as per MSC4029
query_via_trusted_servers : bool ,
) -> Result < SigningKeys > {
let contains_all_ids = | keys : & SigningKeys | {
signature_ids . iter ( ) . all ( | id | {
keys . verify_keys
. keys ( )
. map ( ToString ::to_string )
. any ( | key_id | id = = & key_id )
| | keys
. old_verify_keys
. keys ( )
. map ( ToString ::to_string )
. any ( | key_id | id = = & key_id )
} )
} ;
let permit = services ( )
. globals
@ -1744,94 +1790,172 @@ impl Service {
trace ! ( "Loading signing keys for {}" , origin ) ;
let mut result : BTreeMap < _ , _ > = services ( )
. globals
. signing_keys_for ( origin ) ?
. into_iter ( )
. map ( | ( k , v ) | ( k . to_string ( ) , v . key ) )
. collect ( ) ;
let result = services ( ) . globals . signing_keys_for ( origin ) ? ;
let mut expires_soon_or_has_expired = false ;
if contains_all_ids ( & result ) {
return Ok ( result ) ;
if let Some ( result ) = result . clone ( ) {
let ts_threshold = MilliSecondsSinceUnixEpoch ::from_system_time (
SystemTime ::now ( ) + Duration ::from_secs ( 30 * 60 ) ,
)
. expect ( "Should be valid until year 500,000,000" ) ;
debug ! (
"The treshhold is {:?}, found time is {:?} for server {}" ,
ts_threshold , result . valid_until_ts , origin
) ;
if contains_all_ids ( & result ) {
// We want to ensure that the keys remain valid by the time the other functions that handle signatures reach them
if result . valid_until_ts > ts_threshold {
debug ! (
"Keys for {} are deemed as valid, as they expire at {:?}" ,
& origin , & result . valid_until_ts
) ;
return Ok ( result ) ;
}
expires_soon_or_has_expired = true ;
}
}
let mut keys = result . unwrap_or_else ( | | SigningKeys {
verify_keys : BTreeMap ::new ( ) ,
old_verify_keys : BTreeMap ::new ( ) ,
valid_until_ts : MilliSecondsSinceUnixEpoch ::now ( ) ,
} ) ;
// We want to set this to the max, and then lower it whenever we see older keys
keys . valid_until_ts = MilliSecondsSinceUnixEpoch ::from_system_time (
SystemTime ::now ( ) + Duration ::from_secs ( 7 * 86400 ) ,
)
. expect ( "Should be valid until year 500,000,000" ) ;
debug ! ( "Fetching signing keys for {} over federation" , origin ) ;
if let Some ( server_key ) = services ( )
if let Some ( mut server_key ) = services ( )
. sending
. send_federation_request ( origin , get_server_keys ::v2 ::Request ::new ( ) )
. await
. ok ( )
. and_then ( | resp | resp . server_key . deserialize ( ) . ok ( ) )
{
// Keys should only be valid for a maximum of seven days
server_key . valid_until_ts = server_key . valid_until_ts . min (
MilliSecondsSinceUnixEpoch ::from_system_time (
SystemTime ::now ( ) + Duration ::from_secs ( 7 * 86400 ) ,
)
. expect ( "Should be valid until year 500,000,000" ) ,
) ;
services ( )
. globals
. add_signing_key ( origin , server_key . clone ( ) ) ? ;
. add_signing_key _from_origin ( origin , server_key . clone ( ) ) ? ;
result . extend (
if keys . valid_until_ts > server_key . valid_until_ts {
keys . valid_until_ts = server_key . valid_until_ts ;
}
keys . verify_keys . extend (
server_key
. verify_keys
. into_iter ( )
. map ( | ( k , v ) | ( k . to_string ( ) , v . key ) ) ,
. map ( | ( id, key ) | ( id . to_string ( ) , key ) ) ,
) ;
result . extend (
keys. old_verify_keys . extend (
server_key
. old_verify_keys
. into_iter ( )
. map ( | ( k, v ) | ( k . to_string ( ) , v . key ) ) ,
. map ( | ( id, key ) | ( id . to_string ( ) , key ) ) ,
) ;
if contains_all_ids ( & result ) {
return Ok ( result ) ;
if contains_all_ids ( & keys ) {
return Ok ( keys ) ;
}
}
for server in services ( ) . globals . trusted_servers ( ) {
debug ! ( "Asking {} for {}'s signing key" , server , origin ) ;
if let Some ( server_keys ) = services ( )
. sending
. send_federation_request (
server ,
get_remote_server_keys ::v2 ::Request ::new (
origin . to_owned ( ) ,
MilliSecondsSinceUnixEpoch ::from_system_time (
SystemTime ::now ( )
. checked_add ( Duration ::from_secs ( 3600 ) )
. expect ( "SystemTime to large" ) ,
)
. expect ( "time is valid" ) ,
) ,
)
. await
. ok ( )
. map ( | resp | {
resp . server_keys
. into_iter ( )
. filter_map ( | e | e . deserialize ( ) . ok ( ) )
. collect ::< Vec < _ > > ( )
} )
{
trace ! ( "Got signing keys: {:?}" , server_keys ) ;
for k in server_keys {
services ( ) . globals . add_signing_key ( origin , k . clone ( ) ) ? ;
result . extend (
k . verify_keys
. into_iter ( )
. map ( | ( k , v ) | ( k . to_string ( ) , v . key ) ) ,
) ;
result . extend (
k . old_verify_keys
if query_via_trusted_servers {
for server in services ( ) . globals . trusted_servers ( ) {
debug ! ( "Asking {} for {}'s signing key" , server , origin ) ;
if let Some ( server_keys ) = services ( )
. sending
. send_federation_request (
server ,
get_remote_server_keys ::v2 ::Request ::new (
origin . to_owned ( ) ,
MilliSecondsSinceUnixEpoch ::from_system_time (
SystemTime ::now ( )
. checked_add ( Duration ::from_secs ( 3600 ) )
. expect ( "SystemTime to large" ) ,
)
. expect ( "time is valid" ) ,
) ,
)
. await
. ok ( )
. map ( | resp | {
resp . server_keys
. into_iter ( )
. map ( | ( k , v ) | ( k . to_string ( ) , v . key ) ) ,
) ;
}
. filter_map ( | e | e . deserialize ( ) . ok ( ) )
. collect ::< Vec < _ > > ( )
} )
{
trace ! ( "Got signing keys: {:?}" , server_keys ) ;
for mut k in server_keys {
if k . valid_until_ts
// Half an hour should give plenty of time for the server to respond with keys that are still
// valid, given we requested keys which are valid at least an hour from now
< MilliSecondsSinceUnixEpoch ::from_system_time (
SystemTime ::now ( ) + Duration ::from_secs ( 30 * 60 ) ,
)
. expect ( "Should be valid until year 500,000,000" )
{
// Keys should only be valid for a maximum of seven days
k . valid_until_ts = k . valid_until_ts . min (
MilliSecondsSinceUnixEpoch ::from_system_time (
SystemTime ::now ( ) + Duration ::from_secs ( 7 * 86400 ) ,
)
. expect ( "Should be valid until year 500,000,000" ) ,
) ;
if contains_all_ids ( & result ) {
return Ok ( result ) ;
if keys . valid_until_ts > k . valid_until_ts {
keys . valid_until_ts = k . valid_until_ts ;
}
services ( )
. globals
. add_signing_key_from_trusted_server ( origin , k . clone ( ) ) ? ;
keys . verify_keys . extend (
k . verify_keys
. into_iter ( )
. map ( | ( id , key ) | ( id . to_string ( ) , key ) ) ,
) ;
keys . old_verify_keys . extend (
k . old_verify_keys
. into_iter ( )
. map ( | ( id , key ) | ( id . to_string ( ) , key ) ) ,
) ;
} else {
warn ! (
"Server {} gave us keys older than we requested, valid until: {:?}" ,
origin , k . valid_until_ts
) ;
}
if contains_all_ids ( & keys ) {
return Ok ( keys ) ;
}
}
}
}
}
// We should return these keys if fresher keys were not found
if expires_soon_or_has_expired {
info ! ( "Returning stale keys for {}" , origin ) ;
return Ok ( keys ) ;
}
drop ( permit ) ;
back_off ( signature_ids ) . await ;