fix(sync/v3): Properly sync room heroes
This commit is contained in:
parent
876d3faec4
commit
61b6947e88
1 changed files with 99 additions and 85 deletions
|
|
@ -1,4 +1,4 @@
|
|||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
|
||||
use conduwuit::{
|
||||
Result, at, err, extract_variant, is_equal_to,
|
||||
|
|
@ -6,11 +6,10 @@ use conduwuit::{
|
|||
Event,
|
||||
pdu::{PduCount, PduEvent},
|
||||
},
|
||||
result::FlatOk,
|
||||
utils::{
|
||||
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
||||
math::ruma_from_u64,
|
||||
stream::{Tools, WidebandExt},
|
||||
stream::{TryIgnore, WidebandExt},
|
||||
},
|
||||
};
|
||||
use conduwuit_service::Services;
|
||||
|
|
@ -32,6 +31,7 @@ use ruma::{
|
|||
serde::Raw,
|
||||
uint,
|
||||
};
|
||||
use service::rooms::short::ShortStateHash;
|
||||
|
||||
use super::{load_timeline, share_encrypted_room};
|
||||
use crate::client::{
|
||||
|
|
@ -238,15 +238,24 @@ pub(super) async fn load_joined_room(
|
|||
.await;
|
||||
}
|
||||
|
||||
// only compute room counts and heroes (aka the summary) if the room's members
|
||||
// changed since the last sync
|
||||
let (joined_member_count, invited_member_count, heroes) =
|
||||
// calculate counts if any of the state events we're syncing are of type `m.room.member`
|
||||
if state_events.iter().any(|event| event.kind == RoomMember) {
|
||||
build_room_summary(services, room_id, syncing_user).await?
|
||||
} else {
|
||||
(None, None, None)
|
||||
};
|
||||
/*
|
||||
build the `summary` field of the room object. this is necessary if:
|
||||
1. the syncing user joined this room since the last sync, because their client doesn't have a summary for this room yet, or
|
||||
2. we're going to sync a membership event in either `state` or `timeline`, because that event may impact
|
||||
the joined/invited counts in the summary
|
||||
*/
|
||||
let sending_membership_events = timeline
|
||||
.pdus
|
||||
.iter()
|
||||
.map(|(_, pdu)| pdu)
|
||||
.chain(state_events.iter())
|
||||
.any(|event| event.kind == RoomMember);
|
||||
|
||||
let summary = if sending_membership_events || joined_since_last_sync {
|
||||
build_room_summary(services, room_id, syncing_user, current_shortstatehash).await?
|
||||
} else {
|
||||
RoomSummary::default()
|
||||
};
|
||||
|
||||
let is_sender_membership = |pdu: &PduEvent| {
|
||||
pdu.kind == StateEventType::RoomMember.into()
|
||||
|
|
@ -295,8 +304,8 @@ pub(super) async fn load_joined_room(
|
|||
|
||||
/*
|
||||
send notification counts if:
|
||||
1. this is an initial sync
|
||||
2. the user hasn't seen any notifications
|
||||
1. this is an initial sync, or
|
||||
2. the user hasn't seen any notifications, or
|
||||
3. the last notification the user saw has changed since the last sync
|
||||
*/
|
||||
let send_notification_counts = last_notification_read.is_none_or(|last_notification_read| {
|
||||
|
|
@ -350,7 +359,7 @@ pub(super) async fn load_joined_room(
|
|||
let events = join3(timeline_pdus, account_data_events, typing_events);
|
||||
let (unread_notifications, events) = join(unread_notifications, events).boxed().await;
|
||||
|
||||
let (room_events, account_data_events, typing_events) = events;
|
||||
let (timeline_events, account_data_events, typing_events) = events;
|
||||
let (notification_count, highlight_count) = unread_notifications;
|
||||
|
||||
let last_privateread_update = if let Some(last_sync_end_count) = last_sync_end_count {
|
||||
|
|
@ -389,22 +398,13 @@ pub(super) async fn load_joined_room(
|
|||
|
||||
let joined_room = JoinedRoom {
|
||||
account_data: RoomAccountData { events: account_data_events },
|
||||
summary: RoomSummary {
|
||||
joined_member_count: joined_member_count.map(ruma_from_u64),
|
||||
invited_member_count: invited_member_count.map(ruma_from_u64),
|
||||
heroes: heroes
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(TryInto::try_into)
|
||||
.filter_map(Result::ok)
|
||||
.collect(),
|
||||
},
|
||||
summary,
|
||||
unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
|
||||
timeline: Timeline {
|
||||
// mirror Synapse behavior by setting `limited` if the user joined since the last sync
|
||||
limited: timeline.limited || joined_since_last_sync,
|
||||
prev_batch: prev_batch.as_ref().map(ToString::to_string),
|
||||
events: room_events,
|
||||
events: timeline_events,
|
||||
},
|
||||
state: RoomState {
|
||||
events: state_events.into_iter().map(Event::into_format).collect(),
|
||||
|
|
@ -479,11 +479,14 @@ async fn extend_device_list_updates(
|
|||
}
|
||||
}
|
||||
|
||||
/// Build the `summary` field of the room object, which includes
|
||||
/// the number of joined and invited users and the room's heroes.
|
||||
async fn build_room_summary(
|
||||
services: &Services,
|
||||
room_id: &RoomId,
|
||||
syncing_user: &UserId,
|
||||
) -> Result<(Option<u64>, Option<u64>, Option<Vec<OwnedUserId>>)> {
|
||||
current_shortstatehash: ShortStateHash,
|
||||
) -> Result<RoomSummary> {
|
||||
let joined_member_count = services
|
||||
.rooms
|
||||
.state_cache
|
||||
|
|
@ -496,74 +499,85 @@ async fn build_room_summary(
|
|||
.room_invited_count(room_id)
|
||||
.unwrap_or(0);
|
||||
|
||||
let (joined_member_count, invited_member_count) =
|
||||
join(joined_member_count, invited_member_count).await;
|
||||
let has_name = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_contains_type(current_shortstatehash, &StateEventType::RoomName);
|
||||
|
||||
let small_room = joined_member_count.saturating_add(invited_member_count) <= 5;
|
||||
let has_canonical_alias = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_contains_type(current_shortstatehash, &StateEventType::RoomCanonicalAlias);
|
||||
|
||||
let heroes: OptionFuture<_> = small_room
|
||||
.then(|| build_heroes(services, room_id, syncing_user))
|
||||
let (joined_member_count, invited_member_count, has_name, has_canonical_alias) =
|
||||
join4(joined_member_count, invited_member_count, has_name, has_canonical_alias).await;
|
||||
|
||||
// only send heroes if the room has neither a name nor a canonical alias
|
||||
let heroes: OptionFuture<_> = (!(has_name || has_canonical_alias))
|
||||
.then(|| build_heroes(services, room_id, syncing_user, current_shortstatehash))
|
||||
.into();
|
||||
|
||||
Ok((Some(joined_member_count), Some(invited_member_count), heroes.await))
|
||||
Ok(RoomSummary {
|
||||
heroes: heroes
|
||||
.await
|
||||
.map(|heroes| heroes.into_iter().collect())
|
||||
.unwrap_or_default(),
|
||||
joined_member_count: Some(ruma_from_u64(joined_member_count)),
|
||||
invited_member_count: Some(ruma_from_u64(invited_member_count)),
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetch the user IDs to include in the `m.heroes` property of the room
|
||||
/// summary.
|
||||
async fn build_heroes(
|
||||
services: &Services,
|
||||
room_id: &RoomId,
|
||||
syncing_user: &UserId,
|
||||
) -> Vec<OwnedUserId> {
|
||||
services
|
||||
current_shortstatehash: ShortStateHash,
|
||||
) -> HashSet<OwnedUserId> {
|
||||
const MAX_HERO_COUNT: usize = 5;
|
||||
|
||||
// fetch joined members from the state cache first
|
||||
let joined_members_stream = services
|
||||
.rooms
|
||||
.timeline
|
||||
.all_pdus(syncing_user, room_id)
|
||||
.ready_filter(|(_, pdu)| pdu.kind == RoomMember)
|
||||
.fold_default(|heroes: Vec<_>, (_, pdu)| {
|
||||
fold_hero(heroes, services, room_id, syncing_user, pdu)
|
||||
})
|
||||
.state_cache
|
||||
.room_members(room_id)
|
||||
.map(ToOwned::to_owned);
|
||||
|
||||
// then fetch invited members
|
||||
let invited_members_stream = services
|
||||
.rooms
|
||||
.state_cache
|
||||
.room_members_invited(room_id)
|
||||
.map(ToOwned::to_owned);
|
||||
|
||||
// then as a last resort fetch every membership event
|
||||
let all_members_stream = services
|
||||
.rooms
|
||||
.short
|
||||
.multi_get_statekey_from_short(
|
||||
services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_full_shortids(current_shortstatehash)
|
||||
.ignore_err()
|
||||
.ready_filter_map(|(key, _)| Some(key)),
|
||||
)
|
||||
.ignore_err()
|
||||
.ready_filter_map(|(event_type, state_key)| {
|
||||
if event_type == StateEventType::RoomMember {
|
||||
state_key.to_string().try_into().ok()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
joined_members_stream
|
||||
.chain(invited_members_stream)
|
||||
.chain(all_members_stream)
|
||||
// the hero list should never include the syncing user
|
||||
.ready_filter(|user_id| user_id != syncing_user)
|
||||
.take(MAX_HERO_COUNT)
|
||||
.collect()
|
||||
.await
|
||||
}
|
||||
|
||||
async fn fold_hero(
|
||||
mut heroes: Vec<OwnedUserId>,
|
||||
services: &Services,
|
||||
room_id: &RoomId,
|
||||
syncing_user: &UserId,
|
||||
pdu: PduEvent,
|
||||
) -> Vec<OwnedUserId> {
|
||||
let Some(user_id): Option<&UserId> =
|
||||
pdu.state_key.as_deref().map(TryInto::try_into).flat_ok()
|
||||
else {
|
||||
return heroes;
|
||||
};
|
||||
|
||||
if user_id == syncing_user {
|
||||
return heroes;
|
||||
}
|
||||
|
||||
let Ok(content): Result<RoomMemberEventContent, _> = pdu.get_content() else {
|
||||
return heroes;
|
||||
};
|
||||
|
||||
// The membership was and still is invite or join
|
||||
if !matches!(content.membership, MembershipState::Join | MembershipState::Invite) {
|
||||
return heroes;
|
||||
}
|
||||
|
||||
if heroes.iter().any(is_equal_to!(user_id)) {
|
||||
return heroes;
|
||||
}
|
||||
|
||||
let (is_invited, is_joined) = join(
|
||||
services.rooms.state_cache.is_invited(user_id, room_id),
|
||||
services.rooms.state_cache.is_joined(user_id, room_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
if !is_joined && is_invited {
|
||||
return heroes;
|
||||
}
|
||||
|
||||
heroes.push(user_id.to_owned());
|
||||
heroes
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue