refactor(sync/v3): Split load_joined_room into smaller functions

This commit is contained in:
Ginger 2025-11-10 12:37:11 -05:00
parent 6246c11265
commit a6d325440c
9 changed files with 625 additions and 343 deletions

View file

@@ -388,8 +388,9 @@ pub async fn remote_leave_room<S: ::std::hash::BuildHasher>(
.outlier
.add_pdu_outlier(&event_id, &leave_event);
let leave_pdu = Pdu::from_id_val(&event_id, leave_event)
.map_err(|e| err!(BadServerResponse("Invalid leave PDU received during federated leave: {e:?}")))?;
let leave_pdu = Pdu::from_id_val(&event_id, leave_event).map_err(|e| {
err!(BadServerResponse("Invalid leave PDU received during federated leave: {e:?}"))
})?;
Ok(leave_pdu)
}

View file

@@ -4,7 +4,7 @@ mod v5;
use std::collections::VecDeque;
use conduwuit::{
Event, PduCount, Result,
Event, PduCount, Result, err,
matrix::pdu::PduEvent,
ref_at, trace,
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
@@ -54,7 +54,10 @@ async fn load_timeline(
.rooms
.timeline
.last_timeline_count(Some(sender_user), room_id)
.await?;
.await
.map_err(|err| {
err!(Database(warn!("Failed to fetch end of room timeline: {}", err)))
})?;
if last_timeline_count <= starting_count {
// no messages have been sent in this room since `starting_count`

File diff suppressed because it is too large. [Load diff]

View file

@@ -75,6 +75,8 @@ impl DeviceListUpdates {
self.changed.extend(other.changed);
self.left.extend(other.left);
}
fn is_empty(&self) -> bool { self.changed.is_empty() && self.left.is_empty() }
}
impl From<DeviceListUpdates> for DeviceLists {

View file

@@ -105,7 +105,11 @@ pub(super) async fn build_state_incremental<'a>(
The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described
by the Matrix specification. This is because the specification's description of the `state` property does not accurately
reflect how Synapse behaves, and therefore how client SDKs behave.
reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include:
1. We do not compute the delta using the naive approach of "every state event from the end of the last sync
up to the start of this sync's timeline". see below for details.
2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined
elsewhere and supplied to this function in the `lazily_loaded_members` parameter.
*/
/*
@@ -206,9 +210,11 @@ pub(super) async fn build_state_incremental<'a>(
at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates
computing the incremental state (which may be empty).
NOTE: this code path does not apply lazy-load filtering to membership state events. the spec forbids lazy-load filtering
if the timeline is `limited`, and DAG splits which require sending extra membership state events are (probably) uncommon
enough that the performance penalty is acceptable.
NOTE: this code path does not use the `lazy_membership_events` parameter. any changes to membership will be included
in the incremental state. therefore, the incremental state may include "redundant" membership events,
which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`,
and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that
the performance penalty is acceptable.
*/
trace!(?timeline_is_linear, ?timeline.limited, "computing state for incremental sync");

View file

@@ -472,7 +472,7 @@ where
.filter_map(|(read_user, _ts, v)| async move {
services
.users
.user_is_ignored(read_user, sender_user)
.user_is_ignored(&read_user, sender_user)
.await
.or_some(v)
})

View file

@@ -7,7 +7,7 @@ use conduwuit::{
use database::{Deserialized, Json, Map};
use futures::{Stream, StreamExt};
use ruma::{
CanonicalJsonObject, RoomId, UserId,
CanonicalJsonObject, OwnedUserId, RoomId, UserId,
events::{AnySyncEphemeralRoomEvent, receipt::ReceiptEvent},
serde::Raw,
};
@@ -25,7 +25,7 @@ struct Services {
globals: Dep<globals::Service>,
}
pub(super) type ReceiptItem<'a> = (&'a UserId, u64, Raw<AnySyncEphemeralRoomEvent>);
pub(super) type ReceiptItem = (OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>);
impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self {
@@ -65,7 +65,7 @@ impl Data {
&'a self,
room_id: &'a RoomId,
since: u64,
) -> impl Stream<Item = ReceiptItem<'a>> + Send + 'a {
) -> impl Stream<Item = ReceiptItem> + Send + 'a {
type Key<'a> = (&'a RoomId, u64, &'a UserId);
type KeyVal<'a> = (Key<'a>, CanonicalJsonObject);
@@ -81,7 +81,7 @@ impl Data {
let event = serde_json::value::to_raw_value(&json)?;
Ok((user_id, count, Raw::from_json(event)))
Ok((user_id.to_owned(), count, Raw::from_json(event)))
})
.ignore_err()
}

View file

@@ -112,7 +112,7 @@ impl Service {
&'a self,
room_id: &'a RoomId,
since: Option<u64>,
) -> impl Stream<Item = ReceiptItem<'a>> + Send + 'a {
) -> impl Stream<Item = ReceiptItem> + Send + 'a {
self.db.readreceipts_since(room_id, since.unwrap_or(0))
}

View file

@@ -530,7 +530,7 @@ impl Service {
}
max_edu_count.fetch_max(count, Ordering::Relaxed);
if !self.services.globals.user_is_local(user_id) {
if !self.services.globals.user_is_local(&user_id) {
continue;
}
@@ -554,7 +554,7 @@ impl Service {
let receipt = receipt
.remove(&ReceiptType::Read)
.expect("our read receipts always set this")
.remove(user_id)
.remove(&user_id)
.expect("our read receipts always have the user here");
let receipt_data = ReceiptData {
@@ -562,7 +562,7 @@ impl Service {
event_ids: vec![event_id.clone()],
};
if read.insert(user_id.to_owned(), receipt_data).is_none() {
if read.insert(user_id, receipt_data).is_none() {
*num = num.saturating_add(1);
if *num >= SELECT_RECEIPT_LIMIT {
break;