From 3491f653a582bdb4b907b49d37e8dd4a8869d929 Mon Sep 17 00:00:00 2001 From: Ginger Date: Wed, 22 Oct 2025 09:31:08 -0400 Subject: [PATCH] refactor: Split sync v3 into multiple files --- src/api/client/sync/{v3.rs => v3/joined.rs} | 567 +------------------- src/api/client/sync/v3/left.rs | 196 +++++++ src/api/client/sync/v3/mod.rs | 395 ++++++++++++++ 3 files changed, 605 insertions(+), 553 deletions(-) rename src/api/client/sync/{v3.rs => v3/joined.rs} (52%) create mode 100644 src/api/client/sync/v3/left.rs create mode 100644 src/api/client/sync/v3/mod.rs diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3/joined.rs similarity index 52% rename from src/api/client/sync/v3.rs rename to src/api/client/sync/v3/joined.rs index 1209e88b..92ad4c35 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3/joined.rs @@ -1,25 +1,19 @@ -use std::{ - cmp::{self}, - collections::{BTreeMap, HashMap, HashSet}, - time::Duration, -}; +use std::collections::{BTreeMap, HashMap}; -use axum::extract::State; use conduwuit::{ - Result, at, err, error, extract_variant, is_equal_to, + Result, at, err, extract_variant, is_equal_to, matrix::{ Event, - pdu::{EventHash, PduCount, PduEvent}, + pdu::{PduCount, PduEvent}, }, ref_at, result::FlatOk, utils::{ - self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, - future::{OptionStream, ReadyEqExt}, + BoolExt, IterStream, ReadyExt, TryFutureExtExt, + future::OptionStream, math::ruma_from_u64, stream::{BroadbandExt, Tools, WidebandExt}, }, - warn, }; use conduwuit_service::{ Services, @@ -31,27 +25,17 @@ use conduwuit_service::{ }; use futures::{ FutureExt, StreamExt, TryFutureExt, - future::{OptionFuture, join, join3, join4, join5, try_join4}, - pin_mut, + future::{OptionFuture, join, join3, join4, try_join4}, }; use ruma::{ - DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, - api::client::{ - filter::FilterDefinition, - sync::sync_events::{ - self, DeviceLists, UnreadNotificationsCount, - v3::{ - Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom, - KnockState, KnockedRoom, LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, - State as RoomState, Timeline, ToDevice, - }, - }, - uiaa::UiaaResponse, + OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, + api::client::sync::sync_events::{ + UnreadNotificationsCount, + v3::{Ephemeral, JoinedRoom, RoomAccountData, RoomSummary, State as RoomState, Timeline}, }, events::{ AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType::*, - presence::{PresenceEvent, PresenceEventContent}, room::member::{MembershipState, RoomMemberEventContent}, }, serde::Raw, @@ -61,534 +45,11 @@ use service::rooms::short::{ShortEventId, ShortStateKey}; use tracing::trace; use super::{load_timeline, share_encrypted_room}; -use crate::{ - Ruma, RumaResponse, - client::{TimelinePdus, ignored_filter, is_ignored_invite}, +use crate::client::{ + TimelinePdus, ignored_filter, + sync::v3::{DeviceListUpdates, SyncContext}, }; -struct DeviceListUpdates { - changed: HashSet, - left: HashSet, -} - -impl DeviceListUpdates { - fn new() -> Self { - Self { - changed: HashSet::new(), - left: HashSet::new(), - } - } - - fn merge(&mut self, other: Self) { - self.changed.extend(other.changed); - self.left.extend(other.left); - } -} - -impl From for DeviceLists { - fn from(val: DeviceListUpdates) -> Self { - Self { - changed: val.changed.into_iter().collect(), - left: val.left.into_iter().collect(), - } - } -} - -#[derive(Clone, Copy)] -struct SyncContext<'a> { - sender_user: &'a UserId, - sender_device: &'a DeviceId, - since: Option, - next_batch: u64, - full_state: bool, - filter: &'a FilterDefinition, -} - -type PresenceUpdates = HashMap; - -/// # `GET /_matrix/client/r0/sync` -/// -/// Synchronize the client's state with the latest state on the server. -/// -/// - This endpoint takes a `since` parameter which should be the `next_batch` -/// value from a previous request for incremental syncs. -/// -/// Calling this endpoint without a `since` parameter returns: -/// - Some of the most recent events of each timeline -/// - Notification counts for each room -/// - Joined and invited member counts, heroes -/// - All state events -/// -/// Calling this endpoint with a `since` parameter from a previous `next_batch` -/// returns: For joined rooms: -/// - Some of the most recent events of each timeline that happened after since -/// - If user joined the room after since: All state events (unless lazy loading -/// is activated) and all device list updates in that room -/// - If the user was already in the room: A list of all events that are in the -/// state now, but were not in the state at `since` -/// - If the state we send contains a member event: Joined and invited member -/// counts, heroes -/// - Device list updates that happened after `since` -/// - If there are events in the timeline we send or the user send updated his -/// read mark: Notification counts -/// - EDUs that are active now (read receipts, typing updates, presence) -/// - TODO: Allow multiple sync streams to support Pantalaimon -/// -/// For invited rooms: -/// - If the user was invited after `since`: A subset of the state of the room -/// at the point of the invite -/// -/// For left rooms: -/// - If the user left after `since`: `prev_batch` token, empty state (TODO: -/// subset of the state at the point of the leave) -#[tracing::instrument( - name = "sync", - level = "debug", - skip_all, - fields( - since = %body.body.since.as_deref().unwrap_or_default(), - ) -)] -pub(crate) async fn sync_events_route( - State(services): State, - body: Ruma, -) -> Result> { - let (sender_user, sender_device) = body.sender(); - - // Presence update - if services.config.allow_local_presence { - services - .presence - .ping_presence(sender_user, &body.body.set_presence) - .await?; - } - - // Setup watchers, so if there's no response, we can wait for them - let watcher = services.sync.watch(sender_user, sender_device); - - let response = build_sync_events(&services, &body).await?; - if body.body.full_state - || !(response.rooms.is_empty() - && response.presence.is_empty() - && response.account_data.is_empty() - && response.device_lists.is_empty() - && response.to_device.is_empty()) - { - return Ok(response); - } - - // Hang a few seconds so requests are not spammed - // Stop hanging if new info arrives - let default = Duration::from_secs(30); - let duration = cmp::min(body.body.timeout.unwrap_or(default), default); - _ = tokio::time::timeout(duration, watcher).await; - - // Retry returning data - build_sync_events(&services, &body).await -} - -pub(crate) async fn build_sync_events( - services: &Services, - body: &Ruma, -) -> Result> { - let (sender_user, sender_device) = body.sender(); - - let next_batch = services.globals.current_count()?; - let since = body - .body - .since - .as_ref() - .and_then(|string| string.parse().ok()); - - let full_state = body.body.full_state; - - // FilterDefinition is very large (0x1000 bytes), let's put it on the heap - let filter = Box::new(match body.body.filter.as_ref() { - | None => FilterDefinition::default(), - | Some(Filter::FilterDefinition(filter)) => filter.clone(), - | Some(Filter::FilterId(filter_id)) => services - .users - .get_filter(sender_user, filter_id) - .await - .unwrap_or_default(), - }); - - let context = SyncContext { - sender_user, - sender_device, - since, - next_batch, - full_state, - filter: &filter, - }; - - let joined_rooms = services - .rooms - .state_cache - .rooms_joined(sender_user) - .map(ToOwned::to_owned) - .broad_filter_map(|room_id| { - load_joined_room(services, context, room_id.clone()) - .map_ok(move |(joined_room, updates)| (room_id, joined_room, updates)) - .ok() - }) - .ready_fold( - (BTreeMap::new(), DeviceListUpdates::new()), - |(mut joined_rooms, mut all_updates), (room_id, joined_room, updates)| { - all_updates.merge(updates); - - if !joined_room.is_empty() { - joined_rooms.insert(room_id, joined_room); - } - - (joined_rooms, all_updates) - }, - ); - - let left_rooms = services - .rooms - .state_cache - .rooms_left(sender_user) - .broad_filter_map(|(room_id, _)| { - handle_left_room(services, context, room_id.clone()) - .map_ok(move |left_room| (room_id, left_room)) - .ok() - }) - .ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room))) - .collect(); - - let invited_rooms = services - .rooms - .state_cache - .rooms_invited(sender_user) - .wide_filter_map(async |(room_id, invite_state)| { - if is_ignored_invite(services, sender_user, &room_id).await { - None - } else { - Some((room_id, invite_state)) - } - }) - .fold_default(|mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| async move { - let invite_count = services - .rooms - .state_cache - .get_invite_count(&room_id, sender_user) - .await - .ok(); - - // only sync this invite if it was sent after the last /sync call - if since < invite_count { - let invited_room = InvitedRoom { - invite_state: InviteState { events: invite_state }, - }; - - invited_rooms.insert(room_id, invited_room); - } - invited_rooms - }); - - let knocked_rooms = services - .rooms - .state_cache - .rooms_knocked(sender_user) - .fold_default(|mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| async move { - let knock_count = services - .rooms - .state_cache - .get_knock_count(&room_id, sender_user) - .await - .ok(); - - // only sync this knock if it was sent after the last /sync call - if since < knock_count { - let knocked_room = KnockedRoom { - knock_state: KnockState { events: knock_state }, - }; - - knocked_rooms.insert(room_id, knocked_room); - } - knocked_rooms - }); - - let presence_updates: OptionFuture<_> = services - .config - .allow_local_presence - .then(|| process_presence_updates(services, since, sender_user)) - .into(); - - let account_data = services - .account_data - .changes_since(None, sender_user, since, Some(next_batch)) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) - .collect(); - - // Look for device list updates of this account - let keys_changed = services - .users - .keys_changed(sender_user, since, Some(next_batch)) - .map(ToOwned::to_owned) - .collect::>(); - - let to_device_events = services - .users - .get_to_device_events(sender_user, sender_device, since, Some(next_batch)) - .collect::>(); - - let device_one_time_keys_count = services - .users - .count_one_time_keys(sender_user, sender_device); - - // Remove all to-device events the device received *last time* - let remove_to_device_events = - services - .users - .remove_to_device_events(sender_user, sender_device, since); - - let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms); - let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates); - let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) - .boxed() - .await; - - let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top; - let ((), to_device_events, presence_updates) = ephemeral; - let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms; - let (joined_rooms, mut device_list_updates) = joined_rooms; - device_list_updates.changed.extend(keys_changed); - - let response = sync_events::v3::Response { - account_data: GlobalAccountData { events: account_data }, - device_lists: device_list_updates.into(), - device_one_time_keys_count, - // Fallback keys are not yet supported - device_unused_fallback_key_types: None, - next_batch: next_batch.to_string(), - presence: Presence { - events: presence_updates - .into_iter() - .flat_map(IntoIterator::into_iter) - .map(|(sender, content)| PresenceEvent { content, sender }) - .map(|ref event| Raw::new(event)) - .filter_map(Result::ok) - .collect(), - }, - rooms: Rooms { - leave: left_rooms, - join: joined_rooms, - invite: invited_rooms, - knock: knocked_rooms, - }, - to_device: ToDevice { events: to_device_events }, - }; - - Ok(response) -} - -#[tracing::instrument(name = "presence", level = "debug", skip_all)] -async fn process_presence_updates( - services: &Services, - since: Option, - syncing_user: &UserId, -) -> PresenceUpdates { - services - .presence - .presence_since(since.unwrap_or(0)) // send all presences on initial sync - .filter(|(user_id, ..)| { - services - .rooms - .state_cache - .user_sees_user(syncing_user, user_id) - }) - .filter_map(|(user_id, _, presence_bytes)| { - services - .presence - .from_json_bytes_to_event(presence_bytes, user_id) - .map_ok(move |event| (user_id, event)) - .ok() - }) - .map(|(user_id, event)| (user_id.to_owned(), event.content)) - .collect() - .await -} - -#[tracing::instrument( - name = "left", - level = "debug", - skip_all, - fields( - room_id = %room_id, - full = %full_state, - ), -)] -#[allow(clippy::too_many_arguments)] -async fn handle_left_room( - services: &Services, - SyncContext { - sender_user, - since, - next_batch, - full_state, - filter, - .. - }: SyncContext<'_>, - ref room_id: OwnedRoomId, -) -> Result> { - let left_count = services - .rooms - .state_cache - .get_left_count(room_id, sender_user) - .await - .ok(); - - // Left before last sync - let include_leave = filter.room.include_leave; - if (since >= left_count && !include_leave) || Some(next_batch) < left_count { - return Ok(None); - } - - let is_not_found = services.rooms.metadata.exists(room_id).eq(&false); - - let is_disabled = services.rooms.metadata.is_disabled(room_id); - - let is_banned = services.rooms.metadata.is_banned(room_id); - - pin_mut!(is_not_found, is_disabled, is_banned); - if is_not_found.or(is_disabled).or(is_banned).await { - // This is just a rejected invite, not a room we know - // Insert a leave event anyways for the client - let event = PduEvent { - event_id: EventId::new(services.globals.server_name()), - sender: sender_user.to_owned(), - origin: None, - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - kind: RoomMember, - content: serde_json::from_str(r#"{"membership":"leave"}"#) - .expect("this is valid JSON"), - state_key: Some(sender_user.as_str().into()), - unsigned: None, - // The following keys are dropped on conversion - room_id: Some(room_id.clone()), - prev_events: vec![], - depth: uint!(1), - auth_events: vec![], - redacts: None, - hashes: EventHash { sha256: String::new() }, - signatures: None, - }; - - return Ok(Some(LeftRoom { - account_data: RoomAccountData { events: Vec::new() }, - timeline: Timeline { - limited: false, - prev_batch: Some(next_batch.to_string()), - events: Vec::new(), - }, - state: RoomState { events: vec![event.into_format()] }, - })); - } - - let mut left_state_events = Vec::new(); - - let since_state_ids = async { - let since_shortstatehash = services - .rooms - .user - .get_token_shortstatehash(room_id, since?) - .ok() - .await?; - - services - .rooms - .state_accessor - .state_full_ids(since_shortstatehash) - .collect::>() - .map(Some) - .await - } - .await - .unwrap_or_default(); - - let Ok(left_event_id): Result = services - .rooms - .state_accessor - .room_state_get_id(room_id, &StateEventType::RoomMember, sender_user.as_str()) - .await - else { - warn!("Left {room_id} but no left state event"); - return Ok(None); - }; - - let Ok(left_shortstatehash) = services - .rooms - .state_accessor - .pdu_shortstatehash(&left_event_id) - .await - else { - warn!(event_id = %left_event_id, "Leave event has no state in {room_id}"); - return Ok(None); - }; - - let mut left_state_ids: HashMap<_, _> = services - .rooms - .state_accessor - .state_full_ids(left_shortstatehash) - .collect() - .await; - - let leave_shortstatekey = services - .rooms - .short - .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str()) - .await; - - left_state_ids.insert(leave_shortstatekey, left_event_id); - - for (shortstatekey, event_id) in left_state_ids { - if full_state || since_state_ids.get(&shortstatekey) != Some(&event_id) { - let (event_type, state_key) = services - .rooms - .short - .get_statekey_from_short(shortstatekey) - .await?; - - if filter.room.state.lazy_load_options.is_enabled() - && event_type == StateEventType::RoomMember - && !full_state - && state_key - .as_str() - .try_into() - .is_ok_and(|user_id: &UserId| sender_user != user_id) - { - continue; - } - - let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else { - error!("Pdu in state not found: {event_id}"); - continue; - }; - - if !include_leave && pdu.sender == sender_user { - continue; - } - - left_state_events.push(pdu.into_format()); - } - } - - Ok(Some(LeftRoom { - account_data: RoomAccountData { events: Vec::new() }, - timeline: Timeline { - // TODO: support left timeline events so we dont need to set limited to true - limited: true, - prev_batch: Some(next_batch.to_string()), - events: Vec::new(), // and so we dont need to set this to empty vec - }, - state: RoomState { events: left_state_events }, - })) -} - #[tracing::instrument( name = "joined", level = "debug", @@ -598,7 +59,7 @@ async fn handle_left_room( ), )] #[allow(clippy::too_many_arguments)] -async fn load_joined_room( +pub(super) async fn load_joined_room( services: &Services, SyncContext { sender_user, diff --git a/src/api/client/sync/v3/left.rs b/src/api/client/sync/v3/left.rs new file mode 100644 index 00000000..4e6378b4 --- /dev/null +++ b/src/api/client/sync/v3/left.rs @@ -0,0 +1,196 @@ +use std::collections::HashMap; + +use conduwuit::{ + Event, PduEvent, Result, error, + pdu::EventHash, + utils::{self, FutureBoolExt, TryFutureExtExt, future::ReadyEqExt}, + warn, +}; +use futures::{FutureExt, StreamExt, pin_mut}; +use ruma::{ + EventId, OwnedEventId, OwnedRoomId, UserId, + api::client::sync::sync_events::v3::{LeftRoom, RoomAccountData, State, Timeline}, + events::{StateEventType, TimelineEventType::*}, + uint, +}; +use service::{Services, rooms::lazy_loading::Options}; + +use crate::client::sync::v3::SyncContext; + +#[tracing::instrument( + name = "left", + level = "debug", + skip_all, + fields( + room_id = %room_id, + full = %full_state, + ), +)] +#[allow(clippy::too_many_arguments)] +pub(super) async fn load_left_room( + services: &Services, + SyncContext { + sender_user, + since, + next_batch, + full_state, + filter, + .. + }: SyncContext<'_>, + ref room_id: OwnedRoomId, +) -> Result> { + let left_count = services + .rooms + .state_cache + .get_left_count(room_id, sender_user) + .await + .ok(); + + // Left before last sync + let include_leave = filter.room.include_leave; + if (since >= left_count && !include_leave) || Some(next_batch) < left_count { + return Ok(None); + } + + let is_not_found = services.rooms.metadata.exists(room_id).eq(&false); + + let is_disabled = services.rooms.metadata.is_disabled(room_id); + + let is_banned = services.rooms.metadata.is_banned(room_id); + + pin_mut!(is_not_found, is_disabled, is_banned); + if is_not_found.or(is_disabled).or(is_banned).await { + // This is just a rejected invite, not a room we know + // Insert a leave event anyways for the client + let event = PduEvent { + event_id: EventId::new(services.globals.server_name()), + sender: sender_user.to_owned(), + origin: None, + origin_server_ts: utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + kind: RoomMember, + content: serde_json::from_str(r#"{"membership":"leave"}"#) + .expect("this is valid JSON"), + state_key: Some(sender_user.as_str().into()), + unsigned: None, + // The following keys are dropped on conversion + room_id: Some(room_id.clone()), + prev_events: vec![], + depth: uint!(1), + auth_events: vec![], + redacts: None, + hashes: EventHash { sha256: String::new() }, + signatures: None, + }; + + return Ok(Some(LeftRoom { + account_data: RoomAccountData { events: Vec::new() }, + timeline: Timeline { + limited: false, + prev_batch: Some(next_batch.to_string()), + events: Vec::new(), + }, + state: State { events: vec![event.into_format()] }, + })); + } + + let mut left_state_events = Vec::new(); + + let since_state_ids = async { + let since_shortstatehash = services + .rooms + .user + .get_token_shortstatehash(room_id, since?) + .ok() + .await?; + + services + .rooms + .state_accessor + .state_full_ids(since_shortstatehash) + .collect::>() + .map(Some) + .await + } + .await + .unwrap_or_default(); + + let Ok(left_event_id): Result = services + .rooms + .state_accessor + .room_state_get_id(room_id, &StateEventType::RoomMember, sender_user.as_str()) + .await + else { + warn!("Left {room_id} but no left state event"); + return Ok(None); + }; + + let Ok(left_shortstatehash) = services + .rooms + .state_accessor + .pdu_shortstatehash(&left_event_id) + .await + else { + warn!(event_id = %left_event_id, "Leave event has no state in {room_id}"); + return Ok(None); + }; + + let mut left_state_ids: HashMap<_, _> = services + .rooms + .state_accessor + .state_full_ids(left_shortstatehash) + .collect() + .await; + + let leave_shortstatekey = services + .rooms + .short + .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str()) + .await; + + left_state_ids.insert(leave_shortstatekey, left_event_id); + + for (shortstatekey, event_id) in left_state_ids { + if full_state || since_state_ids.get(&shortstatekey) != Some(&event_id) { + let (event_type, state_key) = services + .rooms + .short + .get_statekey_from_short(shortstatekey) + .await?; + + if filter.room.state.lazy_load_options.is_enabled() + && event_type == StateEventType::RoomMember + && !full_state + && state_key + .as_str() + .try_into() + .is_ok_and(|user_id: &UserId| sender_user != user_id) + { + continue; + } + + let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else { + error!("Pdu in state not found: {event_id}"); + continue; + }; + + if !include_leave && pdu.sender == sender_user { + continue; + } + + left_state_events.push(pdu.into_format()); + } + } + + Ok(Some(LeftRoom { + account_data: RoomAccountData { events: Vec::new() }, + timeline: Timeline { + // TODO: support left timeline events so we dont need to set limited to true + limited: true, + prev_batch: Some(next_batch.to_string()), + events: Vec::new(), // and so we dont need to set this to empty vec + }, + state: State { events: left_state_events }, + })) +} diff --git a/src/api/client/sync/v3/mod.rs b/src/api/client/sync/v3/mod.rs new file mode 100644 index 00000000..66d9023a --- /dev/null +++ b/src/api/client/sync/v3/mod.rs @@ -0,0 +1,395 @@ +mod joined; +mod left; + +use std::{ + cmp::{self}, + collections::{BTreeMap, HashMap, HashSet}, + time::Duration, +}; + +use axum::extract::State; +use conduwuit::{ + Result, extract_variant, + utils::{ + ReadyExt, TryFutureExtExt, + stream::{BroadbandExt, Tools, WidebandExt}, + }, +}; +use conduwuit_service::Services; +use futures::{ + FutureExt, StreamExt, TryFutureExt, + future::{OptionFuture, join3, join4, join5}, +}; +use ruma::{ + DeviceId, OwnedUserId, UserId, + api::client::{ + filter::FilterDefinition, + sync::sync_events::{ + self, DeviceLists, + v3::{ + Filter, GlobalAccountData, InviteState, InvitedRoom, KnockState, KnockedRoom, + Presence, Rooms, ToDevice, + }, + }, + uiaa::UiaaResponse, + }, + events::{ + AnyRawAccountDataEvent, + presence::{PresenceEvent, PresenceEventContent}, + }, + serde::Raw, +}; + +use super::{load_timeline, share_encrypted_room}; +use crate::{ + Ruma, RumaResponse, + client::{ + is_ignored_invite, + sync::v3::{joined::load_joined_room, left::load_left_room}, + }, +}; + +struct DeviceListUpdates { + changed: HashSet, + left: HashSet, +} + +impl DeviceListUpdates { + fn new() -> Self { + Self { + changed: HashSet::new(), + left: HashSet::new(), + } + } + + fn merge(&mut self, other: Self) { + self.changed.extend(other.changed); + self.left.extend(other.left); + } +} + +impl From for DeviceLists { + fn from(val: DeviceListUpdates) -> Self { + Self { + changed: val.changed.into_iter().collect(), + left: val.left.into_iter().collect(), + } + } +} + +#[derive(Clone, Copy)] +struct SyncContext<'a> { + sender_user: &'a UserId, + sender_device: &'a DeviceId, + since: Option, + next_batch: u64, + full_state: bool, + filter: &'a FilterDefinition, +} + +type PresenceUpdates = HashMap; + +/// # `GET /_matrix/client/r0/sync` +/// +/// Synchronize the client's state with the latest state on the server. +/// +/// - This endpoint takes a `since` parameter which should be the `next_batch` +/// value from a previous request for incremental syncs. +/// +/// Calling this endpoint without a `since` parameter returns: +/// - Some of the most recent events of each timeline +/// - Notification counts for each room +/// - Joined and invited member counts, heroes +/// - All state events +/// +/// Calling this endpoint with a `since` parameter from a previous `next_batch` +/// returns: For joined rooms: +/// - Some of the most recent events of each timeline that happened after since +/// - If user joined the room after since: All state events (unless lazy loading +/// is activated) and all device list updates in that room +/// - If the user was already in the room: A list of all events that are in the +/// state now, but were not in the state at `since` +/// - If the state we send contains a member event: Joined and invited member +/// counts, heroes +/// - Device list updates that happened after `since` +/// - If there are events in the timeline we send or the user send updated his +/// read mark: Notification counts +/// - EDUs that are active now (read receipts, typing updates, presence) +/// - TODO: Allow multiple sync streams to support Pantalaimon +/// +/// For invited rooms: +/// - If the user was invited after `since`: A subset of the state of the room +/// at the point of the invite +/// +/// For left rooms: +/// - If the user left after `since`: `prev_batch` token, empty state (TODO: +/// subset of the state at the point of the leave) +#[tracing::instrument( + name = "sync", + level = "debug", + skip_all, + fields( + since = %body.body.since.as_deref().unwrap_or_default(), + ) +)] +pub(crate) async fn sync_events_route( + State(services): State, + body: Ruma, +) -> Result> { + let (sender_user, sender_device) = body.sender(); + + // Presence update + if services.config.allow_local_presence { + services + .presence + .ping_presence(sender_user, &body.body.set_presence) + .await?; + } + + // Setup watchers, so if there's no response, we can wait for them + let watcher = services.sync.watch(sender_user, sender_device); + + let response = build_sync_events(&services, &body).await?; + if body.body.full_state + || !(response.rooms.is_empty() + && response.presence.is_empty() + && response.account_data.is_empty() + && response.device_lists.is_empty() + && response.to_device.is_empty()) + { + return Ok(response); + } + + // Hang a few seconds so requests are not spammed + // Stop hanging if new info arrives + let default = Duration::from_secs(30); + let duration = cmp::min(body.body.timeout.unwrap_or(default), default); + _ = tokio::time::timeout(duration, watcher).await; + + // Retry returning data + build_sync_events(&services, &body).await +} + +pub(crate) async fn build_sync_events( + services: &Services, + body: &Ruma, +) -> Result> { + let (sender_user, sender_device) = body.sender(); + + let next_batch = services.globals.current_count()?; + let since = body + .body + .since + .as_ref() + .and_then(|string| string.parse().ok()); + + let full_state = body.body.full_state; + + // FilterDefinition is very large (0x1000 bytes), let's put it on the heap + let filter = Box::new(match body.body.filter.as_ref() { + | None => FilterDefinition::default(), + | Some(Filter::FilterDefinition(filter)) => filter.clone(), + | Some(Filter::FilterId(filter_id)) => services + .users + .get_filter(sender_user, filter_id) + .await + .unwrap_or_default(), + }); + + let context = SyncContext { + sender_user, + sender_device, + since, + next_batch, + full_state, + filter: &filter, + }; + + let joined_rooms = services + .rooms + .state_cache + .rooms_joined(sender_user) + .map(ToOwned::to_owned) + .broad_filter_map(|room_id| { + load_joined_room(services, context, room_id.clone()) + .map_ok(move |(joined_room, updates)| (room_id, joined_room, updates)) + .ok() + }) + .ready_fold( + (BTreeMap::new(), DeviceListUpdates::new()), + |(mut joined_rooms, mut all_updates), (room_id, joined_room, updates)| { + all_updates.merge(updates); + + if !joined_room.is_empty() { + joined_rooms.insert(room_id, joined_room); + } + + (joined_rooms, all_updates) + }, + ); + + let left_rooms = services + .rooms + .state_cache + .rooms_left(sender_user) + .broad_filter_map(|(room_id, _)| { + load_left_room(services, context, room_id.clone()) + .map_ok(move |left_room| (room_id, left_room)) + .ok() + }) + .ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room))) + .collect(); + + let invited_rooms = services + .rooms + .state_cache + .rooms_invited(sender_user) + .wide_filter_map(async |(room_id, invite_state)| { + if is_ignored_invite(services, sender_user, &room_id).await { + None + } else { + Some((room_id, invite_state)) + } + }) + .fold_default(|mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| async move { + let invite_count = services + .rooms + .state_cache + .get_invite_count(&room_id, sender_user) + .await + .ok(); + + // only sync this invite if it was sent after the last /sync call + if since < invite_count { + let invited_room = InvitedRoom { + invite_state: InviteState { events: invite_state }, + }; + + invited_rooms.insert(room_id, invited_room); + } + invited_rooms + }); + + let knocked_rooms = services + .rooms + .state_cache + .rooms_knocked(sender_user) + .fold_default(|mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| async move { + let knock_count = services + .rooms + .state_cache + .get_knock_count(&room_id, sender_user) + .await + .ok(); + + // only sync this knock if it was sent after the last /sync call + if since < knock_count { + let knocked_room = KnockedRoom { + knock_state: KnockState { events: knock_state }, + }; + + knocked_rooms.insert(room_id, knocked_room); + } + knocked_rooms + }); + + let presence_updates: OptionFuture<_> = services + .config + .allow_local_presence + .then(|| process_presence_updates(services, since, sender_user)) + .into(); + + let account_data = services + .account_data + .changes_since(None, sender_user, since, Some(next_batch)) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) + .collect(); + + // Look for device list updates of this account + let keys_changed = services + .users + .keys_changed(sender_user, since, Some(next_batch)) + .map(ToOwned::to_owned) + .collect::>(); + + let to_device_events = services + .users + .get_to_device_events(sender_user, sender_device, since, Some(next_batch)) + .collect::>(); + + let device_one_time_keys_count = services + .users + .count_one_time_keys(sender_user, sender_device); + + // Remove all to-device events the device received *last time* + let remove_to_device_events = + services + .users + .remove_to_device_events(sender_user, sender_device, since); + + let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms); + let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates); + let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) + .boxed() + .await; + + let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top; + let ((), to_device_events, presence_updates) = ephemeral; + let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms; + let (joined_rooms, mut device_list_updates) = joined_rooms; + device_list_updates.changed.extend(keys_changed); + + let response = sync_events::v3::Response { + account_data: GlobalAccountData { events: account_data }, + device_lists: device_list_updates.into(), + device_one_time_keys_count, + // Fallback keys are not yet supported + device_unused_fallback_key_types: None, + next_batch: next_batch.to_string(), + presence: Presence { + events: presence_updates + .into_iter() + .flat_map(IntoIterator::into_iter) + .map(|(sender, content)| PresenceEvent { content, sender }) + .map(|ref event| Raw::new(event)) + .filter_map(Result::ok) + .collect(), + }, + rooms: Rooms { + leave: left_rooms, + join: joined_rooms, + invite: invited_rooms, + knock: knocked_rooms, + }, + to_device: ToDevice { events: to_device_events }, + }; + + Ok(response) +} + +#[tracing::instrument(name = "presence", level = "debug", skip_all)] +async fn process_presence_updates( + services: &Services, + since: Option, + syncing_user: &UserId, +) -> PresenceUpdates { + services + .presence + .presence_since(since.unwrap_or(0)) // send all presences on initial sync + .filter(|(user_id, ..)| { + services + .rooms + .state_cache + .user_sees_user(syncing_user, user_id) + }) + .filter_map(|(user_id, _, presence_bytes)| { + services + .presence + .from_json_bytes_to_event(presence_bytes, user_id) + .map_ok(move |event| (user_id, event)) + .ok() + }) + .map(|(user_id, event)| (user_id.to_owned(), event.content)) + .collect() + .await +}