fix: Properly sync newly joined rooms

This commit is contained in:
Ginger 2025-10-27 09:32:28 -04:00
parent 0eff173c0b
commit fceaaedc04
2 changed files with 74 additions and 63 deletions

View file

@ -87,8 +87,11 @@ pub(super) async fn load_joined_room(
filter,
} = sync_context;
let mut device_list_updates = DeviceListUpdates::new();
// the global count as of the end of the last sync.
// this will be None if we are doing an initial sync.
let previous_sync_end_count = since.map(PduCount::Normal);
let next_batchcount = PduCount::Normal(next_batch);
let mut device_list_updates = DeviceListUpdates::new();
// the room state right now
let current_shortstatehash = services
@ -97,29 +100,27 @@ pub(super) async fn load_joined_room(
.get_room_shortstatehash(room_id)
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
// the global count and room state as of the end of the last sync.
// this will be None if we are doing an initial sync.
let previous_sync_end = OptionFuture::from(since.map(|since| async move {
let previous_sync_end_count = PduCount::Normal(since);
let previous_sync_end_shortstatehash = services
// the room state as of the end of the last sync.
// this will be None if we are doing an initial sync or if we just joined this
// room.
let previous_sync_end_shortstatehash = OptionFuture::from(since.map(|since| {
services
.rooms
.user
.get_token_shortstatehash(room_id, since)
.await?;
Ok((previous_sync_end_count, previous_sync_end_shortstatehash))
.ok()
}))
.map(Option::transpose);
.map(Option::flatten)
.map(Ok);
let (current_shortstatehash, previous_sync_end) =
try_join(current_shortstatehash, previous_sync_end).await?;
let (current_shortstatehash, previous_sync_end_shortstatehash) =
try_join(current_shortstatehash, previous_sync_end_shortstatehash).await?;
let timeline = load_timeline(
services,
sender_user,
room_id,
previous_sync_end.map(at!(0)),
previous_sync_end_count,
Some(next_batchcount),
10_usize,
);
@ -167,9 +168,9 @@ pub(super) async fn load_joined_room(
})
.into();
// the syncing user's membership event during the last sync
let membership_during_previous_sync: OptionFuture<_> = previous_sync_end
.map(at!(1))
// the syncing user's membership event during the last sync.
// this will be None if `previous_sync_end_shortstatehash` is None.
let membership_during_previous_sync: OptionFuture<_> = previous_sync_end_shortstatehash
.map(|shortstatehash| {
services
.rooms
@ -244,7 +245,7 @@ pub(super) async fn load_joined_room(
.await;
// reset lazy loading state on initial sync
if previous_sync_end.is_none() {
if previous_sync_end_count.is_none() {
services
.rooms
.lazy_loading
@ -252,48 +253,51 @@ pub(super) async fn load_joined_room(
.await;
}
let mut state_events =
if let Some((previous_sync_end_count, previous_sync_end_shortstatehash)) =
previous_sync_end
&& !full_state
{
let state_incremental = calculate_state_incremental(
services,
sender_user,
room_id,
previous_sync_end_count,
previous_sync_end_shortstatehash,
timeline_start_shortstatehash,
current_shortstatehash,
&timeline,
lazily_loaded_members.as_ref(),
)
.boxed()
.await?;
/*
compute the state delta between the previous sync and this sync. if this is an initial sync
*or* we just joined this room, `calculate_state_initial` will be used, otherwise `calculate_state_incremental`
will be used.
*/
let mut state_events = if let Some(previous_sync_end_count) = previous_sync_end_count
&& let Some(previous_sync_end_shortstatehash) = previous_sync_end_shortstatehash
&& !full_state
{
calculate_state_incremental(
services,
sender_user,
room_id,
previous_sync_end_count,
previous_sync_end_shortstatehash,
timeline_start_shortstatehash,
current_shortstatehash,
&timeline,
lazily_loaded_members.as_ref(),
)
.boxed()
.await?
} else {
calculate_state_initial(
services,
sender_user,
timeline_start_shortstatehash,
lazily_loaded_members.as_ref(),
)
.boxed()
.await?
};
if is_encrypted_room {
calculate_device_list_updates(
services,
sync_context,
room_id,
&mut device_list_updates,
&state_incremental,
joined_since_last_sync,
)
.await;
}
state_incremental
} else {
calculate_state_initial(
services,
sender_user,
timeline_start_shortstatehash,
lazily_loaded_members.as_ref(),
)
.boxed()
.await?
};
// for incremental syncs, calculate updates to E2EE device lists
if previous_sync_end_count.is_some() && is_encrypted_room {
calculate_device_list_updates(
services,
sync_context,
room_id,
&mut device_list_updates,
&state_events,
joined_since_last_sync,
)
.await;
}
// only compute room counts and heroes (aka the summary) if the room's members
// changed since the last sync

View file

@ -14,6 +14,7 @@ use conduwuit::{
ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, Tools, WidebandExt},
},
warn,
};
use conduwuit_service::Services;
use futures::{
@ -210,10 +211,16 @@ pub(crate) async fn build_sync_events(
.state_cache
.rooms_joined(sender_user)
.map(ToOwned::to_owned)
.broad_filter_map(|room_id| {
load_joined_room(services, context, room_id.clone())
.map_ok(move |(joined_room, updates)| (room_id, joined_room, updates))
.ok()
.broad_filter_map(|room_id| async {
let joined_room = load_joined_room(services, context, room_id.clone()).await;
match joined_room {
| Ok((room, updates)) => Some((room_id, room, updates)),
| Err(err) => {
warn!(?err, ?room_id, "error loading joined room {}", room_id);
None
},
}
})
.ready_fold(
(BTreeMap::new(), DeviceListUpdates::new()),