From fceaaedc048a0105d162368d6c2c311a2fc7981c Mon Sep 17 00:00:00 2001 From: Ginger Date: Mon, 27 Oct 2025 09:32:28 -0400 Subject: [PATCH] fix: Properly sync newly joined rooms --- src/api/client/sync/v3/joined.rs | 122 ++++++++++++++++--------------- src/api/client/sync/v3/mod.rs | 15 +++- 2 files changed, 74 insertions(+), 63 deletions(-) diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index 9255a7cd..3f929951 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -87,8 +87,11 @@ pub(super) async fn load_joined_room( filter, } = sync_context; - let mut device_list_updates = DeviceListUpdates::new(); + // the global count as of the end of the last sync. + // this will be None if we are doing an initial sync. + let previous_sync_end_count = since.map(PduCount::Normal); let next_batchcount = PduCount::Normal(next_batch); + let mut device_list_updates = DeviceListUpdates::new(); // the room state right now let current_shortstatehash = services @@ -97,29 +100,27 @@ pub(super) async fn load_joined_room( .get_room_shortstatehash(room_id) .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); - // the global count and room state as of the end of the last sync. - // this will be None if we are doing an initial sync. - let previous_sync_end = OptionFuture::from(since.map(|since| async move { - let previous_sync_end_count = PduCount::Normal(since); - - let previous_sync_end_shortstatehash = services + // the room state as of the end of the last sync. + // this will be None if we are doing an initial sync or if we just joined this + // room. + let previous_sync_end_shortstatehash = OptionFuture::from(since.map(|since| { + services .rooms .user .get_token_shortstatehash(room_id, since) - .await?; - - Ok((previous_sync_end_count, previous_sync_end_shortstatehash)) + .ok() })) - .map(Option::transpose); + .map(Option::flatten) + .map(Ok); - let (current_shortstatehash, previous_sync_end) = - try_join(current_shortstatehash, previous_sync_end).await?; + let (current_shortstatehash, previous_sync_end_shortstatehash) = + try_join(current_shortstatehash, previous_sync_end_shortstatehash).await?; let timeline = load_timeline( services, sender_user, room_id, - previous_sync_end.map(at!(0)), + previous_sync_end_count, Some(next_batchcount), 10_usize, ); @@ -167,9 +168,9 @@ pub(super) async fn load_joined_room( }) .into(); - // the syncing user's membership event during the last sync - let membership_during_previous_sync: OptionFuture<_> = previous_sync_end - .map(at!(1)) + // the syncing user's membership event during the last sync. + // this will be None if `previous_sync_end_shortstatehash` is None. + let membership_during_previous_sync: OptionFuture<_> = previous_sync_end_shortstatehash .map(|shortstatehash| { services .rooms @@ -244,7 +245,7 @@ pub(super) async fn load_joined_room( .await; // reset lazy loading state on initial sync - if previous_sync_end.is_none() { + if previous_sync_end_count.is_none() { services .rooms .lazy_loading @@ -252,48 +253,51 @@ pub(super) async fn load_joined_room( .await; } - let mut state_events = - if let Some((previous_sync_end_count, previous_sync_end_shortstatehash)) = - previous_sync_end - && !full_state - { - let state_incremental = calculate_state_incremental( - services, - sender_user, - room_id, - previous_sync_end_count, - previous_sync_end_shortstatehash, - timeline_start_shortstatehash, - current_shortstatehash, - &timeline, - lazily_loaded_members.as_ref(), - ) - .boxed() - .await?; + /* + compute the state delta between the previous sync and this sync. if this is an initial sync + *or* we just joined this room, `calculate_state_initial` will be used, otherwise `calculate_state_incremental` + will be used. + */ + let mut state_events = if let Some(previous_sync_end_count) = previous_sync_end_count + && let Some(previous_sync_end_shortstatehash) = previous_sync_end_shortstatehash + && !full_state + { + calculate_state_incremental( + services, + sender_user, + room_id, + previous_sync_end_count, + previous_sync_end_shortstatehash, + timeline_start_shortstatehash, + current_shortstatehash, + &timeline, + lazily_loaded_members.as_ref(), + ) + .boxed() + .await? + } else { + calculate_state_initial( + services, + sender_user, + timeline_start_shortstatehash, + lazily_loaded_members.as_ref(), + ) + .boxed() + .await? + }; - if is_encrypted_room { - calculate_device_list_updates( - services, - sync_context, - room_id, - &mut device_list_updates, - &state_incremental, - joined_since_last_sync, - ) - .await; - } - - state_incremental - } else { - calculate_state_initial( - services, - sender_user, - timeline_start_shortstatehash, - lazily_loaded_members.as_ref(), - ) - .boxed() - .await? - }; + // for incremental syncs, calculate updates to E2EE device lists + if previous_sync_end_count.is_some() && is_encrypted_room { + calculate_device_list_updates( + services, + sync_context, + room_id, + &mut device_list_updates, + &state_events, + joined_since_last_sync, + ) + .await; + } // only compute room counts and heroes (aka the summary) if the room's members // changed since the last sync diff --git a/src/api/client/sync/v3/mod.rs b/src/api/client/sync/v3/mod.rs index 66d9023a..44c57fd5 100644 --- a/src/api/client/sync/v3/mod.rs +++ b/src/api/client/sync/v3/mod.rs @@ -14,6 +14,7 @@ use conduwuit::{ ReadyExt, TryFutureExtExt, stream::{BroadbandExt, Tools, WidebandExt}, }, + warn, }; use conduwuit_service::Services; use futures::{ @@ -210,10 +211,16 @@ pub(crate) async fn build_sync_events( .state_cache .rooms_joined(sender_user) .map(ToOwned::to_owned) - .broad_filter_map(|room_id| { - load_joined_room(services, context, room_id.clone()) - .map_ok(move |(joined_room, updates)| (room_id, joined_room, updates)) - .ok() + .broad_filter_map(|room_id| async { + let joined_room = load_joined_room(services, context, room_id.clone()).await; + + match joined_room { + | Ok((room, updates)) => Some((room_id, room, updates)), + | Err(err) => { + warn!(?err, ?room_id, "error loading joined room {}", room_id); + None + }, + } }) .ready_fold( (BTreeMap::new(), DeviceListUpdates::new()),