|
|
|
@ -238,30 +238,41 @@ async fn sync_helper(
|
|
|
|
|
let insert_lock = mutex_insert.lock().unwrap();
|
|
|
|
|
drop(insert_lock);
|
|
|
|
|
|
|
|
|
|
let mut non_timeline_pdus = db
|
|
|
|
|
.rooms
|
|
|
|
|
.pdus_until(&sender_user, &room_id, u64::MAX)?
|
|
|
|
|
.filter_map(|r| {
|
|
|
|
|
// Filter out buggy events
|
|
|
|
|
if r.is_err() {
|
|
|
|
|
error!("Bad pdu in pdus_since: {:?}", r);
|
|
|
|
|
}
|
|
|
|
|
r.ok()
|
|
|
|
|
})
|
|
|
|
|
.take_while(|(pduid, _)| {
|
|
|
|
|
db.rooms
|
|
|
|
|
.pdu_count(pduid)
|
|
|
|
|
.map_or(false, |count| count > since)
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Take the last 10 events for the timeline
|
|
|
|
|
let timeline_pdus: Vec<_> = non_timeline_pdus
|
|
|
|
|
.by_ref()
|
|
|
|
|
.take(10)
|
|
|
|
|
.collect::<Vec<_>>()
|
|
|
|
|
.into_iter()
|
|
|
|
|
.rev()
|
|
|
|
|
.collect();
|
|
|
|
|
let timeline_pdus;
|
|
|
|
|
let limited;
|
|
|
|
|
if db.rooms.last_timeline_count(&sender_user, &room_id)? > since {
|
|
|
|
|
let mut non_timeline_pdus = db
|
|
|
|
|
.rooms
|
|
|
|
|
.pdus_until(&sender_user, &room_id, u64::MAX)?
|
|
|
|
|
.filter_map(|r| {
|
|
|
|
|
// Filter out buggy events
|
|
|
|
|
if r.is_err() {
|
|
|
|
|
error!("Bad pdu in pdus_since: {:?}", r);
|
|
|
|
|
}
|
|
|
|
|
r.ok()
|
|
|
|
|
})
|
|
|
|
|
.take_while(|(pduid, _)| {
|
|
|
|
|
db.rooms
|
|
|
|
|
.pdu_count(pduid)
|
|
|
|
|
.map_or(false, |count| count > since)
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Take the last 10 events for the timeline
|
|
|
|
|
timeline_pdus = non_timeline_pdus
|
|
|
|
|
.by_ref()
|
|
|
|
|
.take(10)
|
|
|
|
|
.collect::<Vec<_>>()
|
|
|
|
|
.into_iter()
|
|
|
|
|
.rev()
|
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
|
|
|
|
// They /sync response doesn't always return all messages, so we say the output is
|
|
|
|
|
// limited unless there are events in non_timeline_pdus
|
|
|
|
|
limited = non_timeline_pdus.next().is_some();
|
|
|
|
|
} else {
|
|
|
|
|
timeline_pdus = Vec::new();
|
|
|
|
|
limited = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let send_notification_counts = !timeline_pdus.is_empty()
|
|
|
|
|
|| db
|
|
|
|
@ -270,10 +281,6 @@ async fn sync_helper(
|
|
|
|
|
.last_privateread_update(&sender_user, &room_id)?
|
|
|
|
|
> since;
|
|
|
|
|
|
|
|
|
|
// They /sync response doesn't always return all messages, so we say the output is
|
|
|
|
|
// limited unless there are events in non_timeline_pdus
|
|
|
|
|
let limited = non_timeline_pdus.next().is_some();
|
|
|
|
|
|
|
|
|
|
let mut timeline_users = HashSet::new();
|
|
|
|
|
for (_, event) in &timeline_pdus {
|
|
|
|
|
timeline_users.insert(event.sender.as_str().to_owned());
|
|
|
|
|