From 0eff173c0bf064cdf6dfea4e1ac10d41a58b60a0 Mon Sep 17 00:00:00 2001
From: Ginger
Date: Fri, 24 Oct 2025 14:57:31 -0400
Subject: [PATCH] fix(sync/v3): Further cleanup + improve incremental sync
 consistency

---
 src/api/client/message.rs                 |  10 +-
 src/api/client/sync/mod.rs                |   6 +-
 src/api/client/sync/v3/joined.rs          | 640 ++++++++++++++--------
 src/service/rooms/lazy_loading/mod.rs     |   8 +-
 src/service/rooms/short/mod.rs            |  35 +-
 src/service/rooms/state_accessor/state.rs |  26 +-
 src/service/rooms/state_compressor/mod.rs |   2 +-
 src/service/rooms/timeline/mod.rs         |   4 +-
 8 files changed, 455 insertions(+), 276 deletions(-)

diff --git a/src/api/client/message.rs b/src/api/client/message.rs
index 3e6c7336..4af3fae5 100644
--- a/src/api/client/message.rs
+++ b/src/api/client/message.rs
@@ -16,7 +16,7 @@ use conduwuit_service::{
 	Services,
 	rooms::{
 		lazy_loading,
-		lazy_loading::{Options, Witness},
+		lazy_loading::{MemberSet, Options},
 		timeline::PdusIterItem,
 	},
 };
@@ -162,7 +162,7 @@ pub(crate) async fn get_message_events_route(
 
 	let state = witness
 		.map(Option::into_iter)
-		.map(|option| option.flat_map(Witness::into_iter))
+		.map(|option| option.flat_map(MemberSet::into_iter))
 		.map(IterStream::stream)
 		.into_stream()
 		.flatten()
@@ -192,7 +192,7 @@ pub(crate) async fn lazy_loading_witness<'a, I>(
 	services: &Services,
 	lazy_loading_context: &lazy_loading::Context<'_>,
 	events: I,
-) -> Witness
+) -> MemberSet
 where
 	I: Iterator<Item = PdusIterItem> + Clone + Send,
 {
@@ -216,7 +216,7 @@ where
 		.readreceipts_since(lazy_loading_context.room_id, Some(oldest.into_unsigned()));
 
 	pin_mut!(receipts);
-	let witness: Witness = events
+	let witness: MemberSet = events
 		.stream()
 		.map(ref_at!(1))
 		.map(Event::sender)
@@ -232,7 +232,7 @@ where
 	services
 		.rooms
 		.lazy_loading
-		.witness_retain(witness, lazy_loading_context)
+		.retain_lazy_members(witness, lazy_loading_context)
 		.await
 }
 
diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs
index 92c39253..5b853ec2 100644
--- a/src/api/client/sync/mod.rs
+++ b/src/api/client/sync/mod.rs
@@ -49,7 +49,6 @@ async fn load_timeline(
 		// no messages have been sent in this room since `starting_count`
 		return Ok(TimelinePdus::default());
 	}
-	trace!(?last_timeline_count, ?starting_count, ?ending_count);
 
 	// for incremental sync, stream from the DB all PDUs which were sent after
 	// `starting_count` but before `ending_count`, including `ending_count` but
@@ -64,10 +63,7 @@ async fn load_timeline(
 				ending_count.map(|count| count.saturating_add(1)),
 			)
 			.ignore_err()
-			.ready_take_while(move |&(pducount, ref pdu)| {
-				trace!(?pducount, ?pdu, "glubbins");
-				pducount > starting_count
-			})
+			.ready_take_while(move |&(pducount, _)| pducount > starting_count)
 			.boxed()
 	},
 	| None => {
diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs
index 34cb917f..9255a7cd 100644
--- a/src/api/client/sync/v3/joined.rs
+++ b/src/api/client/sync/v3/joined.rs
@@ -1,4 +1,7 @@
-use std::collections::{BTreeMap, HashMap};
+use std::{
+	collections::{BTreeMap, BTreeSet, HashMap},
+	ops::ControlFlow,
+};
 
 use conduwuit::{
 	Result, at, err, extract_variant, is_equal_to,
@@ -10,23 +13,23 @@ use conduwuit::{
 	result::FlatOk,
 	utils::{
 		BoolExt, IterStream, ReadyExt, TryFutureExtExt,
-		future::OptionStream,
 		math::ruma_from_u64,
-		stream::{BroadbandExt, Tools, WidebandExt},
+		stream::{BroadbandExt, Tools, TryIgnore, WidebandExt},
 	},
 };
use conduwuit_service::{
 	Services,
 	rooms::{
 		lazy_loading,
-		lazy_loading::{Options, Witness},
+		lazy_loading::{MemberSet, Options},
 		short::ShortStateHash,
 	},
 };
 use futures::{
 	FutureExt, StreamExt, TryFutureExt,
-	future::{OptionFuture, join, join3, join4, try_join4},
+	future::{OptionFuture, join, join3, join4, try_join},
 };
+use itertools::Itertools;
 use ruma::{
 	OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
 	api::client::sync::sync_events::{
@@ -41,7 +44,7 @@ use ruma::{
 	serde::Raw,
 	uint,
 };
-use service::rooms::short::{ShortEventId, ShortStateKey};
+use service::rooms::short::ShortEventId;
 use tracing::trace;
 
 use super::{load_timeline, share_encrypted_room};
@@ -50,6 +53,7 @@ use crate::client::{
 	sync::v3::{DeviceListUpdates, SyncContext},
 };
 
+/// Generate the sync response for a room the user is joined to.
 #[tracing::instrument(
 	name = "joined",
 	level = "debug",
@@ -61,43 +65,61 @@ use crate::client::{
 #[allow(clippy::too_many_arguments)]
 pub(super) async fn load_joined_room(
 	services: &Services,
-	SyncContext {
+	sync_context: SyncContext<'_>,
+	ref room_id: OwnedRoomId,
+) -> Result<(JoinedRoom, DeviceListUpdates)> {
+	/*
+	this is a large function with a lot of logic. we try to parallelize as much as
+	possible by fetching data concurrently, so the code is roughly split into stages
+	separated by calls to `join`.
+
+	1. `current_shortstatehash` and `previous_sync_end` are fetched from the DB. a
+	shortstatehash is a token which identifies the state of the room at a point in time.
+	2. `load_timeline` is called to fetch timeline events that happened since `since`.
+	3. the state events, device list updates, notification counts, receipts, and
+	account data are computed from those inputs and assembled into the response.
+	*/
+
+	let SyncContext {
 		sender_user,
 		sender_device,
 		since,
 		next_batch,
 		full_state,
 		filter,
-	}: SyncContext<'_>,
-	ref room_id: OwnedRoomId,
-) -> Result<(JoinedRoom, DeviceListUpdates)> {
+	} = sync_context;
+
 	let mut device_list_updates = DeviceListUpdates::new();
 
-	let sincecount = since.map(PduCount::Normal);
 	let next_batchcount = PduCount::Normal(next_batch);
 
-	// the shortstatehash of the room's state right now
+	// the room state right now
 	let current_shortstatehash = services
 		.rooms
 		.state
 		.get_room_shortstatehash(room_id)
 		.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
 
-	// the shortstatehash of what the room's state was when the `since` token was
-	// issued
-	let since_shortstatehash = OptionFuture::from(since.map(|since| {
-		services
+	// the global count and room state as of the end of the last sync.
+	// this will be None if we are doing an initial sync.
+	let previous_sync_end = OptionFuture::from(since.map(|since| async move {
+		let previous_sync_end_count = PduCount::Normal(since);
+
+		let previous_sync_end_shortstatehash = services
 			.rooms
 			.user
 			.get_token_shortstatehash(room_id, since)
-			.ok()
+			.await?;
+
+		Ok((previous_sync_end_count, previous_sync_end_shortstatehash))
 	}))
-	.map(|v| Ok(v.flatten()));
+	.map(Option::transpose);
+
+	let (current_shortstatehash, previous_sync_end) =
+		try_join(current_shortstatehash, previous_sync_end).await?;
 
 	let timeline = load_timeline(
 		services,
 		sender_user,
 		room_id,
-		sincecount,
+		previous_sync_end.map(at!(0)),
 		Some(next_batchcount),
 		10_usize,
 	);
@@ -116,16 +138,11 @@ pub(super) async fn load_joined_room(
 		.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>()
 		.map(Ok);
 
-	let (current_shortstatehash, since_shortstatehash, timeline, receipt_events) =
-		try_join4(current_shortstatehash, since_shortstatehash, timeline, receipt_events)
-			.boxed()
-			.await?;
-
-	let TimelinePdus { pdus: timeline_pdus, limited } = timeline;
-	let is_initial_sync = since_shortstatehash.is_none();
+	let (timeline, receipt_events) = try_join(timeline, receipt_events).boxed().await?;
 
+	// the state at the beginning of the timeline
 	let timeline_start_shortstatehash = async {
-		if let Some((_, pdu)) = timeline_pdus.front() {
+		if let Some((_, pdu)) = timeline.pdus.front() {
 			if let Ok(shortstatehash) = services
 				.rooms
 				.state_accessor
@@ -139,7 +156,8 @@ pub(super) async fn load_joined_room(
 		current_shortstatehash
 	};
 
-	let last_notification_read: OptionFuture<_> = timeline_pdus
+	let last_notification_read: OptionFuture<_> = timeline
+		.pdus
 		.is_empty()
 		.then(|| {
 			services
@@ -149,12 +167,18 @@ pub(super) async fn load_joined_room(
 		})
 		.into();
 
-	let since_sender_member: OptionFuture<_> = since_shortstatehash
-		.map(|short| {
+	// the syncing user's membership event during the last sync
+	let membership_during_previous_sync: OptionFuture<_> = previous_sync_end
+		.map(at!(1))
+		.map(|shortstatehash| {
 			services
 				.rooms
 				.state_accessor
-				.state_get_content(short, &StateEventType::RoomMember, sender_user.as_str())
+				.state_get_content(
+					shortstatehash,
+					&StateEventType::RoomMember,
+					sender_user.as_str(),
+				)
 				.ok()
 		})
 		.into();
@@ -167,24 +191,27 @@ pub(super) async fn load_joined_room(
 
 	let (
 		last_notification_read,
-		since_sender_member,
+		membership_during_previous_sync,
 		timeline_start_shortstatehash,
 		is_encrypted_room,
 	) = join4(
 		last_notification_read,
-		since_sender_member,
+		membership_during_previous_sync,
 		timeline_start_shortstatehash,
 		is_encrypted_room,
 	)
 	.await;
 
-	let joined_since_last_sync =
-		since_sender_member
-			.flatten()
-			.is_none_or(|content: RoomMemberEventContent| {
-				content.membership != MembershipState::Join
-			});
+	// TODO: If the requesting user got state-reset out of the room, this
+	// will be `true` when it shouldn't be. this function should never be called
+	// in that situation, but it may be if the membership cache didn't get updated.
+ // the root cause of this needs to be addressed + let joined_since_last_sync = membership_during_previous_sync.flatten().is_none_or( + |content: RoomMemberEventContent| content.membership != MembershipState::Join, + ); + // lazy loading is only enabled if the filter allows for it and we aren't + // requesting the full state let lazy_loading_enabled = (filter.room.state.lazy_load_options.is_enabled() || filter.room.timeline.lazy_load_options.is_enabled()) && !full_state; @@ -197,8 +224,11 @@ pub(super) async fn load_joined_room( options: Some(&filter.room.state.lazy_load_options), }; - let lazy_loading_witness = OptionFuture::from(lazy_loading_enabled.then(|| { - let witness: Witness = timeline_pdus + // the user IDs of members whose membership needs to be sent to the client, if + // lazy-loading is enabled. + let lazily_loaded_members = OptionFuture::from(lazy_loading_enabled.then(|| { + let witness: MemberSet = timeline + .pdus .iter() .map(ref_at!(1)) .map(Event::sender) @@ -209,118 +239,61 @@ pub(super) async fn load_joined_room( services .rooms .lazy_loading - .witness_retain(witness, lazy_loading_context) + .retain_lazy_members(witness, lazy_loading_context) })) .await; - /* replace with multiple steps - glossary for my own sanity: - - full state: every state event from the start of the room to the start of the timeline - - incremental state: state events from `since` to the start of the timeline - - state_events: the `state` key on the JSON object we return + // reset lazy loading state on initial sync + if previous_sync_end.is_none() { + services + .rooms + .lazy_loading + .reset(lazy_loading_context) + .await; + } - if initial sync or full_state: - get full state - use full state as state_events - else if TL is limited: - get incremental state - use incremental state as state_events - if encryption is enabled: - use incremental state to extend device list - else: - state_events is empty - compute counts and heroes from state_events - */ - let mut state_events = if is_initial_sync || full_state { - // reset lazy loading state on initial sync - if is_initial_sync { - services - .rooms - .lazy_loading - .reset(lazy_loading_context) + let mut state_events = + if let Some((previous_sync_end_count, previous_sync_end_shortstatehash)) = + previous_sync_end + && !full_state + { + let state_incremental = calculate_state_incremental( + services, + sender_user, + room_id, + previous_sync_end_count, + previous_sync_end_shortstatehash, + timeline_start_shortstatehash, + current_shortstatehash, + &timeline, + lazily_loaded_members.as_ref(), + ) + .boxed() + .await?; + + if is_encrypted_room { + calculate_device_list_updates( + services, + sync_context, + room_id, + &mut device_list_updates, + &state_incremental, + joined_since_last_sync, + ) .await; - } - - calculate_state_initial( - services, - sender_user, - timeline_start_shortstatehash, - lazy_loading_witness.as_ref(), - ) - .boxed() - .await? 
-	} else if limited {
-		let state_incremental = calculate_state_incremental(
-			services,
-			sender_user,
-			since_shortstatehash,
-			timeline_start_shortstatehash,
-			lazy_loading_witness.as_ref(),
-		)
-		.boxed()
-		.await?;
-
-		// calculate device list updates for E2EE
-		if is_encrypted_room {
-			// add users with changed keys to the `changed` list
-			services
-				.users
-				.room_keys_changed(room_id, since, Some(next_batch))
-				.map(at!(0))
-				.map(ToOwned::to_owned)
-				.ready_for_each(|user_id| {
-					device_list_updates.changed.insert(user_id);
-				})
-				.await;
-
-			// add users who now share encrypted rooms to `changed` and
-			// users who no longer share encrypted rooms to `left`
-			for state_event in &state_incremental {
-				if state_event.kind == RoomMember {
-					let Some(content): Option<RoomMemberEventContent> =
-						state_event.get_content().ok()
-					else {
-						continue;
-					};
-
-					let Some(user_id): Option<OwnedUserId> = state_event
-						.state_key
-						.as_ref()
-						.and_then(|key| key.parse().ok())
-					else {
-						continue;
-					};
-
-					{
-						use MembershipState::*;
-
-						if matches!(content.membership, Leave | Join) {
-							let shares_encrypted_room = share_encrypted_room(
-								services,
-								sender_user,
-								&user_id,
-								Some(room_id),
-							)
-							.await;
-							match content.membership {
-								| Leave if !shares_encrypted_room => {
-									device_list_updates.left.insert(user_id);
-								},
-								| Join if joined_since_last_sync || shares_encrypted_room => {
-									device_list_updates.changed.insert(user_id);
-								},
-								| _ => (),
-							}
-						}
-					}
-				}
-			}
-		}
-
-		state_incremental
-	} else {
-		vec![]
-	};
+			state_incremental
+		} else {
+			calculate_state_initial(
+				services,
+				sender_user,
+				timeline_start_shortstatehash,
+				lazily_loaded_members.as_ref(),
+			)
+			.boxed()
+			.await?
+		};
 
 	// only compute room counts and heroes (aka the summary) if the room's members
 	// changed since the last sync
@@ -339,26 +312,34 @@ pub(super) async fn load_joined_room(
 			.is_some_and(is_equal_to!(sender_user.as_str()))
 	};
 
-	let joined_sender_member: Option<_> = (joined_since_last_sync && timeline_pdus.is_empty())
-		.then(|| {
-			state_events
-				.iter()
-				.position(is_sender_membership)
-				.map(|pos| state_events.swap_remove(pos))
-		})
-		.flatten();
+	// the membership event of the syncing user, if they joined since the last sync
+	let sender_join_membership_event: Option<_> = (joined_since_last_sync
+		&& timeline.pdus.is_empty())
+	.then(|| {
+		state_events
+			.iter()
+			.position(is_sender_membership)
+			.map(|pos| state_events.swap_remove(pos))
+	})
+	.flatten();
 
-	let prev_batch = timeline_pdus
-		.front()
-		.map(at!(0))
-		.or_else(|| joined_sender_member.is_some().and(since).map(Into::into));
+	// the prev_batch token for the response
+	let prev_batch = timeline.pdus.front().map(at!(0)).or_else(|| {
+		sender_join_membership_event
+			.is_some()
+			.and(since)
+			.map(Into::into)
+	});
 
-	let timeline_pdus = timeline_pdus
+	let timeline_pdus = timeline
+		.pdus
 		.into_iter()
 		.stream()
+		// filter out ignored events from the timeline
 		.wide_filter_map(|item| ignored_filter(services, item, sender_user))
 		.map(at!(1))
-		.chain(joined_sender_member.into_iter().stream())
+		// if the syncing user just joined, add their membership event to the timeline
+		.chain(sender_join_membership_event.into_iter().stream())
 		.map(Event::into_format)
 		.collect::<Vec<_>>();
 
@@ -368,8 +349,15 @@ pub(super) async fn load_joined_room(
 	let account_data_events = services
 		.account_data
 		.changes_since(Some(room_id), sender_user, since, Some(next_batch))
 		.await?
 		.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
 		.collect();
 
-	let send_notification_counts =
-		last_notification_read.is_none_or(|count| since.is_none_or(|since| count > since));
+	/*
+	send notification counts if:
+	1. this is an initial sync
+	2. the user hasn't seen any notifications
+	3. the last notification the user saw has changed since the last sync
+	*/
+	let send_notification_counts = last_notification_read.is_none_or(|last_notification_read| {
+		since.is_none_or(|since| last_notification_read > since)
+	});
 
 	let notification_count: OptionFuture<_> = send_notification_counts
 		.then(|| {
 			services
@@ -446,8 +434,7 @@ pub(super) async fn load_joined_room(
 		.chain(private_read_event.into_iter())
 		.collect();
 
-	// Save the state after this sync so we can send the correct state diff next
-	// sync
+	// save the room state at this sync to use during the next sync
 	services
 		.rooms
 		.user
@@ -468,7 +455,7 @@ pub(super) async fn load_joined_room(
 		},
 		unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
 		timeline: Timeline {
-			limited,
+			limited: timeline.limited,
 			prev_batch: prev_batch.as_ref().map(ToString::to_string),
 			events: room_events,
 		},
@@ -482,9 +469,11 @@ pub(super) async fn load_joined_room(
 	Ok((joined_room, device_list_updates))
 }
 
-/// Calculate the "initial state", or all events from the start of the room up
-/// to (but not including) the `current_shortstatehash`. If
-/// `lazy_loading_witness` is `None`, lazy loading will be disabled.
+/// Calculate the state events to include in an initial sync response.
+///
+/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned
+/// Vec will only include membership events for the members in
+/// `lazily_loaded_members`.
 #[tracing::instrument(
 	name = "initial",
 	level = "trace",
@@ -495,33 +484,41 @@
 async fn calculate_state_initial(
 	services: &Services,
 	sender_user: &UserId,
-	current_shortstatehash: ShortStateHash,
-	lazy_loading_witness: Option<&Witness>,
+	timeline_start_shortstatehash: ShortStateHash,
+	lazily_loaded_members: Option<&MemberSet>,
 ) -> Result<Vec<PduEvent>> {
+	// load the keys and event IDs of the state events at the start of the timeline
 	let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
 		.rooms
 		.state_accessor
-		.state_full_ids(current_shortstatehash)
+		.state_full_ids(timeline_start_shortstatehash)
 		.unzip()
 		.await;
 
-	trace!("event ids for initial sync @ {:?}: {:?}", current_shortstatehash, event_ids);
+	trace!("performing initial sync of {} state events", event_ids.len());
 
 	services
 		.rooms
 		.short
+		// look up the full state keys
 		.multi_get_statekey_from_short(shortstatekeys.into_iter().stream())
 		.zip(event_ids.into_iter().stream())
 		.ready_filter_map(|item| Some((item.0.ok()?, item.1)))
 		.ready_filter_map(|((event_type, state_key), event_id)| {
-			let lazy = lazy_loading_witness.is_some_and(|witness| {
-				event_type == StateEventType::RoomMember
+			if let Some(lazily_loaded_members) = lazily_loaded_members {
+				/*
+				if lazy loading is enabled, filter out membership events which aren't for a user
+				included in `lazily_loaded_members` or for the user requesting the sync.
+				*/
+				let event_is_redundant = event_type == StateEventType::RoomMember
 					&& state_key.as_str().try_into().is_ok_and(|user_id: &UserId| {
-						sender_user != user_id && !witness.contains(user_id)
-					})
-			});
+						sender_user != user_id && !lazily_loaded_members.contains(user_id)
+					});
 
-			lazy.or_some(event_id)
+				event_is_redundant.or_some(event_id)
+			} else {
+				Some(event_id)
+			}
 		})
 		.broad_filter_map(|event_id: OwnedEventId| async move {
 			services.rooms.timeline.get_pdu(&event_id).await.ok()
 		})
@@ -531,89 +528,246 @@ async fn calculate_state_initial(
 		.await
 }
 
-/// Calculate the "incremental state", or all events from the
-/// `since_shortstatehash` up to (but not including)
-/// the `current_shortstatehash`. If `lazy_loading_witness` is `None`, lazy
-/// loading will be disabled.
+/// Calculate the state events to include in an incremental sync response.
+///
+/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned
+/// Vec will include the membership events of all the members in
+/// `lazily_loaded_members`.
 #[tracing::instrument(name = "incremental", level = "trace", skip_all)]
 #[allow(clippy::too_many_arguments)]
 async fn calculate_state_incremental<'a>(
 	services: &Services,
 	sender_user: &'a UserId,
-	since_shortstatehash: Option<ShortStateHash>,
-	current_shortstatehash: ShortStateHash,
-	lazy_loading_witness: Option<&'a Witness>,
+	room_id: &RoomId,
+	previous_sync_end_count: PduCount,
+	previous_sync_end_shortstatehash: ShortStateHash,
+	timeline_start_shortstatehash: ShortStateHash,
+	timeline_end_shortstatehash: ShortStateHash,
+	timeline: &TimelinePdus,
+	lazily_loaded_members: Option<&'a MemberSet>,
) -> Result<Vec<PduEvent>> {
-	let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash);
+	// NB: a limited sync is one where `timeline.limited == true`. Synapse calls
+	// this a "gappy" sync internally.
 
-	let state_get_shorteventid = |user_id: &'a UserId| {
-		services
+	/*
+	the state events returned from an incremental sync which isn't limited are usually empty.
+	however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG
+	(i.e. has multiple `prev_events`), the state at the _end_ of the timeline may include
+	state events which were merged in and don't exist in the state at the _start_ of the
+	timeline. because this is uncommon, we check here to see if any events in the timeline
+	merged a split in the DAG.
+
+	see: https://github.com/element-hq/synapse/issues/16941
+	*/
+	let timeline_is_linear = timeline.pdus.is_empty() || {
+		let last_pdu_of_last_sync = services
 			.rooms
-			.state_accessor
-			.state_get_shortid(
-				current_shortstatehash,
-				&StateEventType::RoomMember,
-				user_id.as_str(),
-			)
-			.ok()
+			.timeline
+			.pdus_rev(Some(sender_user), room_id, Some(previous_sync_end_count.saturating_add(1)))
+			.boxed()
+			.next()
+			.await
+			.transpose()
+			.expect("last sync should have had some PDUs")
+			.map(at!(1));
+
+		// make sure the prev_events of each pdu in the timeline refer only to the
+		// previous pdu
+		timeline
+			.pdus
+			.iter()
+			.try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| {
+				if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() {
+					if prev_event_id
+						.as_ref()
+						.is_none_or(is_equal_to!(pdu_prev_event_id))
+					{
+						return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned()));
+					}
+				}
+
+				trace!(
+					"pdu {:?} has split prev_events (expected {:?}): {:?}",
+					pdu.event_id, prev_event_id, pdu.prev_events
+				);
+				ControlFlow::Break(())
+			})
+			.is_continue()
 	};
 
-	let lazy_state_ids: OptionFuture<_> = lazy_loading_witness
-		.map(|witness| {
-			StreamExt::into_future(
-				witness
-					.iter()
-					.stream()
-					.broad_filter_map(|user_id| state_get_shorteventid(user_id)),
-			)
-		})
-		.into();
+	if timeline_is_linear && !timeline.limited {
+		// if there are no splits in the DAG and the timeline isn't limited, then
+		// `state` will always be empty unless lazy loading is enabled.
 
-	let state_diff_shortids = services
-		.rooms
-		.state_accessor
-		.state_added((since_shortstatehash, current_shortstatehash))
-		.boxed();
+		if let Some(lazily_loaded_members) = lazily_loaded_members
+			&& !timeline.pdus.is_empty()
+		{
+			// lazy loading is enabled, so we return the membership events which were
+			// requested by the caller.
+			let lazy_membership_events: Vec<_> = lazily_loaded_members
+				.iter()
+				.stream()
+				.broad_filter_map(|user_id| async move {
+					if user_id == sender_user {
+						return None;
+					}
 
-	state_diff_shortids
-		.broad_filter_map(|(shortstatekey, shorteventid)| async move {
-			if lazy_loading_witness.is_none() {
-				return Some(shorteventid);
+					services
+						.rooms
+						.state_accessor
+						.state_get(
+							timeline_start_shortstatehash,
+							&StateEventType::RoomMember,
+							user_id.as_str(),
+						)
+						.ok()
+						.await
+				})
+				.collect()
+				.await;
+
+			if !lazy_membership_events.is_empty() {
+				trace!(
+					"syncing lazy membership events for members: {:?}",
+					lazy_membership_events
+						.iter()
+						.map(|pdu| pdu.state_key().unwrap())
+				);
 			}
 
-			lazy_filter(services, sender_user, shortstatekey, shorteventid).await
-		})
-		.chain(lazy_state_ids.stream())
-		.broad_filter_map(|shorteventid| {
-			services
-				.rooms
-				.short
-				.get_eventid_from_short(shorteventid)
-				.ok()
-		})
-		.broad_filter_map(|event_id: OwnedEventId| async move {
-			services.rooms.timeline.get_pdu(&event_id).await.ok()
-		})
-		.collect::<Vec<_>>()
-		.map(Ok)
-		.await
-}
+			return Ok(lazy_membership_events);
+		}
 
-async fn lazy_filter(
-	services: &Services,
-	sender_user: &UserId,
-	shortstatekey: ShortStateKey,
-	shorteventid: ShortEventId,
-) -> Option<ShortEventId> {
-	let (event_type, state_key) = services
+		// lazy loading is disabled, `state` is empty.
+		return Ok(vec![]);
+	}
+
+	/*
+	at this point, either the timeline is `limited` or the DAG has a split in it. this
+	necessitates computing the incremental state (which may be empty).
+
+	NOTE: this code path does not apply lazy-load filtering to membership state events.
+	the spec forbids lazy-load filtering if the timeline is `limited`, and DAG splits
+	which require sending extra membership state events are (probably) uncommon enough
+	that the performance penalty is acceptable.
+	*/
+
+	trace!(?timeline_is_linear, ?timeline.limited, "computing state for incremental sync");
+
+	// fetch the shorteventids of state events in the timeline
+	let state_events_in_timeline: BTreeSet<ShortEventId> = services
 		.rooms
 		.short
-		.get_statekey_from_short(shortstatekey)
-		.await
-		.ok()?;
+		.multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| {
+			if pdu.state_key().is_some() {
+				Some(pdu.event_id.as_ref())
+			} else {
+				None
+			}
+		}))
+		.collect()
+		.await;
 
-	(event_type != StateEventType::RoomMember || state_key == sender_user.as_str())
-		.then_some(shorteventid)
+	trace!("{} state events in timeline", state_events_in_timeline.len());
+
+	/*
+	fetch the state events which were added since the last sync.
+
+	specifically we fetch the difference between the state at the last sync and the state
+	at the _end_ of the timeline, and then we filter out state events in the timeline
+	itself using the shorteventids we fetched. this is necessary to account for splits
+	in the DAG, as explained above.
+	*/
+	let state_diff = services
+		.rooms
+		.short
+		.multi_get_eventid_from_short::<'_, OwnedEventId, _>(
+			services
+				.rooms
+				.state_accessor
+				.state_added((previous_sync_end_shortstatehash, timeline_end_shortstatehash))
+				.await?
+				.stream()
+				.ready_filter_map(|(_, shorteventid)| {
+					if state_events_in_timeline.contains(&shorteventid) {
+						None
+					} else {
+						Some(shorteventid)
+					}
+				}),
+		)
+		.ignore_err();
+
+	// finally, fetch the PDU contents and collect them into a vec
+	let state_diff_pdus = state_diff
+		.broad_filter_map(|event_id| async move {
+			services
+				.rooms
+				.timeline
+				.get_non_outlier_pdu(&event_id)
+				.await
+				.ok()
+		})
+		.collect::<Vec<_>>()
+		.await;
+
+	trace!(?state_diff_pdus, "collected state PDUs for incremental sync");
+	Ok(state_diff_pdus)
 }
 
+async fn calculate_device_list_updates(
+	services: &Services,
+	SyncContext { sender_user, since, next_batch, .. }: SyncContext<'_>,
+	room_id: &RoomId,
+	device_list_updates: &mut DeviceListUpdates,
+	state_events: &Vec<PduEvent>,
+	joined_since_last_sync: bool,
+) {
+	// add users with changed keys to the `changed` list
+	services
+		.users
+		.room_keys_changed(room_id, since, Some(next_batch))
+		.map(at!(0))
+		.map(ToOwned::to_owned)
+		.ready_for_each(|user_id| {
+			device_list_updates.changed.insert(user_id);
+		})
+		.await;
+
+	// add users who now share encrypted rooms to `changed` and
+	// users who no longer share encrypted rooms to `left`
+	for state_event in state_events {
+		if state_event.kind == RoomMember {
+			let Some(content): Option<RoomMemberEventContent> = state_event.get_content().ok()
+			else {
+				continue;
+			};
+
+			let Some(user_id): Option<OwnedUserId> = state_event
+				.state_key
+				.as_ref()
+				.and_then(|key| key.parse().ok())
+			else {
+				continue;
+			};
+
+			{
+				use MembershipState::*;
+
+				if matches!(content.membership, Leave | Join) {
+					let shares_encrypted_room =
+						share_encrypted_room(services, sender_user, &user_id, Some(room_id))
+							.await;
+					match content.membership {
+						| Leave if !shares_encrypted_room => {
+							device_list_updates.left.insert(user_id);
+						},
+						| Join if joined_since_last_sync || shares_encrypted_room => {
+							device_list_updates.changed.insert(user_id);
+						},
+						| _ => (),
+					}
+				}
+			}
+		}
+	}
+}
 
 async fn calculate_counts(
diff --git a/src/service/rooms/lazy_loading/mod.rs b/src/service/rooms/lazy_loading/mod.rs
index 61f081a9..c17d5c4f 100644
--- a/src/service/rooms/lazy_loading/mod.rs
+++ b/src/service/rooms/lazy_loading/mod.rs
@@ -39,7 +39,7 @@ pub enum Status {
 	Seen(u64),
 }
 
-pub type Witness = HashSet<OwnedUserId>;
+pub type MemberSet = HashSet<OwnedUserId>;
 type Key<'a> = (&'a UserId, Option<&'a DeviceId>, &'a RoomId, &'a UserId);
 
 impl crate::Service for Service {
@@ -67,9 +67,11 @@ pub async fn reset(&self, ctx: &Context<'_>) {
 		.await;
 }
 
+/// Returns only the subset of `senders` which should be sent to the client
+/// according to the provided lazy loading context.
 #[implement(Service)]
 #[tracing::instrument(name = "retain", level = "debug", skip_all)]
-pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witness {
+pub async fn retain_lazy_members(&self, senders: MemberSet, ctx: &Context<'_>) -> MemberSet {
 	debug_assert!(
 		ctx.options.is_none_or(Options::is_enabled),
 		"lazy loading should be enabled by your options"
@@ -84,7 +86,7 @@ pub async fn witness_retain(&self, senders: Witness, ctx: &Context<'_>) -> Witne
 	pin_mut!(witness);
 	let _cork = self.db.db.cork();
-	let mut senders = Witness::with_capacity(senders.len());
+	let mut senders = MemberSet::with_capacity(senders.len());
 	while let Some((status, sender)) = witness.next().await {
 		if include_redundant || status == Status::Unseen {
 			senders.insert(sender.into());
diff --git a/src/service/rooms/short/mod.rs b/src/service/rooms/short/mod.rs
index 660bb7de..6eafc8ee 100644
--- a/src/service/rooms/short/mod.rs
+++ b/src/service/rooms/short/mod.rs
@@ -1,10 +1,18 @@
 use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc};
 
 pub use conduwuit::matrix::pdu::{ShortEventId, ShortId, ShortRoomId, ShortStateKey};
-use conduwuit::{Result, err, implement, matrix::StateKey, utils, utils::IterStream};
+use conduwuit::{
+	Result, err, implement,
+	matrix::StateKey,
+	pair_of,
+	utils::{self, IterStream, ReadyExt},
+};
 use database::{Deserialized, Get, Map, Qry};
-use futures::{Stream, StreamExt};
-use ruma::{EventId, RoomId, events::StateEventType};
+use futures::{
+	Stream, StreamExt,
+	stream::{self},
+};
+use ruma::{EventId, OwnedEventId, RoomId, events::StateEventType};
 use serde::Deserialize;
 
 use crate::{Dep, globals};
@@ -258,3 +266,24 @@ pub async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> ShortRoomId {
 		short
 	})
 }
+
+#[implement(Service)]
+pub async fn multi_get_state_from_short<'a, S>(
+	&'a self,
+	short_state: S,
+) -> impl Stream<Item = Result<(StateKey, OwnedEventId)>> + Send + 'a
+where
+	S: Stream<Item = (ShortStateKey, ShortEventId)> + Send + 'a,
+{
+	let (short_state_keys, short_event_ids): pair_of!(Vec<_>) = short_state.unzip().await;
+
+	StreamExt::zip(
+		self.multi_get_statekey_from_short(stream::iter(short_state_keys.into_iter())),
+		self.multi_get_eventid_from_short(stream::iter(short_event_ids.into_iter())),
+	)
+	.ready_filter_map(|state_event| match state_event {
+		| (Ok(state_key), Ok(event_id)) => Some(Ok((state_key, event_id))),
+		| (Err(e), _) => Some(Err(e)),
+		| (_, Err(e)) => Some(Err(e)),
+	})
+}
diff --git a/src/service/rooms/state_accessor/state.rs b/src/service/rooms/state_accessor/state.rs
index 66c26d76..136a570d 100644
--- a/src/service/rooms/state_accessor/state.rs
+++ b/src/service/rooms/state_accessor/state.rs
@@ -10,7 +10,7 @@ use conduwuit::{
 	},
 };
 use database::Deserialized;
-use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::try_join, pin_mut};
+use futures::{FutureExt, Stream, StreamExt, TryFutureExt, pin_mut};
 use ruma::{
 	EventId, OwnedEventId, UserId,
 	events::{
@@ -286,28 +286,28 @@ pub fn state_keys<'a>(
 /// not in .1)
 #[implement(super::Service)]
 #[inline]
-pub fn state_removed(
+pub async fn state_removed(
 	&self,
 	shortstatehash: pair_of!(ShortStateHash),
-) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + '_ {
-	self.state_added((shortstatehash.1, shortstatehash.0))
+) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
+	self.state_added((shortstatehash.1, shortstatehash.0)).await
 }
 
 /// Returns the state events added between the interval (present in .1 but
 /// not in .0)
 #[implement(super::Service)]
-pub fn state_added(
+pub async fn state_added(
 	&self,
 	shortstatehash: pair_of!(ShortStateHash),
-) -> impl Stream<Item = (ShortStateKey, ShortEventId)> + Send + '_ {
-	let a = self.load_full_state(shortstatehash.0);
-	let b = self.load_full_state(shortstatehash.1);
-	try_join(a, b)
-		.map_ok(|(a, b)| b.difference(&a).copied().collect::<Vec<_>>())
-		.map_ok(IterStream::try_stream)
-		.try_flatten_stream()
-		.ignore_err()
+) -> Result<Vec<(ShortStateKey, ShortEventId)>> {
+	let full_state_a = self.load_full_state(shortstatehash.0).await?;
+	let full_state_b = self.load_full_state(shortstatehash.1).await?;
+
+	Ok(full_state_b
+		.difference(&full_state_a)
+		.copied()
 		.map(parse_compressed_state_event)
+		.collect())
 }
 
 #[implement(super::Service)]
diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs
index 028c3e51..039429a3 100644
--- a/src/service/rooms/state_compressor/mod.rs
+++ b/src/service/rooms/state_compressor/mod.rs
@@ -526,7 +526,7 @@ pub(crate) fn compress_state_event(
 
 #[inline]
 #[must_use]
-pub(crate) fn parse_compressed_state_event(
+pub fn parse_compressed_state_event(
 	compressed_event: CompressedStateEvent,
 ) -> (ShortStateKey, ShortEventId) {
 	use utils::u64_from_u8;
diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
index aac8f30c..a19eab9b 100644
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -186,10 +186,8 @@ impl Service {
 	}
 
 	/// Returns the pdu.
-	///
-	/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
 	#[inline]
-	pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<impl Event> {
+	pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
 		self.db.get_non_outlier_pdu(event_id).await
 	}
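
The `timeline_is_linear` fold above is the subtlest part of this patch, so here is a minimal, self-contained sketch of the same `try_fold`/`ControlFlow` technique. The `Pdu` struct and the `anchor` parameter are hypothetical stand-ins for illustration, not conduwuit's types:

```rust
use std::ops::ControlFlow;

/// Hypothetical, minimal stand-in for a timeline event.
struct Pdu {
	event_id: String,
	prev_events: Vec<String>,
}

/// True when every event has exactly one prev_event and it is the event
/// directly before it in the window, i.e. the timeline merges no DAG split.
fn timeline_is_linear(anchor: Option<String>, timeline: &[Pdu]) -> bool {
	timeline
		.iter()
		.try_fold(anchor, |prev, pdu| match pdu.prev_events.as_slice() {
			// a single prev_event matching the expected predecessor: keep going
			[only] if prev.as_ref().is_none_or(|p| p == only) =>
				ControlFlow::Continue(Some(pdu.event_id.clone())),
			// a merge (multiple prev_events) or an unexpected predecessor: stop
			_ => ControlFlow::Break(()),
		})
		.is_continue()
}

fn main() {
	let timeline = vec![
		Pdu { event_id: "$b".into(), prev_events: vec!["$a".into()] },
		Pdu { event_id: "$c".into(), prev_events: vec!["$b".into()] },
		// "$d" merges two branches, so the full timeline is not linear
		Pdu { event_id: "$d".into(), prev_events: vec!["$c".into(), "$x".into()] },
	];
	assert!(timeline_is_linear(Some("$a".into()), &timeline[..2]));
	assert!(!timeline_is_linear(Some("$a".into()), &timeline));
}
```

Folding with `ControlFlow` stops the scan at the first merge, and it threads the previous event's ID through the iteration, which is what lets the check also validate the first timeline event against the anchor PDU from the previous sync.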
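For the incremental path itself, the new `calculate_state_incremental` boils down to a set difference: state pairs present at the end of the timeline but not at the previous sync, minus events the client will already see in the timeline. A toy version over plain `HashSet`s — illustrative types only, not the compressed-state machinery:

```rust
use std::collections::HashSet;

type ShortStateKey = u64;
type ShortEventId = u64;
type StatePair = (ShortStateKey, ShortEventId);

/// State pairs present in `timeline_end` but not in `previous_sync`,
/// excluding events the client already receives in the timeline itself.
fn incremental_state_diff(
	previous_sync: &HashSet<StatePair>,
	timeline_end: &HashSet<StatePair>,
	events_in_timeline: &HashSet<ShortEventId>,
) -> Vec<StatePair> {
	timeline_end
		.difference(previous_sync)
		.filter(|(_, event_id)| !events_in_timeline.contains(event_id))
		.copied()
		.collect()
}

fn main() {
	let previous_sync: HashSet<StatePair> = [(1, 10), (2, 20)].into();
	let timeline_end: HashSet<StatePair> = [(1, 10), (2, 21), (3, 30)].into();
	// event 21 is already in the timeline, so only (3, 30) is sent as state
	let in_timeline: HashSet<ShortEventId> = [21].into();
	let diff = incremental_state_diff(&previous_sync, &timeline_end, &in_timeline);
	assert_eq!(diff, vec![(3, 30)]);
}
```

Diffing against the state at the *end* of the timeline (rather than the start) is what makes state merged in by a DAG split visible; the timeline-event filter then removes anything the client would otherwise receive twice.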
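Lastly, the `Witness` → `MemberSet` rename is cosmetic, but the semantics of `retain_lazy_members` are worth spelling out: the service remembers which members' state each device has already received in a room, and returns only the senders the client hasn't seen. A sketch of that bookkeeping under an assumed in-memory store (the real service persists this per device and room in the database, with a richer seen/unseen status):

```rust
use std::collections::{HashMap, HashSet};

/// (device_id, room_id) -> members whose state was already sent.
#[derive(Default)]
struct LazyLoadState {
	seen: HashMap<(String, String), HashSet<String>>,
}

impl LazyLoadState {
	/// Keep only unseen senders, marking them as seen in the process.
	fn retain_unseen(
		&mut self,
		device_id: &str,
		room_id: &str,
		senders: HashSet<String>,
	) -> HashSet<String> {
		let seen = self
			.seen
			.entry((device_id.to_owned(), room_id.to_owned()))
			.or_default();
		// `insert` returns false for members already sent to this device
		senders.into_iter().filter(|s| seen.insert(s.clone())).collect()
	}
}

fn main() {
	let mut state = LazyLoadState::default();
	let first = state.retain_unseen("DEV", "!room", ["@alice".into(), "@bob".into()].into());
	assert_eq!(first.len(), 2);
	// on the next sync, @alice's membership is redundant and filtered out
	let second = state.retain_unseen("DEV", "!room", ["@alice".into(), "@carol".into()].into());
	let expected: HashSet<String> = ["@carol".to_owned()].into();
	assert_eq!(second, expected);
}
```

This is also why the patch resets the lazy-loading state on initial sync: an initial sync resends the full member list, so the per-device "already sent" bookkeeping has to start from scratch.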