fix: Properly sync newly joined rooms
This commit is contained in:
parent
1444f43fa7
commit
afd115eedc
2 changed files with 74 additions and 63 deletions
|
|
@ -87,8 +87,11 @@ pub(super) async fn load_joined_room(
|
|||
filter,
|
||||
} = sync_context;
|
||||
|
||||
let mut device_list_updates = DeviceListUpdates::new();
|
||||
// the global count as of the end of the last sync.
|
||||
// this will be None if we are doing an initial sync.
|
||||
let previous_sync_end_count = since.map(PduCount::Normal);
|
||||
let next_batchcount = PduCount::Normal(next_batch);
|
||||
let mut device_list_updates = DeviceListUpdates::new();
|
||||
|
||||
// the room state right now
|
||||
let current_shortstatehash = services
|
||||
|
|
@ -97,29 +100,27 @@ pub(super) async fn load_joined_room(
|
|||
.get_room_shortstatehash(room_id)
|
||||
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
|
||||
|
||||
// the global count and room state as of the end of the last sync.
|
||||
// this will be None if we are doing an initial sync.
|
||||
let previous_sync_end = OptionFuture::from(since.map(|since| async move {
|
||||
let previous_sync_end_count = PduCount::Normal(since);
|
||||
|
||||
let previous_sync_end_shortstatehash = services
|
||||
// the room state as of the end of the last sync.
|
||||
// this will be None if we are doing an initial sync or if we just joined this
|
||||
// room.
|
||||
let previous_sync_end_shortstatehash = OptionFuture::from(since.map(|since| {
|
||||
services
|
||||
.rooms
|
||||
.user
|
||||
.get_token_shortstatehash(room_id, since)
|
||||
.await?;
|
||||
|
||||
Ok((previous_sync_end_count, previous_sync_end_shortstatehash))
|
||||
.ok()
|
||||
}))
|
||||
.map(Option::transpose);
|
||||
.map(Option::flatten)
|
||||
.map(Ok);
|
||||
|
||||
let (current_shortstatehash, previous_sync_end) =
|
||||
try_join(current_shortstatehash, previous_sync_end).await?;
|
||||
let (current_shortstatehash, previous_sync_end_shortstatehash) =
|
||||
try_join(current_shortstatehash, previous_sync_end_shortstatehash).await?;
|
||||
|
||||
let timeline = load_timeline(
|
||||
services,
|
||||
sender_user,
|
||||
room_id,
|
||||
previous_sync_end.map(at!(0)),
|
||||
previous_sync_end_count,
|
||||
Some(next_batchcount),
|
||||
10_usize,
|
||||
);
|
||||
|
|
@ -167,9 +168,9 @@ pub(super) async fn load_joined_room(
|
|||
})
|
||||
.into();
|
||||
|
||||
// the syncing user's membership event during the last sync
|
||||
let membership_during_previous_sync: OptionFuture<_> = previous_sync_end
|
||||
.map(at!(1))
|
||||
// the syncing user's membership event during the last sync.
|
||||
// this will be None if `previous_sync_end_shortstatehash` is None.
|
||||
let membership_during_previous_sync: OptionFuture<_> = previous_sync_end_shortstatehash
|
||||
.map(|shortstatehash| {
|
||||
services
|
||||
.rooms
|
||||
|
|
@ -244,7 +245,7 @@ pub(super) async fn load_joined_room(
|
|||
.await;
|
||||
|
||||
// reset lazy loading state on initial sync
|
||||
if previous_sync_end.is_none() {
|
||||
if previous_sync_end_count.is_none() {
|
||||
services
|
||||
.rooms
|
||||
.lazy_loading
|
||||
|
|
@ -252,48 +253,51 @@ pub(super) async fn load_joined_room(
|
|||
.await;
|
||||
}
|
||||
|
||||
let mut state_events =
|
||||
if let Some((previous_sync_end_count, previous_sync_end_shortstatehash)) =
|
||||
previous_sync_end
|
||||
&& !full_state
|
||||
{
|
||||
let state_incremental = calculate_state_incremental(
|
||||
services,
|
||||
sender_user,
|
||||
room_id,
|
||||
previous_sync_end_count,
|
||||
previous_sync_end_shortstatehash,
|
||||
timeline_start_shortstatehash,
|
||||
current_shortstatehash,
|
||||
&timeline,
|
||||
lazily_loaded_members.as_ref(),
|
||||
)
|
||||
.boxed()
|
||||
.await?;
|
||||
/*
|
||||
compute the state delta between the previous sync and this sync. if this is an initial sync
|
||||
*or* we just joined this room, `calculate_state_initial` will be used, otherwise `calculate_state_incremental`
|
||||
will be used.
|
||||
*/
|
||||
let mut state_events = if let Some(previous_sync_end_count) = previous_sync_end_count
|
||||
&& let Some(previous_sync_end_shortstatehash) = previous_sync_end_shortstatehash
|
||||
&& !full_state
|
||||
{
|
||||
calculate_state_incremental(
|
||||
services,
|
||||
sender_user,
|
||||
room_id,
|
||||
previous_sync_end_count,
|
||||
previous_sync_end_shortstatehash,
|
||||
timeline_start_shortstatehash,
|
||||
current_shortstatehash,
|
||||
&timeline,
|
||||
lazily_loaded_members.as_ref(),
|
||||
)
|
||||
.boxed()
|
||||
.await?
|
||||
} else {
|
||||
calculate_state_initial(
|
||||
services,
|
||||
sender_user,
|
||||
timeline_start_shortstatehash,
|
||||
lazily_loaded_members.as_ref(),
|
||||
)
|
||||
.boxed()
|
||||
.await?
|
||||
};
|
||||
|
||||
if is_encrypted_room {
|
||||
calculate_device_list_updates(
|
||||
services,
|
||||
sync_context,
|
||||
room_id,
|
||||
&mut device_list_updates,
|
||||
&state_incremental,
|
||||
joined_since_last_sync,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
state_incremental
|
||||
} else {
|
||||
calculate_state_initial(
|
||||
services,
|
||||
sender_user,
|
||||
timeline_start_shortstatehash,
|
||||
lazily_loaded_members.as_ref(),
|
||||
)
|
||||
.boxed()
|
||||
.await?
|
||||
};
|
||||
// for incremental syncs, calculate updates to E2EE device lists
|
||||
if previous_sync_end_count.is_some() && is_encrypted_room {
|
||||
calculate_device_list_updates(
|
||||
services,
|
||||
sync_context,
|
||||
room_id,
|
||||
&mut device_list_updates,
|
||||
&state_events,
|
||||
joined_since_last_sync,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// only compute room counts and heroes (aka the summary) if the room's members
|
||||
// changed since the last sync
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ use conduwuit::{
|
|||
ReadyExt, TryFutureExtExt,
|
||||
stream::{BroadbandExt, Tools, WidebandExt},
|
||||
},
|
||||
warn,
|
||||
};
|
||||
use conduwuit_service::Services;
|
||||
use futures::{
|
||||
|
|
@ -210,10 +211,16 @@ pub(crate) async fn build_sync_events(
|
|||
.state_cache
|
||||
.rooms_joined(sender_user)
|
||||
.map(ToOwned::to_owned)
|
||||
.broad_filter_map(|room_id| {
|
||||
load_joined_room(services, context, room_id.clone())
|
||||
.map_ok(move |(joined_room, updates)| (room_id, joined_room, updates))
|
||||
.ok()
|
||||
.broad_filter_map(|room_id| async {
|
||||
let joined_room = load_joined_room(services, context, room_id.clone()).await;
|
||||
|
||||
match joined_room {
|
||||
| Ok((room, updates)) => Some((room_id, room, updates)),
|
||||
| Err(err) => {
|
||||
warn!(?err, ?room_id, "error loading joined room {}", room_id);
|
||||
None
|
||||
},
|
||||
}
|
||||
})
|
||||
.ready_fold(
|
||||
(BTreeMap::new(), DeviceListUpdates::new()),
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue