diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 4667f25d..68a3ea6b 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -619,16 +619,9 @@ async fn join_room_by_id_helper( // pdu without it's state. This is okay because append_pdu can't fail. let statehashid = db.rooms.append_to_state(&pdu, &db.globals)?; - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - db.rooms.append_pdu( &pdu, utils::to_canonical_object(&pdu).expect("Pdu is valid canonical object"), - count, - &pdu_id, &[pdu.event_id.clone()], db, )?; diff --git a/src/database/rooms.rs b/src/database/rooms.rs index af0761ff..75ef3344 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -666,11 +666,10 @@ impl Rooms { &self, pdu: &PduEvent, mut pdu_json: CanonicalJsonObject, - count: u64, - pdu_id: &[u8], leaves: &[EventId], db: &Database, - ) -> Result<()> { + ) -> Result> { + // returns pdu id // Make unsigned fields correct. This is not properly documented in the spec, but state // events need to have previous content in the unsigned field, so clients can easily // interpret things like membership changes @@ -708,20 +707,30 @@ impl Rooms { self.replace_pdu_leaves(&pdu.room_id, leaves)?; + let count1 = db.globals.next_count()?; // Mark as read first so the sending client doesn't get a notification even if appending // fails self.edus - .private_read_set(&pdu.room_id, &pdu.sender, count, &db.globals)?; + .private_read_set(&pdu.room_id, &pdu.sender, count1, &db.globals)?; self.reset_notification_counts(&pdu.sender, &pdu.room_id)?; + let count2 = db.globals.next_count()?; + let mut pdu_id = pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count2.to_be_bytes()); + + // There's a brief moment of time here where the count is updated but the pdu does not + // exist. This could theoretically lead to dropped pdus, but it's extremely rare + self.pduid_pdu.insert( - pdu_id, + &pdu_id, &serde_json::to_vec(&pdu_json).expect("CanonicalJsonObject is always a valid"), )?; // This also replaces the eventid of any outliers with the correct // pduid, removing the place holder. - self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id)?; + self.eventid_pduid + .insert(pdu.event_id.as_bytes(), &pdu_id)?; // See if the event matches any known pushers for user in db @@ -909,7 +918,7 @@ impl Rooms { _ => {} } - Ok(()) + Ok(pdu_id) } pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { @@ -1354,11 +1363,9 @@ impl Rooms { // pdu without it's state. This is okay because append_pdu can't fail. let statehashid = self.append_to_state(&pdu, &db.globals)?; - self.append_pdu( + let pdu_id = self.append_pdu( &pdu, pdu_json, - count, - &pdu_id, // Since this PDU references all pdu_leaves we can update the leaves // of the room &[pdu.event_id.clone()], diff --git a/src/ruma_wrapper.rs b/src/ruma_wrapper.rs index 4f6318a2..a4beac64 100644 --- a/src/ruma_wrapper.rs +++ b/src/ruma_wrapper.rs @@ -320,6 +320,7 @@ where }), Err(e) => { warn!("{:?}", e); + // Bad Json Failure((Status::new(583), ())) } } diff --git a/src/server_server.rs b/src/server_server.rs index 458e32d0..3515d857 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1579,21 +1579,14 @@ pub(crate) fn append_incoming_pdu( new_room_leaves: HashSet, state: &StateMap>, ) -> Result> { - let count = db.globals.next_count()?; - let mut pdu_id = pdu.room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. db.rooms .set_event_state(&pdu.event_id, state, &db.globals)?; - db.rooms.append_pdu( + let pdu_id = db.rooms.append_pdu( pdu, pdu_json, - count, - &pdu_id, &new_room_leaves.into_iter().collect::>(), &db, )?;