diff --git a/src/client_server/to_device.rs b/src/client_server/to_device.rs index 69147c9a..cd770bd1 100644 --- a/src/client_server/to_device.rs +++ b/src/client_server/to_device.rs @@ -56,6 +56,7 @@ pub async fn send_event_to_device_route( }, )) .expect("DirectToDevice EDU can be serialized"), + db.globals.next_count()?, )?; continue; diff --git a/src/database/sending.rs b/src/database/sending.rs index 31a1f67e..1050c077 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -84,8 +84,8 @@ pub enum SendingEventType { pub struct Sending { /// The state for a given state hash. pub(super) servername_educount: Arc, // EduCount: Count of last EDU sync - pub(super) servernameevent_data: Arc, // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / * (for edus), Data = EDU content - pub(super) servercurrentevent_data: Arc, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / * (for edus), Data = EDU content + pub(super) servernameevent_data: Arc, // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content + pub(super) servercurrentevent_data: Arc, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content pub(super) maximum_requests: Arc, pub sender: mpsc::UnboundedSender<(Vec, Vec)>, } @@ -435,10 +435,15 @@ impl Sending { } #[tracing::instrument(skip(self, server, serialized))] - pub fn send_reliable_edu(&self, server: &ServerName, serialized: Vec) -> Result<()> { + pub fn send_reliable_edu( + &self, + server: &ServerName, + serialized: Vec, + id: u64, + ) -> Result<()> { let mut key = server.as_bytes().to_vec(); key.push(0xff); - key.push(b'*'); + key.extend_from_slice(&id.to_be_bytes()); self.servernameevent_data.insert(&key, &serialized)?; self.sender.unbounded_send((key, serialized)).unwrap(); @@ -714,10 +719,10 @@ impl Sending { OutgoingKind::Appservice(Box::::try_from(server).map_err(|_| { Error::bad_database("Invalid server string in server_currenttransaction") })?), - if event.starts_with(b"*") { - SendingEventType::Edu(value) - } else { + if value.is_empty() { SendingEventType::Pdu(event.to_vec()) + } else { + SendingEventType::Edu(value) }, ) } else if key.starts_with(b"$") { @@ -732,10 +737,10 @@ impl Sending { .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; ( OutgoingKind::Push(user.to_vec(), pushkey.to_vec()), - if event.starts_with(b"*") { - SendingEventType::Edu(value) - } else { + if value.is_empty() { SendingEventType::Pdu(event.to_vec()) + } else { + SendingEventType::Edu(value) }, ) } else { @@ -753,10 +758,10 @@ impl Sending { OutgoingKind::Normal(Box::::try_from(server).map_err(|_| { Error::bad_database("Invalid server string in server_currenttransaction") })?), - if event.starts_with(b"*") { - SendingEventType::Edu(event[1..].to_vec()) - } else { + if value.is_empty() { SendingEventType::Pdu(event.to_vec()) + } else { + SendingEventType::Edu(value) }, ) })