From 91d07a9bfc4ef97dc876bc73f82b1cafb5fe24fb Mon Sep 17 00:00:00 2001 From: Ginger Date: Wed, 22 Oct 2025 13:06:33 -0400 Subject: [PATCH] fix: Correctly send limited timelines again --- src/api/client/sync/mod.rs | 62 ++++++++++++++++++++----------- src/api/client/sync/v3/joined.rs | 4 +- src/api/client/sync/v5.rs | 6 +-- src/service/rooms/timeline/mod.rs | 4 +- 4 files changed, 47 insertions(+), 29 deletions(-) diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 679c6f9e..92c39253 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -1,6 +1,8 @@ mod v3; mod v5; +use std::collections::VecDeque; + use conduwuit::{ PduCount, Result, matrix::pdu::PduEvent, @@ -23,7 +25,7 @@ pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] = #[derive(Default)] pub(crate) struct TimelinePdus { - pub pdus: Vec<(PduCount, PduEvent)>, + pub pdus: VecDeque<(PduCount, PduEvent)>, pub limited: bool, } @@ -35,27 +37,36 @@ async fn load_timeline( ending_count: Option, limit: usize, ) -> Result { - let last_timeline_count = services - .rooms - .timeline - .last_timeline_count(Some(sender_user), room_id) - .await?; - - let mut pdus_between_counts = match starting_count { + let mut pdu_stream = match starting_count { | Some(starting_count) => { + let last_timeline_count = services + .rooms + .timeline + .last_timeline_count(Some(sender_user), room_id) + .await?; + if last_timeline_count <= starting_count { + // no messages have been sent in this room since `starting_count` return Ok(TimelinePdus::default()); } + trace!(?last_timeline_count, ?starting_count, ?ending_count); - // Stream from the DB all PDUs which were sent after `starting_count` but before - // `ending_count`, including both endpoints + // for incremental sync, stream from the DB all PDUs which were sent after + // `starting_count` but before `ending_count`, including `ending_count` but + // not `starting_count`. this code is pretty similar to the initial sync + // branch, they're separate to allow for future optimization services .rooms .timeline - .pdus(Some(sender_user), room_id, Some(starting_count)) + .pdus_rev( + Some(sender_user), + room_id, + ending_count.map(|count| count.saturating_add(1)), + ) .ignore_err() - .ready_take_while(|&(pducount, _)| { - pducount <= ending_count.unwrap_or_else(PduCount::max) + .ready_take_while(move |&(pducount, ref pdu)| { + trace!(?pducount, ?pdu, "glubbins"); + pducount > starting_count }) .boxed() }, @@ -65,21 +76,28 @@ async fn load_timeline( services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, ending_count) + .pdus_rev( + Some(sender_user), + room_id, + ending_count.map(|count| count.saturating_add(1)), + ) .ignore_err() .boxed() }, }; // Return at most `limit` PDUs from the stream - let mut pdus: Vec<_> = pdus_between_counts.by_ref().take(limit).collect().await; - if starting_count.is_none() { - // `pdus_rev` returns PDUs in reverse order. fix that here - pdus.reverse(); - } - // The timeline is limited if more than `limit` PDUs exist in the DB after - // `starting_count` - let limited = pdus_between_counts.next().await.is_some(); + let pdus = pdu_stream + .by_ref() + .take(limit) + .ready_fold(VecDeque::with_capacity(limit), |mut pdus, item| { + pdus.push_front(item); + pdus + }) + .await; + + // The timeline is limited if there are still more PDUs in the stream + let limited = pdu_stream.next().await.is_some(); trace!( "syncing {:?} timeline pdus from {:?} to {:?} (limited = {:?})", diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index 92ad4c35..34cb917f 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -125,7 +125,7 @@ pub(super) async fn load_joined_room( let is_initial_sync = since_shortstatehash.is_none(); let timeline_start_shortstatehash = async { - if let Some((_, pdu)) = timeline_pdus.first() { + if let Some((_, pdu)) = timeline_pdus.front() { if let Ok(shortstatehash) = services .rooms .state_accessor @@ -349,7 +349,7 @@ pub(super) async fn load_joined_room( .flatten(); let prev_batch = timeline_pdus - .first() + .front() .map(at!(0)) .or_else(|| joined_sender_member.is_some().and(since).map(Into::into)); diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index fbebebf2..76318841 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -1,6 +1,6 @@ use std::{ cmp::{self, Ordering}, - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, ops::Deref, time::Duration, }; @@ -411,7 +411,7 @@ where .await .ok(); - (timeline_pdus, limited) = (Vec::new(), true); + (timeline_pdus, limited) = (VecDeque::new(), true); } else { TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline( services, @@ -501,7 +501,7 @@ where } let prev_batch = timeline_pdus - .first() + .front() .map_or(Ok::<_, Error>(None), |(pdu_count, _)| { Ok(Some(match pdu_count { | PduCount::Backfilled(_) => { diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 70c98a09..aac8f30c 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -243,7 +243,7 @@ impl Service { self.pdus(Some(user_id), room_id, None).ignore_err() } - /// Reverse iteration starting at from. + /// Reverse iteration starting after `until`. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus_rev<'a>( &'a self, @@ -255,7 +255,7 @@ impl Service { .pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max)) } - /// Forward iteration starting at from. + /// Forward iteration starting after `from`. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus<'a>( &'a self,