diff --git a/src/client_server.rs b/src/client_server.rs index 3083ff24..ab7e5155 100644 --- a/src/client_server.rs +++ b/src/client_server.rs @@ -2763,7 +2763,7 @@ pub fn get_context_route( .filter_map(|r| r.ok()) // Remove buggy events .collect::>(); - let start_token = events_before.last().map_or(Ok(None), |e| { + let start_token = events_before.last().map_or(Ok(None), |(_, e)| { Ok::<_, Error>(Some( db.rooms .get_pdu_count(&e.event_id)? @@ -2774,7 +2774,7 @@ pub fn get_context_route( let events_before = events_before .into_iter() - .map(|pdu| pdu.to_room_event()) + .map(|(_, pdu)| pdu.to_room_event()) .collect::>(); let events_after = db @@ -2789,18 +2789,19 @@ pub fn get_context_route( .filter_map(|r| r.ok()) // Remove buggy events .collect::>(); - let end_token = events_after.last().map_or(Ok(None), |e| { - Ok::<_, Error>(Some( - db.rooms - .get_pdu_count(&e.event_id)? - .ok_or_else(|| Error::bad_database("Can't find count from event in db."))? + let end_token = if let Some(last_event) = events_after.last() { + Some( + utils::u64_from_bytes(&last_event.0) + .map_err(|_| Error::bad_database("Invalid pdu id in db."))? .to_string(), - )) - })?; + ) + } else { + None + }; let events_after = events_after .into_iter() - .map(|pdu| pdu.to_room_event()) + .map(|(_, pdu)| pdu.to_room_event()) .collect::>(); Ok(get_context::Response { @@ -2839,32 +2840,38 @@ pub fn get_message_events_route( .clone() .parse() .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from` value."))?; + + let to = body.to.as_ref().map(|t| t.as_bytes()); + + // Use limit or else 10 let limit = body .limit .try_into() .map_or(Ok::<_, Error>(10_usize), |l: u32| Ok(l as usize))?; + match body.dir { get_message_events::Direction::Forward => { let events_after = db .rooms .pdus_after(&user_id, &body.room_id, from) - // Use limit or else 10 .take(limit) .filter_map(|r| r.ok()) // Filter out buggy events + .take_while(|(k, _)| Some(&**k) != to) // Stop at `to` .collect::>(); - let end_token = events_after.last().map_or(Ok::<_, Error>(None), |e| { - Ok(Some( - db.rooms - .get_pdu_count(&e.event_id)? - .ok_or_else(|| Error::bad_database("Can't find count from event in db."))? + let end_token = if let Some(last_event) = events_after.last() { + Some( + utils::u64_from_bytes(&last_event.0) + .map_err(|_| Error::bad_database("Invalid pdu id in db."))? .to_string(), - )) - })?; + ) + } else { + None + }; let events_after = events_after .into_iter() - .map(|pdu| pdu.to_room_event()) + .map(|(_, pdu)| pdu.to_room_event()) .collect::>(); Ok(get_message_events::Response { @@ -2879,23 +2886,24 @@ pub fn get_message_events_route( let events_before = db .rooms .pdus_until(&user_id, &body.room_id, from) - // Use limit or else 10 .take(limit) .filter_map(|r| r.ok()) // Filter out buggy events + .take_while(|(k, _)| Some(&**k) != to) // Stop at `to` .collect::>(); - let start_token = events_before.last().map_or(Ok::<_, Error>(None), |e| { - Ok(Some( - db.rooms - .get_pdu_count(&e.event_id)? - .ok_or_else(|| Error::bad_database("Can't find count from event in db."))? + let start_token = if let Some(last_event) = events_before.last() { + Some( + utils::u64_from_bytes(&last_event.0) + .map_err(|_| Error::bad_database("Invalid pdu id in db."))? .to_string(), - )) - })?; + ) + } else { + None + }; let events_before = events_before .into_iter() - .map(|pdu| pdu.to_room_event()) + .map(|(_, pdu)| pdu.to_room_event()) .collect::>(); Ok(get_message_events::Response { diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 5b9659ef..3ef4f3f1 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -654,14 +654,14 @@ impl Rooms { })) } - /// Returns an iterator over all events in a room that happened before the event with id - /// `until` in reverse-chronological order. + /// Returns an iterator over all events and their tokens in a room that happened before the + /// event with id `until` in reverse-chronological order. pub fn pdus_until( &self, user_id: &UserId, room_id: &RoomId, until: u64, - ) -> impl Iterator> { + ) -> impl Iterator> { // Create the first part of the full pdu id let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); @@ -677,24 +677,24 @@ impl Rooms { .rev() .filter_map(|r| r.ok()) .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(_, v)| { + .map(move |(k, v)| { let mut pdu = serde_json::from_slice::(&v) .map_err(|_| Error::bad_database("PDU in db is invalid."))?; if pdu.sender != user_id { pdu.unsigned.remove("transaction_id"); } - Ok(pdu) + Ok((k, pdu)) }) } - /// Returns an iterator over all events in a room that happened after the event with id - /// `from` in chronological order. + /// Returns an iterator over all events and their token in a room that happened after the event + /// with id `from` in chronological order. pub fn pdus_after( &self, user_id: &UserId, room_id: &RoomId, from: u64, - ) -> impl Iterator> { + ) -> impl Iterator> { // Create the first part of the full pdu id let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); @@ -709,13 +709,13 @@ impl Rooms { .range(current..) .filter_map(|r| r.ok()) .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(move |(_, v)| { + .map(move |(k, v)| { let mut pdu = serde_json::from_slice::(&v) .map_err(|_| Error::bad_database("PDU in db is invalid."))?; if pdu.sender != user_id { pdu.unsigned.remove("transaction_id"); } - Ok(pdu) + Ok((k, pdu)) }) }