diff --git a/src/api/client/membership/leave.rs b/src/api/client/membership/leave.rs index c67178ca..4ef4a2b0 100644 --- a/src/api/client/membership/leave.rs +++ b/src/api/client/membership/leave.rs @@ -388,8 +388,9 @@ pub async fn remote_leave_room( .outlier .add_pdu_outlier(&event_id, &leave_event); - let leave_pdu = Pdu::from_id_val(&event_id, leave_event) - .map_err(|e| err!(BadServerResponse("Invalid leave PDU received during federated leave: {e:?}")))?; + let leave_pdu = Pdu::from_id_val(&event_id, leave_event).map_err(|e| { + err!(BadServerResponse("Invalid leave PDU received during federated leave: {e:?}")) + })?; Ok(leave_pdu) } diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 930a393d..f47b591c 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -4,7 +4,7 @@ mod v5; use std::collections::VecDeque; use conduwuit::{ - Event, PduCount, Result, + Event, PduCount, Result, err, matrix::pdu::PduEvent, ref_at, trace, utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, @@ -54,7 +54,10 @@ async fn load_timeline( .rooms .timeline .last_timeline_count(Some(sender_user), room_id) - .await?; + .await + .map_err(|err| { + err!(Database(warn!("Failed to fetch end of room timeline: {}", err))) + })?; if last_timeline_count <= starting_count { // no messages have been sent in this room since `starting_count` diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index 14066ad9..65663017 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -1,11 +1,12 @@ -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashSet}; use conduwuit::{ - Result, at, err, extract_variant, + Result, at, debug_warn, err, extract_variant, matrix::{ Event, pdu::{PduCount, PduEvent}, }, + trace, utils::{ BoolExt, IterStream, ReadyExt, TryFutureExtExt, math::ruma_from_u64, @@ -16,7 +17,7 @@ use conduwuit::{ use conduwuit_service::Services; use futures::{ 
FutureExt, StreamExt, TryFutureExt, - future::{OptionFuture, join, join3, join4, try_join}, + future::{OptionFuture, join, join3, join4, try_join, try_join3}, }; use ruma::{ OwnedRoomId, OwnedUserId, RoomId, UserId, @@ -25,7 +26,7 @@ use ruma::{ v3::{Ephemeral, JoinedRoom, RoomAccountData, RoomSummary, State as RoomState, Timeline}, }, events::{ - AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, + AnyRawAccountDataEvent, StateEventType, TimelineEventType::*, room::member::{MembershipState, RoomMemberEventContent}, }, @@ -36,7 +37,7 @@ use service::rooms::short::ShortStateHash; use super::{load_timeline, share_encrypted_room}; use crate::client::{ - ignored_filter, + TimelinePdus, ignored_filter, sync::v3::{ DEFAULT_TIMELINE_LIMIT, DeviceListUpdates, SyncContext, prepare_lazily_loaded_members, state::{build_state_incremental, build_state_initial}, @@ -50,25 +51,301 @@ use crate::client::{ skip_all, fields( room_id = ?room_id, + syncing_user = ?sync_context.syncing_user, ), )] -#[allow(clippy::too_many_arguments)] pub(super) async fn load_joined_room( services: &Services, sync_context: SyncContext<'_>, ref room_id: OwnedRoomId, ) -> Result<(JoinedRoom, DeviceListUpdates)> { - let SyncContext { + /* + Building a sync response involves many steps which all depend on each other. + To parallelize the process as much as possible, each step is divided into its own function, + and `join*` functions are used to perform steps in parallel which do not depend on each other. 
+ */ + + let ( + account_data, + ephemeral, + StateAndTimeline { + state_events, + timeline, + summary, + notification_counts, + device_list_updates, + }, + ) = try_join3( + build_account_data(services, sync_context, room_id), + build_ephemeral(services, sync_context, room_id), + build_state_and_timeline(services, sync_context, room_id), + ) + .await?; + + if !timeline.is_empty() || !state_events.is_empty() { + trace!( + "syncing {} timeline events (limited = {}) and {} state events", + timeline.events.len(), + timeline.limited, + state_events.len() + ); + } + + let joined_room = JoinedRoom { + account_data, + summary: summary.unwrap_or_default(), + unread_notifications: notification_counts.unwrap_or_default(), + timeline, + state: RoomState { + events: state_events.into_iter().map(Event::into_format).collect(), + }, + ephemeral, + unread_thread_notifications: BTreeMap::new(), + }; + + Ok((joined_room, device_list_updates)) +} + +/// Collect changes to the syncing user's account data events. +#[tracing::instrument(level = "debug", skip_all)] +async fn build_account_data( + services: &Services, + SyncContext { syncing_user, last_sync_end_count, current_count, - full_state, - filter, .. - } = sync_context; - let mut device_list_updates = DeviceListUpdates::new(); + }: SyncContext<'_>, + room_id: &RoomId, +) -> Result { + let account_data_changes = services + .account_data + .changes_since(Some(room_id), syncing_user, last_sync_end_count, Some(current_count)) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await; - // the room state as of `next_batch`. + Ok(RoomAccountData { events: account_data_changes }) +} + +/// Collect new ephemeral events. +#[tracing::instrument(level = "debug", skip_all)] +async fn build_ephemeral( + services: &Services, + SyncContext { syncing_user, last_sync_end_count, .. }: SyncContext<'_>, + room_id: &RoomId, +) -> Result { + // note: some of the futures below are boxed. 
this is because, without the box, + // rustc produces over thirty inscrutable errors in `mod.rs` at the call-site + // of `load_joined_room`. I don't know why boxing them fixes this -- it seems + // to be related to the async closures and borrowing from the sync context. + + // collect updates to read receipts + let receipt_events = services + .rooms + .read_receipt + .readreceipts_since(room_id, last_sync_end_count) + .filter_map(async |(read_user, _, edu)| { + let is_ignored = services + .users + .user_is_ignored(&read_user, syncing_user) + .await; + + // filter out read receipts for ignored users + is_ignored.or_some(edu) + }) + .collect::>() + .boxed(); + + // collect the updated list of typing users, if it's changed + let typing_event = async { + let should_send_typing_event = match last_sync_end_count { + | Some(last_sync_end_count) => { + match services.rooms.typing.last_typing_update(room_id).await { + | Ok(last_typing_update) => { + // update the typing list if the users typing have changed since the last + // sync + last_typing_update > last_sync_end_count + }, + | Err(err) => { + warn!("Error checking last typing update: {}", err); + return None; + }, + } + }, + // always update the typing list on an initial sync + | None => true, + }; + + if should_send_typing_event { + let event = services + .rooms + .typing + .typings_event_for_user(room_id, syncing_user) + .await; + + if let Ok(event) = event { + return Some( + Raw::new(&event) + .expect("typing event should be valid") + .cast(), + ); + } + } + + return None; + }; + + // collect the syncing user's private-read marker, if it's changed + let private_read_event = async { + let should_send_private_read = match last_sync_end_count { + | Some(last_sync_end_count) => { + let last_privateread_update = services + .rooms + .read_receipt + .last_privateread_update(syncing_user, room_id) + .await; + + // update the marker if it's changed since the last sync + last_privateread_update > last_sync_end_count + }, + 
// always update the marker on an initial sync + | None => true, + }; + + if should_send_private_read { + services + .rooms + .read_receipt + .private_read_get(room_id, syncing_user) + .await + .ok() + } else { + None + } + }; + + let (receipt_events, typing_event, private_read_event) = + join3(receipt_events, typing_event, private_read_event).await; + + let mut edus = receipt_events; + edus.extend(typing_event); + edus.extend(private_read_event); + + Ok(Ephemeral { events: edus }) +} + +/// A struct to hold the state events, timeline, and other data which is +/// computed from them. +struct StateAndTimeline { + state_events: Vec, + timeline: Timeline, + summary: Option, + notification_counts: Option, + device_list_updates: DeviceListUpdates, +} + +/// Compute changes to the room's state and timeline. +#[tracing::instrument(level = "debug", skip_all)] +async fn build_state_and_timeline( + services: &Services, + sync_context: SyncContext<'_>, + room_id: &RoomId, +) -> Result { + let (shortstatehashes, timeline) = try_join( + fetch_shortstatehashes(services, sync_context, room_id), + build_timeline(services, sync_context, room_id), + ) + .await?; + + let (state_events, notification_counts, joined_since_last_sync) = try_join3( + build_state_events(services, sync_context, room_id, shortstatehashes, &timeline), + build_notification_counts(services, sync_context, room_id, &timeline), + check_joined_since_last_sync(services, shortstatehashes, sync_context), + ) + .await?; + + // the timeline should always include at least one PDU if the syncing user + // joined since the last sync, that being the syncing user's join event. if + // it's empty something is wrong. 
+ if joined_since_last_sync && timeline.pdus.is_empty() { + warn!("timeline for newly joined room is empty"); + } + + let (summary, device_list_updates) = try_join( + build_room_summary( + services, + sync_context, + room_id, + shortstatehashes, + &timeline, + &state_events, + joined_since_last_sync, + ), + build_device_list_updates( + services, + sync_context, + room_id, + shortstatehashes, + &state_events, + joined_since_last_sync, + ), + ) + .await?; + + // the token which may be passed to the messages API to backfill room history + let prev_batch = timeline.pdus.front().map(at!(0)); + + // note: we always indicate a limited timeline if the syncing user just joined + // the room, to indicate to the client that it should request backfill (and to + // copy Synapse's behavior). for federated room joins, the `timeline` will + // usually only include the syncing user's join event. + let limited = timeline.limited || joined_since_last_sync; + + // filter out ignored events from the timeline and convert the PDUs into Ruma's + // AnySyncTimelineEvent type + let filtered_timeline = timeline + .pdus + .into_iter() + .stream() + .wide_filter_map(|item| ignored_filter(services, item, sync_context.syncing_user)) + .map(at!(1)) + .map(Event::into_format) + .collect::>() + .await; + + Ok(StateAndTimeline { + state_events, + timeline: Timeline { + limited, + prev_batch: prev_batch.as_ref().map(ToString::to_string), + events: filtered_timeline, + }, + summary, + notification_counts, + device_list_updates, + }) +} + +/// Shortstatehashes necessary to compute what state events to sync. +#[derive(Clone, Copy)] +struct ShortStateHashes { + /// The current state of the syncing room. + current_shortstatehash: ShortStateHash, + /// The state of the syncing room at the end of the last sync. + last_sync_end_shortstatehash: Option, +} + +/// Fetch the current_shortstatehash and last_sync_end_shortstatehash. 
+#[tracing::instrument(level = "debug", skip_all)] +async fn fetch_shortstatehashes( + services: &Services, + SyncContext { last_sync_end_count, current_count, .. }: SyncContext<'_>, + room_id: &RoomId, +) -> Result { + // the room state currently. + // TODO: this should be the room state as of `current_count`, but there's no way + // to get that right now. let current_shortstatehash = services .rooms .state @@ -80,10 +357,18 @@ pub(super) async fn load_joined_room( // room. let last_sync_end_shortstatehash = OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| { + // look up the shortstatehash saved by the last sync's call to + // `associate_token_shortstatehash` services .rooms .user .get_token_shortstatehash(room_id, last_sync_end_count) + .inspect_err(move |_| { + debug_warn!( + token = last_sync_end_count, + "Room has no shortstatehash for this token" + ) + }) .ok() })) .map(Option::flatten) @@ -92,11 +377,47 @@ pub(super) async fn load_joined_room( let (current_shortstatehash, last_sync_end_shortstatehash) = try_join(current_shortstatehash, last_sync_end_shortstatehash).await?; - // load recent timeline events. - // if the filter specifies a limit, that will be used, otherwise - // `DEFAULT_TIMELINE_LIMIT` will be used. `DEFAULT_TIMELINE_LIMIT` will also be - // used if the limit is somehow greater than usize::MAX. + /* + associate the `current_count` with the `current_shortstatehash`, so we can + use it on the next sync as the `last_sync_end_shortstatehash`. + TODO: the table written to by this call grows extremely fast, gaining one new entry for each + joined room on _every single sync request_. we need to find a better way to remember the shortstatehash + between syncs. + */ + services + .rooms + .user + .associate_token_shortstatehash(room_id, current_count, current_shortstatehash) + .await; + + Ok(ShortStateHashes { + current_shortstatehash, + last_sync_end_shortstatehash, + }) +} + +/// Fetch recent timeline events. 
+#[tracing::instrument(level = "debug", skip_all)] +async fn build_timeline( + services: &Services, + sync_context: SyncContext<'_>, + room_id: &RoomId, +) -> Result { + let SyncContext { + syncing_user, + last_sync_end_count, + current_count, + filter, + .. + } = sync_context; + + /* + determine the maximum number of events to return in this sync. + if the sync filter specifies a limit, that will be used, otherwise + `DEFAULT_TIMELINE_LIMIT` will be used. `DEFAULT_TIMELINE_LIMIT` will also be + used if the limit is somehow greater than usize::MAX. + */ let timeline_limit = filter .room .timeline @@ -104,32 +425,41 @@ pub(super) async fn load_joined_room( .and_then(|limit| limit.try_into().ok()) .unwrap_or(DEFAULT_TIMELINE_LIMIT); - let timeline = load_timeline( + load_timeline( services, syncing_user, room_id, last_sync_end_count.map(PduCount::Normal), Some(PduCount::Normal(current_count)), timeline_limit, - ); + ) + .await +} - let receipt_events = services - .rooms - .read_receipt - .readreceipts_since(room_id, last_sync_end_count) - .filter_map(|(read_user, _, edu)| async move { - services - .users - .user_is_ignored(read_user, syncing_user) - .await - .or_some((read_user.to_owned(), edu)) - }) - .collect::>>() - .map(Ok); +/// Calculate the state events to sync. +async fn build_state_events( + services: &Services, + sync_context: SyncContext<'_>, + room_id: &RoomId, + shortstatehashes: ShortStateHashes, + timeline: &TimelinePdus, +) -> Result> { + let SyncContext { + syncing_user, + last_sync_end_count, + full_state, + .. 
+ } = sync_context; - let (timeline, receipt_events) = try_join(timeline, receipt_events).boxed().await?; + let ShortStateHashes { + current_shortstatehash, + last_sync_end_shortstatehash, + } = shortstatehashes; - // the state at the beginning of the timeline + // the spec states that the `state` property only includes state events up to + // the beginning of the timeline, so we determine the state of the syncing room + // as of the first timeline event. NOTE: this explanation is not entirely + // accurate; see the implementation of `build_state_incremental`. let timeline_start_shortstatehash = async { if let Some((_, pdu)) = timeline.pdus.front() { if let Ok(shortstatehash) = services @@ -145,74 +475,16 @@ pub(super) async fn load_joined_room( current_shortstatehash }; - let last_notification_read: OptionFuture<_> = timeline - .pdus - .is_empty() - .then(|| { - services - .rooms - .user - .last_notification_read(syncing_user, room_id) - }) - .into(); - - // the syncing user's membership event during the last sync. - // this will be None if `previous_sync_end_shortstatehash` is None. - let membership_during_previous_sync: OptionFuture<_> = last_sync_end_shortstatehash - .map(|shortstatehash| { - services - .rooms - .state_accessor - .state_get_content( - shortstatehash, - &StateEventType::RoomMember, - syncing_user.as_str(), - ) - .ok() - }) - .into(); - - let is_encrypted_room = services - .rooms - .state_accessor - .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok(); - - let ( - last_notification_read, - membership_during_previous_sync, - timeline_start_shortstatehash, - is_encrypted_room, - ) = join4( - last_notification_read, - membership_during_previous_sync, - timeline_start_shortstatehash, - is_encrypted_room, - ) - .await; - - // TODO: If the requesting user got state-reset out of the room, this - // will be `true` when it shouldn't be. 
this function should never be called - // in that situation, but it may be if the membership cache didn't get updated. - // the root cause of this needs to be addressed - let joined_since_last_sync = membership_during_previous_sync.flatten().is_none_or( - |content: RoomMemberEventContent| content.membership != MembershipState::Join, - ); - - // the timeline should always include at least one PDU if the syncing user - // joined since the last sync, that being the syncing user's join event. if - // it's empty something is wrong. - if joined_since_last_sync && timeline.pdus.is_empty() { - warn!("timeline for newly joined room is empty"); - } - // the user IDs of members whose membership needs to be sent to the client, if // lazy-loading is enabled. let lazily_loaded_members = - prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()).await; + prepare_lazily_loaded_members(services, sync_context, room_id, timeline.senders()); + + let (timeline_start_shortstatehash, lazily_loaded_members) = + join(timeline_start_shortstatehash, lazily_loaded_members).await; // compute the state delta between the previous sync and this sync. - let state_events = match (last_sync_end_count, last_sync_end_shortstatehash) { + match (last_sync_end_count, last_sync_end_shortstatehash) { /* if `last_sync_end_count` is Some (meaning this is an incremental sync), and `last_sync_end_shortstatehash` is Some (meaning the syncing user didn't just join this room for the first time ever), and `full_state` is false, @@ -231,7 +503,7 @@ pub(super) async fn load_joined_room( lazily_loaded_members.as_ref(), ) .boxed() - .await?, + .await, /* otherwise use `build_state_initial`. 
note that this branch will be taken if the user joined this room since the last sync for the first time ever, because in that case we have no `last_sync_end_shortstatehash` and can't correctly calculate @@ -245,245 +517,146 @@ pub(super) async fn load_joined_room( lazily_loaded_members.as_ref(), ) .boxed() - .await?, + .await, + } +} + +/// Compute the number of unread notifications in this room. +#[tracing::instrument(level = "debug", skip_all)] +async fn build_notification_counts( + services: &Services, + SyncContext { syncing_user, last_sync_end_count, .. }: SyncContext<'_>, + room_id: &RoomId, + timeline: &TimelinePdus, +) -> Result> { + // determine whether to actually update the notification counts + let should_send_notification_counts = async { + // if we're going to sync some timeline events, the notification count has + // definitely changed to include them + if !timeline.pdus.is_empty() { + return true; + } + + // if this is an initial sync, we need to send notification counts because the + // client doesn't know what they are yet + let Some(last_sync_end_count) = last_sync_end_count else { + return true; + }; + + let last_notification_read = services + .rooms + .user + .last_notification_read(syncing_user, room_id) + .await; + + // if the syncing user has read the events we sent during the last sync, we need + // to send a new notification count on this sync. + if last_notification_read > last_sync_end_count { + return true; + } + + // otherwise, nothing's changed. 
+ return false; }; - // for incremental syncs, calculate updates to E2EE device lists - if last_sync_end_count.is_some() && is_encrypted_room { - extend_device_list_updates( - services, - sync_context, - room_id, - &mut device_list_updates, - &state_events, - joined_since_last_sync, + if should_send_notification_counts.await { + let (notification_count, highlight_count) = join( + services + .rooms + .user + .notification_count(syncing_user, room_id) + .map(TryInto::try_into) + .unwrap_or(uint!(0)), + services + .rooms + .user + .highlight_count(syncing_user, room_id) + .map(TryInto::try_into) + .unwrap_or(uint!(0)), ) .await; + + trace!(?notification_count, ?highlight_count, "syncing new notification counts"); + + Ok(Some(UnreadNotificationsCount { + notification_count: Some(notification_count), + highlight_count: Some(highlight_count), + })) + } else { + Ok(None) + } +} + +/// Check if the syncing user joined the room since their last incremental sync. +#[tracing::instrument(level = "debug", skip_all)] +async fn check_joined_since_last_sync( + services: &Services, + ShortStateHashes { last_sync_end_shortstatehash, .. }: ShortStateHashes, + SyncContext { syncing_user, .. }: SyncContext<'_>, +) -> Result { + // fetch the syncing user's membership event during the last sync. + // this will be None if `previous_sync_end_shortstatehash` is None. + let membership_during_previous_sync = match last_sync_end_shortstatehash { + | Some(last_sync_end_shortstatehash) => services + .rooms + .state_accessor + .state_get_content( + last_sync_end_shortstatehash, + &StateEventType::RoomMember, + syncing_user.as_str(), + ) + .await + .inspect_err(|_| debug_warn!("User has no previous membership")) + .ok(), + | None => None, + }; + + // TODO: If the requesting user got state-reset out of the room, this + // will be `true` when it shouldn't be. this function should never be called + // in that situation, but it may be if the membership cache didn't get updated. 
+ // the root cause of this needs to be addressed + let joined_since_last_sync = + membership_during_previous_sync.is_none_or(|content: RoomMemberEventContent| { + content.membership != MembershipState::Join + }); + + if joined_since_last_sync { + trace!("user joined since last sync"); } - /* - build the `summary` field of the room object. this is necessary if: - 1. the syncing user joined this room since the last sync, because their client doesn't have a summary for this room yet, or - 2. we're going to sync a membership event in either `state` or `timeline`, because that event may impact - the joined/invited counts in the summary - */ - let sending_membership_events = timeline + Ok(joined_since_last_sync) +} + +/// Build the `summary` field of the room object, which includes +/// the number of joined and invited users and the room's heroes. +#[tracing::instrument(level = "debug", skip_all)] +async fn build_room_summary( + services: &Services, + SyncContext { syncing_user, .. }: SyncContext<'_>, + room_id: &RoomId, + ShortStateHashes { current_shortstatehash, .. }: ShortStateHashes, + timeline: &TimelinePdus, + state_events: &Vec, + joined_since_last_sync: bool, +) -> Result> { + // determine whether any events in the state or timeline are membership events. + let are_syncing_membership_events = timeline .pdus .iter() .map(|(_, pdu)| pdu) .chain(state_events.iter()) .any(|event| event.kind == RoomMember); - let summary = if sending_membership_events || joined_since_last_sync { - build_room_summary(services, room_id, syncing_user, current_shortstatehash).await? 
- } else { - RoomSummary::default() - }; - - // the prev_batch token for the response - let prev_batch = timeline.pdus.front().map(at!(0)); - - let filtered_timeline = timeline - .pdus - .into_iter() - .stream() - // filter out ignored events from the timeline - .wide_filter_map(|item| ignored_filter(services, item, syncing_user)) - .map(at!(1)) - .map(Event::into_format) - .collect::>(); - - let account_data_events = services - .account_data - .changes_since(Some(room_id), syncing_user, last_sync_end_count, Some(current_count)) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect(); - /* - send notification counts if: - 1. this is an initial sync, or - 2. the user hasn't seen any notifications, or - 3. the last notification the user saw has changed since the last sync + we only need to send an updated room summary if: + 1. there are membership events in the state or timeline, because they might have changed the + membership counts or heroes, or + 2. the syncing user just joined this room, which usually implies #1 because their join event should be in the timeline. 
*/ - let send_notification_counts = last_notification_read.is_none_or(|last_notification_read| { - last_sync_end_count - .is_none_or(|last_sync_end_count| last_notification_read > last_sync_end_count) - }); - - let notification_count: OptionFuture<_> = send_notification_counts - .then(|| { - services - .rooms - .user - .notification_count(syncing_user, room_id) - .map(TryInto::try_into) - .unwrap_or(uint!(0)) - }) - .into(); - - let highlight_count: OptionFuture<_> = send_notification_counts - .then(|| { - services - .rooms - .user - .highlight_count(syncing_user, room_id) - .map(TryInto::try_into) - .unwrap_or(uint!(0)) - }) - .into(); - - let typing_events = services - .rooms - .typing - .last_typing_update(room_id) - .and_then(|count| async move { - if last_sync_end_count.is_some_and(|last_sync_end_count| count <= last_sync_end_count) - { - return Ok(Vec::>::new()); - } - - let typings = services - .rooms - .typing - .typings_event_for_user(room_id, syncing_user) - .await?; - - Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?]) - }) - .unwrap_or(Vec::new()); - - let unread_notifications = join(notification_count, highlight_count); - let events = join3(filtered_timeline, account_data_events, typing_events); - let (unread_notifications, events) = join(unread_notifications, events).boxed().await; - - let (timeline_events, account_data_events, typing_events) = events; - let (notification_count, highlight_count) = unread_notifications; - - let last_privateread_update = if let Some(last_sync_end_count) = last_sync_end_count { - services - .rooms - .read_receipt - .last_privateread_update(syncing_user, room_id) - .await > last_sync_end_count - } else { - true - }; - - let private_read_event = if last_privateread_update { - services - .rooms - .read_receipt - .private_read_get(room_id, syncing_user) - .await - .ok() - } else { - None - }; - - let edus: Vec> = receipt_events - .into_values() - .chain(typing_events.into_iter()) - 
.chain(private_read_event.into_iter()) - .collect(); - - // save the room state at this sync to use during the next sync - services - .rooms - .user - .associate_token_shortstatehash(room_id, current_count, current_shortstatehash) - .await; - - let joined_room = JoinedRoom { - account_data: RoomAccountData { events: account_data_events }, - summary, - unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, - timeline: Timeline { - // mirror Synapse behavior by setting `limited` if the user joined since the last sync - limited: timeline.limited || joined_since_last_sync, - prev_batch: prev_batch.as_ref().map(ToString::to_string), - events: timeline_events, - }, - state: RoomState { - events: state_events.into_iter().map(Event::into_format).collect(), - }, - ephemeral: Ephemeral { events: edus }, - unread_thread_notifications: BTreeMap::new(), - }; - - Ok((joined_room, device_list_updates)) -} - -async fn extend_device_list_updates( - services: &Services, - SyncContext { - syncing_user, - last_sync_end_count: since, - current_count, - .. 
- }: SyncContext<'_>, - room_id: &RoomId, - device_list_updates: &mut DeviceListUpdates, - state_events: &Vec, - joined_since_last_sync: bool, -) { - // add users with changed keys to the `changed` list - services - .users - .room_keys_changed(room_id, since, Some(current_count)) - .map(at!(0)) - .map(ToOwned::to_owned) - .ready_for_each(|user_id| { - device_list_updates.changed.insert(user_id); - }) - .await; - - // add users who now share encrypted rooms to `changed` and - // users who no longer share encrypted rooms to `left` - for state_event in state_events { - if state_event.kind == RoomMember { - let Some(content): Option = state_event.get_content().ok() - else { - continue; - }; - - let Some(user_id): Option = state_event - .state_key - .as_ref() - .and_then(|key| key.parse().ok()) - else { - continue; - }; - - { - use MembershipState::*; - - if matches!(content.membership, Leave | Join) { - let shares_encrypted_room = - share_encrypted_room(services, syncing_user, &user_id, Some(room_id)) - .await; - match content.membership { - | Leave if !shares_encrypted_room => { - device_list_updates.left.insert(user_id); - }, - | Join if joined_since_last_sync || shares_encrypted_room => { - device_list_updates.changed.insert(user_id); - }, - | _ => (), - } - } - } - } + if !(are_syncing_membership_events || joined_since_last_sync) { + return Ok(None); } -} -/// Build the `summary` field of the room object, which includes -/// the number of joined and invited users and the room's heroes. 
-async fn build_room_summary( - services: &Services, - room_id: &RoomId, - syncing_user: &UserId, - current_shortstatehash: ShortStateHash, -) -> Result { let joined_member_count = services .rooms .state_cache @@ -510,18 +683,26 @@ async fn build_room_summary( join4(joined_member_count, invited_member_count, has_name, has_canonical_alias).await; // only send heroes if the room has neither a name nor a canonical alias - let heroes: OptionFuture<_> = (!(has_name || has_canonical_alias)) - .then(|| build_heroes(services, room_id, syncing_user, current_shortstatehash)) - .into(); + let heroes = if !(has_name || has_canonical_alias) { + Some(build_heroes(services, room_id, syncing_user, current_shortstatehash).await) + } else { + None + }; - Ok(RoomSummary { + trace!( + ?joined_member_count, + ?invited_member_count, + heroes_length = heroes.as_ref().map(|h| h.len()), + "syncing updated summary" + ); + + Ok(Some(RoomSummary { heroes: heroes - .await .map(|heroes| heroes.into_iter().collect()) .unwrap_or_default(), joined_member_count: Some(ruma_from_u64(joined_member_count)), invited_member_count: Some(ruma_from_u64(invited_member_count)), - }) + })) } /// Fetch the user IDs to include in the `m.heroes` property of the room @@ -578,3 +759,92 @@ async fn build_heroes( .collect() .await } + +/// Collect updates to users' device lists for E2EE. +#[tracing::instrument(level = "debug", skip_all)] +async fn build_device_list_updates( + services: &Services, + SyncContext { + syncing_user, + last_sync_end_count, + current_count, + .. + }: SyncContext<'_>, + room_id: &RoomId, + ShortStateHashes { current_shortstatehash, .. 
}: ShortStateHashes, + state_events: &Vec, + joined_since_last_sync: bool, +) -> Result { + let is_encrypted_room = services + .rooms + .state_accessor + .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") + .is_ok(); + + // initial syncs don't include device updates, and rooms which aren't encrypted + // don't affect them, so return early in either of those cases + if last_sync_end_count.is_none() || !(is_encrypted_room.await) { + return Ok(DeviceListUpdates::new()); + } + + let mut device_list_updates = DeviceListUpdates::new(); + + // add users with changed keys to the `changed` list + services + .users + .room_keys_changed(room_id, last_sync_end_count, Some(current_count)) + .map(at!(0)) + .map(ToOwned::to_owned) + .ready_for_each(|user_id| { + device_list_updates.changed.insert(user_id); + }) + .await; + + // add users who now share encrypted rooms to `changed` and + // users who no longer share encrypted rooms to `left` + for state_event in state_events { + if state_event.kind == RoomMember { + let Some(content): Option = state_event.get_content().ok() + else { + continue; + }; + + let Some(user_id): Option = state_event + .state_key + .as_ref() + .and_then(|key| key.parse().ok()) + else { + continue; + }; + + { + use MembershipState::*; + + if matches!(content.membership, Leave | Join) { + let shares_encrypted_room = + share_encrypted_room(services, syncing_user, &user_id, Some(room_id)) + .await; + match content.membership { + | Leave if !shares_encrypted_room => { + device_list_updates.left.insert(user_id); + }, + | Join if joined_since_last_sync || shares_encrypted_room => { + device_list_updates.changed.insert(user_id); + }, + | _ => (), + } + } + } + } + } + + if !device_list_updates.is_empty() { + trace!( + changed = device_list_updates.changed.len(), + left = device_list_updates.left.len(), + "syncing device list updates" + ); + } + + Ok(device_list_updates) +} diff --git a/src/api/client/sync/v3/mod.rs 
b/src/api/client/sync/v3/mod.rs index bec91559..9d187de6 100644 --- a/src/api/client/sync/v3/mod.rs +++ b/src/api/client/sync/v3/mod.rs @@ -75,6 +75,8 @@ impl DeviceListUpdates { self.changed.extend(other.changed); self.left.extend(other.left); } + + fn is_empty(&self) -> bool { self.changed.is_empty() && self.left.is_empty() } } impl From<DeviceListUpdates> for DeviceLists { diff --git a/src/api/client/sync/v3/state.rs b/src/api/client/sync/v3/state.rs index 4799c884..e1f016f7 100644 --- a/src/api/client/sync/v3/state.rs +++ b/src/api/client/sync/v3/state.rs @@ -105,7 +105,11 @@ pub(super) async fn build_state_incremental<'a>( The algorithm implemented in this function is, currently, quite different from the algorithm vaguely described by the Matrix specification. This is because the specification's description of the `state` property does not accurately - reflect how Synapse behaves, and therefore how client SDKs behave. + reflect how Synapse behaves, and therefore how client SDKs behave. Notable differences include: + 1. We do not compute the delta using the naive approach of "every state event from the end of the last sync + up to the start of this sync's timeline". see below for details. + 2. If lazy-loading is enabled, we include lazily-loaded membership events. The specific users to include are determined + elsewhere and supplied to this function in the `lazily_loaded_members` parameter. */ /* @@ -206,9 +210,11 @@ pub(super) async fn build_state_incremental<'a>( at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates computing the incremental state (which may be empty). - NOTE: this code path does not apply lazy-load filtering to membership state events. the spec forbids lazy-load filtering - if the timeline is `limited`, and DAG splits which require sending extra membership state events are (probably) uncommon - enough that the performance penalty is acceptable. + NOTE: this code path does not use the `lazy_membership_events` parameter.
any changes to membership will be included + in the incremental state. therefore, the incremental state may include "redundant" membership events, + which we do not filter out because A. the spec forbids lazy-load filtering if the timeline is `limited`, + and B. DAG splits which require sending extra membership state events are (probably) uncommon enough that + the performance penalty is acceptable. */ trace!(?timeline_is_linear, ?timeline.limited, "computing state for incremental sync"); diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index f38083db..af510104 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -472,7 +472,7 @@ where .filter_map(|(read_user, _ts, v)| async move { services .users - .user_is_ignored(read_user, sender_user) + .user_is_ignored(&read_user, sender_user) .await .or_some(v) }) diff --git a/src/service/rooms/read_receipt/data.rs b/src/service/rooms/read_receipt/data.rs index 9a2fa70c..c8df393d 100644 --- a/src/service/rooms/read_receipt/data.rs +++ b/src/service/rooms/read_receipt/data.rs @@ -7,7 +7,7 @@ use conduwuit::{ use database::{Deserialized, Json, Map}; use futures::{Stream, StreamExt}; use ruma::{ - CanonicalJsonObject, RoomId, UserId, + CanonicalJsonObject, OwnedUserId, RoomId, UserId, events::{AnySyncEphemeralRoomEvent, receipt::ReceiptEvent}, serde::Raw, }; @@ -25,7 +25,7 @@ struct Services { globals: Dep<globals::Service>, } -pub(super) type ReceiptItem<'a> = (&'a UserId, u64, Raw<AnySyncEphemeralRoomEvent>); +pub(super) type ReceiptItem = (OwnedUserId, u64, Raw<AnySyncEphemeralRoomEvent>); impl Data { pub(super) fn new(args: &crate::Args<'_>) -> Self { @@ -65,7 +65,7 @@ impl Data { &'a self, room_id: &'a RoomId, since: u64, - ) -> impl Stream<Item = ReceiptItem<'a>> + Send + 'a { + ) -> impl Stream<Item = ReceiptItem> + Send + 'a { type Key<'a> = (&'a RoomId, u64, &'a UserId); type KeyVal<'a> = (Key<'a>, CanonicalJsonObject); @@ -81,7 +81,7 @@ impl Data { let event = serde_json::value::to_raw_value(&json)?; - Ok((user_id, count, Raw::from_json(event))) + Ok((user_id.to_owned(), count,
Raw::from_json(event))) }) .ignore_err() } diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index df66d942..09381e15 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -112,7 +112,7 @@ impl Service { &'a self, room_id: &'a RoomId, since: Option<u64>, - ) -> impl Stream<Item = ReceiptItem<'a>> + Send + 'a { + ) -> impl Stream<Item = ReceiptItem> + Send + 'a { self.db.readreceipts_since(room_id, since.unwrap_or(0)) } diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 757e0d07..1cf3763c 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -530,7 +530,7 @@ impl Service { } max_edu_count.fetch_max(count, Ordering::Relaxed); - if !self.services.globals.user_is_local(user_id) { + if !self.services.globals.user_is_local(&user_id) { continue; } @@ -554,7 +554,7 @@ impl Service { let receipt = receipt .remove(&ReceiptType::Read) .expect("our read receipts always set this") - .remove(user_id) + .remove(&user_id) .expect("our read receipts always have the user here"); let receipt_data = ReceiptData { @@ -562,7 +562,7 @@ event_ids: vec![event_id.clone()], }; - if read.insert(user_id.to_owned(), receipt_data).is_none() { + if read.insert(user_id, receipt_data).is_none() { *num = num.saturating_add(1); if *num >= SELECT_RECEIPT_LIMIT { break;