diff --git a/src/client_server/message.rs b/src/client_server/message.rs index d8512148..844f44d1 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -1,10 +1,13 @@ use super::State; -use crate::{pdu::PduBuilder, ConduitResult, Database, Error, Ruma}; -use ruma::api::client::{ - error::ErrorKind, - r0::message::{get_message_events, send_message_event}, +use crate::{pdu::PduBuilder, utils, ConduitResult, Database, Error, Ruma}; +use ruma::{ + api::client::{ + error::ErrorKind, + r0::message::{get_message_events, send_message_event}, + }, + EventId, }; -use std::convert::TryInto; +use std::convert::{TryFrom, TryInto}; #[cfg(feature = "conduit_bin")] use rocket::{get, put}; @@ -18,6 +21,29 @@ pub fn send_message_event_route( body: Ruma, ) -> ConduitResult { let sender_id = body.sender_id.as_ref().expect("user is authenticated"); + let device_id = body.device_id.as_ref().expect("user is authenticated"); + + // Check if this is a new transaction id + if let Some(response) = db + .transaction_ids + .existing_txnid(sender_id, device_id, &body.txn_id)? + { + // The client might have sent a txnid of the /sendToDevice endpoint + // This txnid has no response associated with it + if response.is_empty() { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Tried to use txn id already used for an incompatible endpoint.", + )); + } + + let event_id = EventId::try_from( + utils::string_from_bytes(&response) + .map_err(|_| Error::bad_database("Invalid txnid bytes in database."))?, + ) + .map_err(|_| Error::bad_database("Invalid event id in txnid data."))?; + return Ok(send_message_event::Response { event_id }.into()); + } let mut unsigned = serde_json::Map::new(); unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into()); @@ -29,6 +55,7 @@ pub fn send_message_event_route( event_type: body.event_type.clone(), content: serde_json::from_str( body.json_body + .as_ref() .ok_or(Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))? .get(), ) @@ -41,6 +68,8 @@ pub fn send_message_event_route( &db.account_data, )?; + db.transaction_ids + .add_txnid(sender_id, device_id, &body.txn_id, event_id.as_bytes())?; Ok(send_message_event::Response { event_id }.into()) } diff --git a/src/client_server/to_device.rs b/src/client_server/to_device.rs index ca423fef..8c06d64e 100644 --- a/src/client_server/to_device.rs +++ b/src/client_server/to_device.rs @@ -17,6 +17,16 @@ pub fn send_event_to_device_route( body: Ruma, ) -> ConduitResult { let sender_id = body.sender_id.as_ref().expect("user is authenticated"); + let device_id = body.device_id.as_ref().expect("user is authenticated"); + + // Check if this is a new transaction id + if db + .transaction_ids + .existing_txnid(sender_id, device_id, &body.txn_id)? + .is_some() + { + return Ok(send_event_to_device::Response.into()); + } for (target_user_id, map) in &body.messages { for (target_device_id_maybe, event) in map { @@ -52,5 +62,9 @@ pub fn send_event_to_device_route( } } + // Save transaction id with empty data + db.transaction_ids + .add_txnid(sender_id, device_id, &body.txn_id, &[])?; + Ok(send_event_to_device::Response.into()) } diff --git a/src/database.rs b/src/database.rs index 41781b95..b43cc5b0 100644 --- a/src/database.rs +++ b/src/database.rs @@ -3,6 +3,7 @@ pub mod globals; pub mod key_backups; pub mod media; pub mod rooms; +pub mod transaction_ids; pub mod uiaa; pub mod users; @@ -23,6 +24,7 @@ pub struct Database { pub account_data: account_data::AccountData, pub media: media::Media, pub key_backups: key_backups::KeyBackups, + pub transaction_ids: transaction_ids::TransactionIds, pub _db: sled::Db, } @@ -90,7 +92,8 @@ impl Database { edus: rooms::RoomEdus { readreceiptid_readreceipt: db.open_tree("readreceiptid_readreceipt")?, roomuserid_privateread: db.open_tree("roomuserid_privateread")?, // "Private" read receipt - roomuserid_lastprivatereadupdate: db.open_tree("roomid_lastprivatereadupdate")?, + roomuserid_lastprivatereadupdate: db + .open_tree("roomid_lastprivatereadupdate")?, typingid_userid: db.open_tree("typingid_userid")?, roomid_lasttypingupdate: db.open_tree("roomid_lasttypingupdate")?, presenceid_presence: db.open_tree("presenceid_presence")?, @@ -124,6 +127,9 @@ impl Database { backupid_etag: db.open_tree("backupid_etag")?, backupkeyid_backup: db.open_tree("backupkeyid_backupmetadata")?, }, + transaction_ids: transaction_ids::TransactionIds { + userdevicetxnid_response: db.open_tree("userdevicetxnid_response")?, + }, _db: db, }) } diff --git a/src/database/rooms.rs b/src/database/rooms.rs index bb14c8a5..8cfb6129 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -621,7 +621,8 @@ impl Rooms { } _ => {} } - self.edus.private_read_set(&room_id, &sender, index, &globals)?; + self.edus + .private_read_set(&room_id, &sender, index, &globals)?; Ok(pdu.event_id) } diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index fbd3edb6..d60e1f16 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -92,7 +92,13 @@ impl RoomEdus { } /// Sets a private read marker at `count`. - pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64, globals: &super::super::globals::Globals) -> Result<()> { + pub fn private_read_set( + &self, + room_id: &RoomId, + user_id: &UserId, + count: u64, + globals: &super::super::globals::Globals, + ) -> Result<()> { let mut key = room_id.to_string().as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(&user_id.to_string().as_bytes()); diff --git a/src/database/transaction_ids.rs b/src/database/transaction_ids.rs new file mode 100644 index 00000000..9485b361 --- /dev/null +++ b/src/database/transaction_ids.rs @@ -0,0 +1,43 @@ +use crate::Result; +use ruma::{DeviceId, UserId}; +use sled::IVec; + +pub struct TransactionIds { + pub(super) userdevicetxnid_response: sled::Tree, // Response can be empty (/sendToDevice) or the event id (/send) +} + +impl TransactionIds { + pub fn add_txnid( + &self, + user_id: &UserId, + device_id: &DeviceId, + txn_id: &str, + data: &[u8], + ) -> Result<()> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(device_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(txn_id.as_bytes()); + + self.userdevicetxnid_response.insert(key, data)?; + + Ok(()) + } + + pub fn existing_txnid( + &self, + user_id: &UserId, + device_id: &DeviceId, + txn_id: &str, + ) -> Result> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(device_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(txn_id.as_bytes()); + + // If there's no entry, this is a new transaction + Ok(self.userdevicetxnid_response.get(key)?) + } +} diff --git a/src/pdu.rs b/src/pdu.rs index 44584237..c948fef4 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -71,7 +71,9 @@ impl PduEvent { self.unsigned.insert( "redacted_because".to_owned(), - serde_json::to_string(reason).expect("PduEvent::to_string always works").into() + serde_json::to_string(reason) + .expect("PduEvent::to_string always works") + .into(), ); self.content = new_content.into(); diff --git a/sytest/sytest-whitelist b/sytest/sytest-whitelist index b0b20975..15852330 100644 --- a/sytest/sytest-whitelist +++ b/sytest/sytest-whitelist @@ -38,6 +38,7 @@ Current state appears in timeline in private history with many messages before Deleted tags appear in an incremental v2 /sync Deleting a non-existent alias should return a 404 Device messages wake up /sync +Device messages with the same txn_id are deduplicated Events come down the correct room GET /device/{deviceId} GET /device/{deviceId} gives a 404 for unknown devices