fix: Correctly send limited timelines again
This commit is contained in:
parent
3491f653a5
commit
72bf8e5927
4 changed files with 47 additions and 29 deletions
|
|
@ -1,6 +1,8 @@
|
|||
mod v3;
|
||||
mod v5;
|
||||
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use conduwuit::{
|
||||
PduCount, Result,
|
||||
matrix::pdu::PduEvent,
|
||||
|
|
@ -23,7 +25,7 @@ pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
|
|||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct TimelinePdus {
|
||||
pub pdus: Vec<(PduCount, PduEvent)>,
|
||||
pub pdus: VecDeque<(PduCount, PduEvent)>,
|
||||
pub limited: bool,
|
||||
}
|
||||
|
||||
|
|
@ -35,27 +37,36 @@ async fn load_timeline(
|
|||
ending_count: Option<PduCount>,
|
||||
limit: usize,
|
||||
) -> Result<TimelinePdus> {
|
||||
let last_timeline_count = services
|
||||
.rooms
|
||||
.timeline
|
||||
.last_timeline_count(Some(sender_user), room_id)
|
||||
.await?;
|
||||
|
||||
let mut pdus_between_counts = match starting_count {
|
||||
let mut pdu_stream = match starting_count {
|
||||
| Some(starting_count) => {
|
||||
let last_timeline_count = services
|
||||
.rooms
|
||||
.timeline
|
||||
.last_timeline_count(Some(sender_user), room_id)
|
||||
.await?;
|
||||
|
||||
if last_timeline_count <= starting_count {
|
||||
// no messages have been sent in this room since `starting_count`
|
||||
return Ok(TimelinePdus::default());
|
||||
}
|
||||
trace!(?last_timeline_count, ?starting_count, ?ending_count);
|
||||
|
||||
// Stream from the DB all PDUs which were sent after `starting_count` but before
|
||||
// `ending_count`, including both endpoints
|
||||
// for incremental sync, stream from the DB all PDUs which were sent after
|
||||
// `starting_count` but before `ending_count`, including `ending_count` but
|
||||
// not `starting_count`. this code is pretty similar to the initial sync
|
||||
// branch, they're separate to allow for future optimization
|
||||
services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus(Some(sender_user), room_id, Some(starting_count))
|
||||
.pdus_rev(
|
||||
Some(sender_user),
|
||||
room_id,
|
||||
ending_count.map(|count| count.saturating_add(1)),
|
||||
)
|
||||
.ignore_err()
|
||||
.ready_take_while(|&(pducount, _)| {
|
||||
pducount <= ending_count.unwrap_or_else(PduCount::max)
|
||||
.ready_take_while(move |&(pducount, ref pdu)| {
|
||||
trace!(?pducount, ?pdu, "glubbins");
|
||||
pducount > starting_count
|
||||
})
|
||||
.boxed()
|
||||
},
|
||||
|
|
@ -65,21 +76,28 @@ async fn load_timeline(
|
|||
services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, ending_count)
|
||||
.pdus_rev(
|
||||
Some(sender_user),
|
||||
room_id,
|
||||
ending_count.map(|count| count.saturating_add(1)),
|
||||
)
|
||||
.ignore_err()
|
||||
.boxed()
|
||||
},
|
||||
};
|
||||
|
||||
// Return at most `limit` PDUs from the stream
|
||||
let mut pdus: Vec<_> = pdus_between_counts.by_ref().take(limit).collect().await;
|
||||
if starting_count.is_none() {
|
||||
// `pdus_rev` returns PDUs in reverse order. fix that here
|
||||
pdus.reverse();
|
||||
}
|
||||
// The timeline is limited if more than `limit` PDUs exist in the DB after
|
||||
// `starting_count`
|
||||
let limited = pdus_between_counts.next().await.is_some();
|
||||
let pdus = pdu_stream
|
||||
.by_ref()
|
||||
.take(limit)
|
||||
.ready_fold(VecDeque::with_capacity(limit), |mut pdus, item| {
|
||||
pdus.push_front(item);
|
||||
pdus
|
||||
})
|
||||
.await;
|
||||
|
||||
// The timeline is limited if there are still more PDUs in the stream
|
||||
let limited = pdu_stream.next().await.is_some();
|
||||
|
||||
trace!(
|
||||
"syncing {:?} timeline pdus from {:?} to {:?} (limited = {:?})",
|
||||
|
|
|
|||
|
|
@ -125,7 +125,7 @@ pub(super) async fn load_joined_room(
|
|||
let is_initial_sync = since_shortstatehash.is_none();
|
||||
|
||||
let timeline_start_shortstatehash = async {
|
||||
if let Some((_, pdu)) = timeline_pdus.first() {
|
||||
if let Some((_, pdu)) = timeline_pdus.front() {
|
||||
if let Ok(shortstatehash) = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
|
|
@ -349,7 +349,7 @@ pub(super) async fn load_joined_room(
|
|||
.flatten();
|
||||
|
||||
let prev_batch = timeline_pdus
|
||||
.first()
|
||||
.front()
|
||||
.map(at!(0))
|
||||
.or_else(|| joined_sender_member.is_some().and(since).map(Into::into));
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use std::{
|
||||
cmp::{self, Ordering},
|
||||
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
|
||||
collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
|
||||
ops::Deref,
|
||||
time::Duration,
|
||||
};
|
||||
|
|
@ -411,7 +411,7 @@ where
|
|||
.await
|
||||
.ok();
|
||||
|
||||
(timeline_pdus, limited) = (Vec::new(), true);
|
||||
(timeline_pdus, limited) = (VecDeque::new(), true);
|
||||
} else {
|
||||
TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline(
|
||||
services,
|
||||
|
|
@ -501,7 +501,7 @@ where
|
|||
}
|
||||
|
||||
let prev_batch = timeline_pdus
|
||||
.first()
|
||||
.front()
|
||||
.map_or(Ok::<_, Error>(None), |(pdu_count, _)| {
|
||||
Ok(Some(match pdu_count {
|
||||
| PduCount::Backfilled(_) => {
|
||||
|
|
|
|||
|
|
@ -243,7 +243,7 @@ impl Service {
|
|||
self.pdus(Some(user_id), room_id, None).ignore_err()
|
||||
}
|
||||
|
||||
/// Reverse iteration starting at from.
|
||||
/// Reverse iteration starting after `until`.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus_rev<'a>(
|
||||
&'a self,
|
||||
|
|
@ -255,7 +255,7 @@ impl Service {
|
|||
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
|
||||
}
|
||||
|
||||
/// Forward iteration starting at from.
|
||||
/// Forward iteration starting after `from`.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus<'a>(
|
||||
&'a self,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue