diff --git a/src/admin/query/account_data.rs b/src/admin/query/account_data.rs index 2e044cef..426a962b 100644 --- a/src/admin/query/account_data.rs +++ b/src/admin/query/account_data.rs @@ -41,7 +41,7 @@ async fn changes_since( let results: Vec<_> = self .services .account_data - .changes_since(room_id.as_deref(), &user_id, since, None) + .changes_since(room_id.as_deref(), &user_id, Some(since), None) .collect() .await; let query_time = timer.elapsed(); diff --git a/src/api/client/keys.rs b/src/api/client/keys.rs index d2bd46a0..f371d89e 100644 --- a/src/api/client/keys.rs +++ b/src/api/client/keys.rs @@ -389,7 +389,7 @@ pub(crate) async fn get_key_changes_route( device_list_updates.extend( services .users - .keys_changed(sender_user, from, Some(to)) + .keys_changed(sender_user, Some(from), Some(to)) .map(ToOwned::to_owned) .collect::>() .await, @@ -401,7 +401,7 @@ pub(crate) async fn get_key_changes_route( device_list_updates.extend( services .users - .room_keys_changed(room_id, from, Some(to)) + .room_keys_changed(room_id, Some(from), Some(to)) .map(|(user_id, _)| user_id) .map(ToOwned::to_owned) .collect::>() diff --git a/src/api/client/message.rs b/src/api/client/message.rs index 1ec5ea50..3e6c7336 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -213,7 +213,7 @@ where let receipts = services .rooms .read_receipt - .readreceipts_since(lazy_loading_context.room_id, oldest.into_unsigned()); + .readreceipts_since(lazy_loading_context.room_id, Some(oldest.into_unsigned())); pin_mut!(receipts); let witness: Witness = events diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 40370160..3b679091 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -3,12 +3,13 @@ mod v4; mod v5; use conduwuit::{ - Error, PduCount, Result, + PduCount, Result, matrix::pdu::PduEvent, + trace, utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, }; use conduwuit_service::Services; -use futures::{StreamExt, pin_mut}; +use futures::StreamExt; use ruma::{ RoomId, UserId, events::TimelineEventType::{ @@ -23,43 +24,75 @@ pub(crate) use self::{ pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] = &[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker]; +#[derive(Default)] +pub(crate) struct TimelinePdus { + pub pdus: Vec<(PduCount, PduEvent)>, + pub limited: bool, +} + async fn load_timeline( services: &Services, sender_user: &UserId, room_id: &RoomId, - roomsincecount: PduCount, - next_batch: Option, + starting_count: Option, + ending_count: Option, limit: usize, -) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { +) -> Result { let last_timeline_count = services .rooms .timeline .last_timeline_count(Some(sender_user), room_id) .await?; - if last_timeline_count <= roomsincecount { - return Ok((Vec::new(), false)); + let mut pdus_between_counts = match starting_count { + | Some(starting_count) => { + if last_timeline_count <= starting_count { + return Ok(TimelinePdus::default()); + } + + // Stream from the DB all PDUs which were sent after `starting_count` but before + // `ending_count`, including both endpoints + services + .rooms + .timeline + .pdus(Some(sender_user), room_id, Some(starting_count)) + .ignore_err() + .ready_take_while(|&(pducount, _)| { + pducount <= ending_count.unwrap_or_else(PduCount::max) + }) + .boxed() + }, + | None => { + // For initial sync, stream from the DB all PDUs before and including + // `ending_count` in reverse order + services + .rooms + .timeline + .pdus_rev(Some(sender_user), room_id, ending_count) + .ignore_err() + .boxed() + }, + }; + + // Return at most `limit` PDUs from the stream + let mut pdus: Vec<_> = pdus_between_counts.by_ref().take(limit).collect().await; + if starting_count.is_none() { + // `pdus_rev` returns PDUs in reverse order. fix that here + pdus.reverse(); } + // The timeline is limited if more than `limit` PDUs exist in the DB after + // `starting_count` + let limited = pdus_between_counts.next().await.is_some(); - let non_timeline_pdus = services - .rooms - .timeline - .pdus_rev(Some(sender_user), room_id, None) - .ignore_err() - .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max)) - .ready_take_while(|&(pducount, _)| pducount > roomsincecount); + trace!( + "syncing {:?} timeline pdus from {:?} to {:?} (limited = {:?})", + pdus.len(), + starting_count, + ending_count, + limited, + ); - // Take the last events for the timeline - pin_mut!(non_timeline_pdus); - let timeline_pdus: Vec<_> = non_timeline_pdus.by_ref().take(limit).collect().await; - - let timeline_pdus: Vec<_> = timeline_pdus.into_iter().rev().collect(); - - // They /sync response doesn't always return all messages, so we say the output - // is limited unless there are events in non_timeline_pdus - let limited = non_timeline_pdus.next().await.is_some(); - - Ok((timeline_pdus, limited)) + Ok(TimelinePdus { pdus, limited }) } async fn share_encrypted_room( diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index e897a9e5..cdecb2c2 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -11,13 +11,13 @@ use conduwuit::{ Event, pdu::{EventHash, PduCount, PduEvent}, }, - pair_of, ref_at, + ref_at, result::FlatOk, utils::{ self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, future::{OptionStream, ReadyEqExt}, math::ruma_from_u64, - stream::{BroadbandExt, Tools, TryExpect, WidebandExt}, + stream::{BroadbandExt, Tools, WidebandExt}, }, warn, }; @@ -30,8 +30,8 @@ use conduwuit_service::{ }, }; use futures::{ - FutureExt, StreamExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, join, join3, join4, join5, try_join, try_join4}, + FutureExt, StreamExt, TryFutureExt, + future::{OptionFuture, join, join3, join4, join5, try_join4}, pin_mut, }; use ruma::{ @@ -58,21 +58,50 @@ use ruma::{ uint, }; use service::rooms::short::{ShortEventId, ShortStateKey}; +use tracing::trace; use super::{load_timeline, share_encrypted_room}; use crate::{ Ruma, RumaResponse, - client::{ignored_filter, is_ignored_invite}, + client::{TimelinePdus, ignored_filter, is_ignored_invite}, }; -#[derive(Default)] -struct StateChanges { - heroes: Option>, - joined_member_count: Option, - invited_member_count: Option, - state_events: Vec, - device_list_updates: HashSet, - left_encrypted_users: HashSet, +struct DeviceListUpdates { + changed: HashSet, + left: HashSet, +} + +impl DeviceListUpdates { + fn new() -> Self { + DeviceListUpdates { + changed: HashSet::new(), + left: HashSet::new(), + } + } + + fn merge(&mut self, other: DeviceListUpdates) { + self.changed.extend(other.changed); + self.left.extend(other.left); + } +} + +impl Into for DeviceListUpdates { + fn into(self) -> DeviceLists { + DeviceLists { + changed: self.changed.into_iter().collect(), + left: self.left.into_iter().collect(), + } + } +} + +#[derive(Clone, Copy)] +struct SyncContext<'a> { + sender_user: &'a UserId, + sender_device: &'a DeviceId, + since: Option, + next_batch: u64, + full_state: bool, + filter: &'a Box, } type PresenceUpdates = HashMap; @@ -169,11 +198,12 @@ pub(crate) async fn build_sync_events( .body .since .as_ref() - .and_then(|string| string.parse().ok()) - .unwrap_or(0); + .and_then(|string| string.parse().ok()); let full_state = body.body.full_state; - let filter = match body.body.filter.as_ref() { + + // FilterDefinition is very large (0x1000 bytes), let's put it on the heap + let filter = Box::new(match body.body.filter.as_ref() { | None => FilterDefinition::default(), | Some(Filter::FilterDefinition(filter)) => filter.clone(), | Some(Filter::FilterId(filter_id)) => services @@ -181,6 +211,15 @@ pub(crate) async fn build_sync_events( .get_filter(sender_user, filter_id) .await .unwrap_or_default(), + }); + + let context = SyncContext { + sender_user, + sender_device, + since, + next_batch, + full_state, + filter: &filter, }; let joined_rooms = services @@ -189,30 +228,20 @@ pub(crate) async fn build_sync_events( .rooms_joined(sender_user) .map(ToOwned::to_owned) .broad_filter_map(|room_id| { - load_joined_room( - services, - sender_user, - sender_device, - room_id.clone(), - since, - next_batch, - full_state, - &filter, - ) - .map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu)) - .ok() + load_joined_room(services, context, room_id.clone()) + .map_ok(move |(joined_room, updates)| (room_id, joined_room, updates)) + .ok() }) .ready_fold( - (BTreeMap::new(), HashSet::new(), HashSet::new()), - |(mut joined_rooms, mut device_list_updates, mut left_encrypted_users), - (room_id, joined_room, dlu, leu)| { - device_list_updates.extend(dlu); - left_encrypted_users.extend(leu); + (BTreeMap::new(), DeviceListUpdates::new()), + |(mut joined_rooms, mut all_updates), (room_id, joined_room, updates)| { + all_updates.merge(updates); + if !joined_room.is_empty() { joined_rooms.insert(room_id, joined_room); } - (joined_rooms, device_list_updates, left_encrypted_users) + (joined_rooms, all_updates) }, ); @@ -221,18 +250,9 @@ pub(crate) async fn build_sync_events( .state_cache .rooms_left(sender_user) .broad_filter_map(|(room_id, _)| { - handle_left_room( - services, - since, - room_id.clone(), - sender_user, - next_batch, - full_state, - filter.room.include_leave, - &filter, - ) - .map_ok(move |left_room| (room_id, left_room)) - .ok() + handle_left_room(services, context, room_id.clone()) + .map_ok(move |left_room| (room_id, left_room)) + .ok() }) .ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room))) .collect(); @@ -256,16 +276,14 @@ pub(crate) async fn build_sync_events( .await .ok(); - // Invited before last sync - if Some(since) >= invite_count { - return invited_rooms; + // only sync this invite if it was sent after the last /sync call + if since < invite_count { + let invited_room = InvitedRoom { + invite_state: InviteState { events: invite_state }, + }; + + invited_rooms.insert(room_id, invited_room); } - - let invited_room = InvitedRoom { - invite_state: InviteState { events: invite_state }, - }; - - invited_rooms.insert(room_id, invited_room); invited_rooms }); @@ -281,16 +299,14 @@ pub(crate) async fn build_sync_events( .await .ok(); - // Knocked before last sync - if Some(since) >= knock_count { - return knocked_rooms; + // only sync this knock if it was sent after the last /sync call + if since < knock_count { + let knocked_room = KnockedRoom { + knock_state: KnockState { events: knock_state }, + }; + + knocked_rooms.insert(room_id, knocked_room); } - - let knocked_room = KnockedRoom { - knock_state: KnockState { events: knock_state }, - }; - - knocked_rooms.insert(room_id, knocked_room); knocked_rooms }); @@ -315,7 +331,7 @@ pub(crate) async fn build_sync_events( let to_device_events = services .users - .get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch)) + .get_to_device_events(sender_user, sender_device, since, Some(next_batch)) .collect::>(); let device_one_time_keys_count = services @@ -337,29 +353,12 @@ pub(crate) async fn build_sync_events( let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top; let ((), to_device_events, presence_updates) = ephemeral; let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms; - let (joined_rooms, mut device_list_updates, left_encrypted_users) = joined_rooms; - device_list_updates.extend(keys_changed); - - // If the user doesn't share an encrypted room with the target anymore, we need - // to tell them - let device_list_left: HashSet<_> = left_encrypted_users - .into_iter() - .stream() - .broad_filter_map(|user_id| async move { - share_encrypted_room(services, sender_user, &user_id, None) - .await - .eq(&false) - .then_some(user_id) - }) - .collect() - .await; + let (joined_rooms, mut device_list_updates) = joined_rooms; + device_list_updates.changed.extend(keys_changed); let response = sync_events::v3::Response { account_data: GlobalAccountData { events: account_data }, - device_lists: DeviceLists { - changed: device_list_updates.into_iter().collect(), - left: device_list_left.into_iter().collect(), - }, + device_lists: device_list_updates.into(), device_one_time_keys_count, // Fallback keys are not yet supported device_unused_fallback_key_types: None, @@ -388,12 +387,12 @@ pub(crate) async fn build_sync_events( #[tracing::instrument(name = "presence", level = "debug", skip_all)] async fn process_presence_updates( services: &Services, - since: u64, + since: Option, syncing_user: &UserId, ) -> PresenceUpdates { services .presence - .presence_since(since) + .presence_since(since.unwrap_or(0)) // send all presences on initial sync .filter(|(user_id, ..)| { services .rooms @@ -424,13 +423,15 @@ async fn process_presence_updates( #[allow(clippy::too_many_arguments)] async fn handle_left_room( services: &Services, - since: u64, + SyncContext { + sender_user, + since, + next_batch, + full_state, + filter, + .. + }: SyncContext<'_>, ref room_id: OwnedRoomId, - sender_user: &UserId, - next_batch: u64, - full_state: bool, - include_leave: bool, - filter: &FilterDefinition, ) -> Result> { let left_count = services .rooms @@ -440,7 +441,8 @@ async fn handle_left_room( .ok(); // Left before last sync - if (Some(since) >= left_count && !include_leave) || Some(next_batch) < left_count { + let include_leave = filter.room.include_leave; + if (since >= left_count && !include_leave) || Some(next_batch) < left_count { return Ok(None); } @@ -489,20 +491,24 @@ async fn handle_left_room( let mut left_state_events = Vec::new(); - let since_shortstatehash = services.rooms.user.get_token_shortstatehash(room_id, since); + let since_state_ids = async { + let since_shortstatehash = services + .rooms + .user + .get_token_shortstatehash(room_id, since?) + .ok() + .await?; - let since_state_ids: HashMap<_, OwnedEventId> = since_shortstatehash - .map_ok(|since_shortstatehash| { - services - .rooms - .state_accessor - .state_full_ids(since_shortstatehash) - .map(Ok) - }) - .try_flatten_stream() - .try_collect() - .await - .unwrap_or_default(); + services + .rooms + .state_accessor + .state_full_ids(since_shortstatehash) + .collect::>() + .map(Some) + .await + } + .await + .unwrap_or_default(); let Ok(left_event_id): Result = services .rooms @@ -594,29 +600,38 @@ async fn handle_left_room( #[allow(clippy::too_many_arguments)] async fn load_joined_room( services: &Services, - sender_user: &UserId, - sender_device: &DeviceId, + SyncContext { + sender_user, + sender_device, + since, + next_batch, + full_state, + filter, + .. + }: SyncContext<'_>, ref room_id: OwnedRoomId, - since: u64, - next_batch: u64, - full_state: bool, - filter: &FilterDefinition, -) -> Result<(JoinedRoom, HashSet, HashSet)> { - let sincecount = PduCount::Normal(since); +) -> Result<(JoinedRoom, DeviceListUpdates)> { + let mut device_list_updates = DeviceListUpdates::new(); + let sincecount = since.map(PduCount::Normal); let next_batchcount = PduCount::Normal(next_batch); + // the shortstatehash of the room's state right now let current_shortstatehash = services .rooms .state .get_room_shortstatehash(room_id) .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); - let since_shortstatehash = services - .rooms - .user - .get_token_shortstatehash(room_id, since) - .ok() - .map(Ok); + // the shortstatehash of what the room's state was when the `since` token was + // issued + let since_shortstatehash = OptionFuture::from(since.map(|since| { + services + .rooms + .user + .get_token_shortstatehash(room_id, since) + .ok() + })) + .map(|v| Ok(v.flatten())); let timeline = load_timeline( services, @@ -646,41 +661,23 @@ async fn load_joined_room( .boxed() .await?; - let (timeline_pdus, limited) = timeline; - let initial = since_shortstatehash.is_none(); - let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled() - || filter.room.timeline.lazy_load_options.is_enabled(); + let TimelinePdus { pdus: timeline_pdus, limited } = timeline; + let is_initial_sync = since_shortstatehash.is_none(); - let lazy_loading_context = &lazy_loading::Context { - user_id: sender_user, - device_id: Some(sender_device), - room_id, - token: Some(since), - options: Some(&filter.room.state.lazy_load_options), - }; - - // Reset lazy loading because this is an initial sync - let lazy_load_reset: OptionFuture<_> = initial - .then(|| services.rooms.lazy_loading.reset(lazy_loading_context)) - .into(); - - lazy_load_reset.await; - let witness: OptionFuture<_> = lazy_loading_enabled - .then(|| { - let witness: Witness = timeline_pdus - .iter() - .map(ref_at!(1)) - .map(Event::sender) - .map(Into::into) - .chain(receipt_events.keys().map(Into::into)) - .collect(); - - services + let timeline_start_shortstatehash = async { + if let Some((_, pdu)) = timeline_pdus.first() { + if let Ok(shortstatehash) = services .rooms - .lazy_loading - .witness_retain(witness, lazy_loading_context) - }) - .into(); + .state_accessor + .pdu_shortstatehash(&pdu.event_id) + .await + { + return shortstatehash; + } + } + + return current_shortstatehash; + }; let last_notification_read: OptionFuture<_> = timeline_pdus .is_empty() @@ -702,8 +699,24 @@ async fn load_joined_room( }) .into(); - let (last_notification_read, since_sender_member, witness) = - join3(last_notification_read, since_sender_member, witness).await; + let is_encrypted_room = services + .rooms + .state_accessor + .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") + .is_ok(); + + let ( + last_notification_read, + since_sender_member, + timeline_start_shortstatehash, + is_encrypted_room, + ) = join4( + last_notification_read, + since_sender_member, + timeline_start_shortstatehash, + is_encrypted_room, + ) + .await; let joined_since_last_sync = since_sender_member @@ -712,26 +725,147 @@ async fn load_joined_room( content.membership != MembershipState::Join }); - let StateChanges { - heroes, - joined_member_count, - invited_member_count, - mut state_events, - mut device_list_updates, - left_encrypted_users, - } = calculate_state_changes( - services, - sender_user, + let lazy_loading_enabled = (filter.room.state.lazy_load_options.is_enabled() + || filter.room.timeline.lazy_load_options.is_enabled()) + && !full_state; + + let lazy_loading_context = &lazy_loading::Context { + user_id: sender_user, + device_id: Some(sender_device), room_id, - full_state, - filter, - since_shortstatehash, - current_shortstatehash, - joined_since_last_sync, - witness.as_ref(), - ) - .boxed() - .await?; + token: since, + options: Some(&filter.room.state.lazy_load_options), + }; + + let lazy_loading_witness = OptionFuture::from(lazy_loading_enabled.then(|| { + let witness: Witness = timeline_pdus + .iter() + .map(ref_at!(1)) + .map(Event::sender) + .map(Into::into) + .chain(receipt_events.keys().map(Into::into)) + .collect(); + + services + .rooms + .lazy_loading + .witness_retain(witness, lazy_loading_context) + })) + .await; + + /* replace with multiple steps + glossary for my own sanity: + - full state: every state event from the start of the room to the start of the timeline + - incremental state: state events from `since` to the start of the timeline + - state_events: the `state` key on the JSON object we return + + if initial sync or full_state: + get full state + use full state as state_events + else if TL is limited: + get incremental state + use incremental state as state_events + if encryption is enabled: + use incremental state to extend device list + else: + state_events is empty + compute counts and heroes from state_events + */ + let mut state_events = if is_initial_sync || full_state { + // reset lazy loading state on initial sync + if is_initial_sync { + services + .rooms + .lazy_loading + .reset(lazy_loading_context) + .await; + }; + + let state_initial = calculate_state_initial( + services, + sender_user, + timeline_start_shortstatehash, + lazy_loading_witness.as_ref(), + ) + .boxed() + .await?; + + state_initial + } else if limited { + let state_incremental = calculate_state_incremental( + services, + sender_user, + since_shortstatehash, + timeline_start_shortstatehash, + lazy_loading_witness.as_ref(), + ) + .boxed() + .await?; + + // calculate device list updates for E2EE + if is_encrypted_room { + // add users with changed keys to the `changed` list + services + .users + .room_keys_changed(room_id, since, Some(next_batch)) + .map(at!(0)) + .map(ToOwned::to_owned) + .ready_for_each(|user_id| { + device_list_updates.changed.insert(user_id); + }) + .await; + + // add users who now share encrypted rooms to `changed` and + // users who no longer share encrypted rooms to `left` + for state_event in &state_incremental { + if state_event.kind == RoomMember { + let Some(content): Option = + state_event.get_content().ok() + else { + continue; + }; + + let Some(user_id): Option = state_event + .state_key + .as_ref() + .and_then(|key| key.parse().ok()) + else { + continue; + }; + + use MembershipState::*; + + if matches!(content.membership, Leave | Join) { + let shares_encrypted_room = + share_encrypted_room(services, sender_user, &user_id, Some(room_id)) + .await; + match content.membership { + | Leave if !shares_encrypted_room => { + device_list_updates.left.insert(user_id); + }, + | Join if joined_since_last_sync || shares_encrypted_room => { + device_list_updates.changed.insert(user_id); + }, + | _ => (), + }; + } + } + } + } + + state_incremental + } else { + vec![] + }; + + // only compute room counts and heroes (aka the summary) if the room's members + // changed since the last sync + let (joined_member_count, invited_member_count, heroes) = + if state_events.iter().any(|event| event.kind == RoomMember) { + calculate_counts(services, room_id, sender_user).await? + } else { + (None, None, None) + }; let is_sender_membership = |pdu: &PduEvent| { pdu.kind == StateEventType::RoomMember.into() @@ -753,11 +887,11 @@ async fn load_joined_room( let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| { joined_sender_member .is_some() - .then_some(since) + .and_then(|| since) .map(Into::into) }); - let room_events = timeline_pdus + let timeline_pdus = timeline_pdus .into_iter() .stream() .wide_filter_map(|item| ignored_filter(services, item, sender_user)) @@ -772,15 +906,8 @@ async fn load_joined_room( .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect(); - // Look for device list updates in this room - let device_updates = services - .users - .room_keys_changed(room_id, since, Some(next_batch)) - .map(|(user_id, _)| user_id) - .map(ToOwned::to_owned) - .collect::>(); - - let send_notification_counts = last_notification_read.is_none_or(|count| count > since); + let send_notification_counts = + last_notification_read.is_none_or(|count| since.is_none_or(|since| count > since)); let notification_count: OptionFuture<_> = send_notification_counts .then(|| { @@ -809,7 +936,7 @@ async fn load_joined_room( .typing .last_typing_update(room_id) .and_then(|count| async move { - if count <= since { + if since.is_some_and(|since| count <= since) { return Ok(Vec::>::new()); } @@ -824,22 +951,21 @@ async fn load_joined_room( .unwrap_or(Vec::new()); let unread_notifications = join(notification_count, highlight_count); - let events = join3(room_events, account_data_events, typing_events); - let (unread_notifications, events, device_updates) = - join3(unread_notifications, events, device_updates) - .boxed() - .await; + let events = join3(timeline_pdus, account_data_events, typing_events); + let (unread_notifications, events) = join(unread_notifications, events).boxed().await; let (room_events, account_data_events, typing_events) = events; let (notification_count, highlight_count) = unread_notifications; - device_list_updates.extend(device_updates); - - let last_privateread_update = services - .rooms - .read_receipt - .last_privateread_update(sender_user, room_id) - .await > since; + let last_privateread_update = if let Some(since) = since { + services + .rooms + .read_receipt + .last_privateread_update(sender_user, room_id) + .await > since + } else { + true + }; let private_read_event = if last_privateread_update { services @@ -880,7 +1006,7 @@ async fn load_joined_room( }, unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, timeline: Timeline { - limited: limited || joined_since_last_sync, + limited, prev_batch: prev_batch.as_ref().map(ToString::to_string), events: room_events, }, @@ -891,69 +1017,25 @@ async fn load_joined_room( unread_thread_notifications: BTreeMap::new(), }; - Ok((joined_room, device_list_updates, left_encrypted_users)) + Ok((joined_room, device_list_updates)) } +/// Calculate the "initial state", or all events from the start of the room up +/// to (but not including) the `current_shortstatehash`. If +/// `lazy_loading_witness` is `None`, lazy loading will be disabled. #[tracing::instrument( - name = "state", + name = "initial", level = "trace", skip_all, - fields( - full = %full_state, - cs = %current_shortstatehash, - ss = ?since_shortstatehash, - ) + fields(current_shortstatehash) )] #[allow(clippy::too_many_arguments)] -async fn calculate_state_changes( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - full_state: bool, - filter: &FilterDefinition, - since_shortstatehash: Option, - current_shortstatehash: ShortStateHash, - joined_since_last_sync: bool, - witness: Option<&Witness>, -) -> Result { - if since_shortstatehash.is_none() { - calculate_state_initial( - services, - sender_user, - room_id, - full_state, - filter, - current_shortstatehash, - witness, - ) - .await - } else { - calculate_state_incremental( - services, - sender_user, - room_id, - full_state, - filter, - since_shortstatehash, - current_shortstatehash, - joined_since_last_sync, - witness, - ) - .await - } -} - -#[tracing::instrument(name = "initial", level = "trace", skip_all)] -#[allow(clippy::too_many_arguments)] async fn calculate_state_initial( services: &Services, sender_user: &UserId, - room_id: &RoomId, - full_state: bool, - _filter: &FilterDefinition, current_shortstatehash: ShortStateHash, - witness: Option<&Witness>, -) -> Result { + lazy_loading_witness: Option<&Witness>, +) -> Result> { let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services .rooms .state_accessor @@ -961,19 +1043,21 @@ async fn calculate_state_initial( .unzip() .await; - let state_events = services + trace!("event ids for initial sync @ {:?}: {:?}", current_shortstatehash, event_ids); + + services .rooms .short .multi_get_statekey_from_short(shortstatekeys.into_iter().stream()) .zip(event_ids.into_iter().stream()) .ready_filter_map(|item| Some((item.0.ok()?, item.1))) .ready_filter_map(|((event_type, state_key), event_id)| { - let lazy = !full_state - && event_type == StateEventType::RoomMember - && state_key.as_str().try_into().is_ok_and(|user_id: &UserId| { - sender_user != user_id - && witness.is_some_and(|witness| !witness.contains(user_id)) - }); + let lazy = lazy_loading_witness.is_some_and(|witness| { + event_type == StateEventType::RoomMember + && state_key.as_str().try_into().is_ok_and(|user_id: &UserId| { + sender_user != user_id && !witness.contains(user_id) + }) + }); lazy.or_some(event_id) }) @@ -981,46 +1065,25 @@ async fn calculate_state_initial( services.rooms.timeline.get_pdu(&event_id).await.ok() }) .collect() - .map(Ok); - - let counts = calculate_counts(services, room_id, sender_user); - let ((joined_member_count, invited_member_count, heroes), state_events) = - try_join(counts, state_events).boxed().await?; - - // The state_events above should contain all timeline_users, let's mark them as - // lazy loaded. - - Ok(StateChanges { - heroes, - joined_member_count, - invited_member_count, - state_events, - ..Default::default() - }) + .map(Ok) + .await } +/// Calculate the "incremental state", or all events from the +/// `since_shortstatehash` up to (but not including) +/// the `current_shortstatehash`. If `lazy_loading_witness` is `None`, lazy +/// loading will be disabled. #[tracing::instrument(name = "incremental", level = "trace", skip_all)] #[allow(clippy::too_many_arguments)] async fn calculate_state_incremental<'a>( services: &Services, sender_user: &'a UserId, - room_id: &RoomId, - full_state: bool, - _filter: &FilterDefinition, since_shortstatehash: Option, current_shortstatehash: ShortStateHash, - joined_since_last_sync: bool, - witness: Option<&'a Witness>, -) -> Result { + lazy_loading_witness: Option<&'a Witness>, +) -> Result> { let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash); - let encrypted_room = services - .rooms - .state_accessor - .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok() - .await; - let state_get_shorteventid = |user_id: &'a UserId| { services .rooms @@ -1033,8 +1096,7 @@ async fn calculate_state_incremental<'a>( .ok() }; - let lazy_state_ids: OptionFuture<_> = witness - .filter(|_| !full_state && !encrypted_room) + let lazy_state_ids: OptionFuture<_> = lazy_loading_witness .map(|witness| { StreamExt::into_future( witness @@ -1045,36 +1107,15 @@ async fn calculate_state_incremental<'a>( }) .into(); - let state_diff_ids: OptionFuture<_> = (!full_state) - .then(|| { - StreamExt::into_future( - services - .rooms - .state_accessor - .state_added((since_shortstatehash, current_shortstatehash)) - .boxed(), - ) - }) - .into(); + let state_diff_shortids = services + .rooms + .state_accessor + .state_added((since_shortstatehash, current_shortstatehash)) + .boxed(); - let current_state_ids: OptionFuture<_> = full_state - .then(|| { - StreamExt::into_future( - services - .rooms - .state_accessor - .state_full_shortids(current_shortstatehash) - .expect_ok() - .boxed(), - ) - }) - .into(); - - let state_events = current_state_ids - .stream() - .chain(state_diff_ids.stream()) + state_diff_shortids .broad_filter_map(|(shortstatekey, shorteventid)| async move { - if witness.is_none() || encrypted_room { + if lazy_loading_witness.is_none() { return Some(shorteventid); } @@ -1092,52 +1133,8 @@ async fn calculate_state_incremental<'a>( services.rooms.timeline.get_pdu(&event_id).await.ok() }) .collect::>() - .await; - - let (device_list_updates, left_encrypted_users) = state_events - .iter() - .stream() - .ready_filter(|_| encrypted_room) - .ready_filter(|state_event| state_event.kind == RoomMember) - .ready_filter_map(|state_event| { - let content: RoomMemberEventContent = state_event.get_content().ok()?; - let user_id: OwnedUserId = state_event.state_key.as_ref()?.parse().ok()?; - - Some((content, user_id)) - }) - .fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move { - use MembershipState::*; - - let shares_encrypted_room = - |user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id)); - - match content.membership { - | Leave => leu.insert(user_id), - | Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await => - dlu.insert(user_id), - | _ => false, - }; - - (dlu, leu) - }) - .await; - - let send_member_count = state_events.iter().any(|event| event.kind == RoomMember); - - let (joined_member_count, invited_member_count, heroes) = if send_member_count { - calculate_counts(services, room_id, sender_user).await? - } else { - (None, None, None) - }; - - Ok(StateChanges { - heroes, - joined_member_count, - invited_member_count, - state_events, - device_list_updates, - left_encrypted_users, - }) + .map(Ok) + .await } async fn lazy_filter( diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index fcf317bd..5ded1a9f 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -40,7 +40,7 @@ use ruma::{ use super::{load_timeline, share_encrypted_room}; use crate::{ Ruma, - client::{DEFAULT_BUMP_TYPES, ignored_filter, is_ignored_invite}, + client::{DEFAULT_BUMP_TYPES, TimelinePdus, ignored_filter, is_ignored_invite}, }; type TodoRooms = BTreeMap, usize, u64)>; @@ -155,7 +155,7 @@ pub(crate) async fn sync_events_v4_route( if body.extensions.account_data.enabled.unwrap_or(false) { account_data.global = services .account_data - .changes_since(None, sender_user, globalsince, Some(next_batch)) + .changes_since(None, sender_user, Some(globalsince), Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .collect() .await; @@ -166,7 +166,12 @@ pub(crate) async fn sync_events_v4_route( room.clone(), services .account_data - .changes_since(Some(&room), sender_user, globalsince, Some(next_batch)) + .changes_since( + Some(&room), + sender_user, + Some(globalsince), + Some(next_batch), + ) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect() .await, @@ -180,7 +185,7 @@ pub(crate) async fn sync_events_v4_route( device_list_changes.extend( services .users - .keys_changed(sender_user, globalsince, None) + .keys_changed(sender_user, Some(globalsince), None) .map(ToOwned::to_owned) .collect::>() .await, @@ -318,7 +323,7 @@ pub(crate) async fn sync_events_v4_route( device_list_changes.extend( services .users - .room_keys_changed(room_id, globalsince, None) + .room_keys_changed(room_id, Some(globalsince), None) .map(|(user_id, _)| user_id) .map(ToOwned::to_owned) .collect::>() @@ -514,11 +519,11 @@ pub(crate) async fn sync_events_v4_route( (timeline_pdus, limited) = (Vec::new(), true); } else { - (timeline_pdus, limited) = match load_timeline( + TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline( &services, sender_user, room_id, - roomsincecount, + Some(roomsincecount), None, *timeline_limit, ) @@ -536,7 +541,7 @@ pub(crate) async fn sync_events_v4_route( room_id.to_owned(), services .account_data - .changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch)) + .changes_since(Some(room_id), sender_user, Some(*roomsince), Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect() .await, @@ -562,7 +567,7 @@ pub(crate) async fn sync_events_v4_route( let mut vector: Vec> = services .rooms .read_receipt - .readreceipts_since(room_id, *roomsince) + .readreceipts_since(room_id, Some(*roomsince)) .filter_map(|(read_user, _ts, v)| async move { services .users diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index fd2f9ae9..fbebebf2 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -39,7 +39,9 @@ use ruma::{ use super::share_encrypted_room; use crate::{ Ruma, - client::{DEFAULT_BUMP_TYPES, ignored_filter, is_ignored_invite, sync::load_timeline}, + client::{ + DEFAULT_BUMP_TYPES, TimelinePdus, ignored_filter, is_ignored_invite, sync::load_timeline, + }, }; type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request); @@ -411,11 +413,11 @@ where (timeline_pdus, limited) = (Vec::new(), true); } else { - (timeline_pdus, limited) = match load_timeline( + TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline( services, sender_user, room_id, - roomsincecount, + Some(roomsincecount), Some(PduCount::from(next_batch)), *timeline_limit, ) @@ -434,7 +436,7 @@ where room_id.to_owned(), services .account_data - .changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch)) + .changes_since(Some(room_id), sender_user, Some(*roomsince), Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect() .await, @@ -460,7 +462,7 @@ where let mut receipts: Vec> = services .rooms .read_receipt - .readreceipts_since(room_id, *roomsince) + .readreceipts_since(room_id, Some(*roomsince)) .filter_map(|(read_user, _ts, v)| async move { services .users @@ -687,7 +689,7 @@ async fn collect_account_data( account_data.global = services .account_data - .changes_since(None, sender_user, globalsince, None) + .changes_since(None, sender_user, Some(globalsince), None) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .collect() .await; @@ -698,7 +700,7 @@ async fn collect_account_data( room.clone(), services .account_data - .changes_since(Some(room), sender_user, globalsince, None) + .changes_since(Some(room), sender_user, Some(globalsince), None) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect() .await, @@ -732,7 +734,7 @@ where device_list_changes.extend( services .users - .keys_changed(sender_user, globalsince, None) + .keys_changed(sender_user, Some(globalsince), None) .map(ToOwned::to_owned) .collect::>() .await, @@ -868,7 +870,7 @@ where device_list_changes.extend( services .users - .room_keys_changed(room_id, globalsince, None) + .room_keys_changed(room_id, Some(globalsince), None) .map(|(user_id, _)| user_id) .map(ToOwned::to_owned) .collect::>() diff --git a/src/core/utils/future/try_ext_ext.rs b/src/core/utils/future/try_ext_ext.rs index b2114e56..89fd67bd 100644 --- a/src/core/utils/future/try_ext_ext.rs +++ b/src/core/utils/future/try_ext_ext.rs @@ -36,7 +36,7 @@ where ) -> MapOkOrElse U, impl FnOnce(Self::Error) -> U> where F: FnOnce(Self::Ok) -> U, - Self: Send + Sized; + Self: Sized; fn ok( self, @@ -100,7 +100,7 @@ where ) -> MapOkOrElse U, impl FnOnce(Self::Error) -> U> where F: FnOnce(Self::Ok) -> U, - Self: Send + Sized, + Self: Sized, { self.map_ok_or_else(|_| default, f) } diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index 453051be..b432c06d 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -129,13 +129,15 @@ pub fn changes_since<'a>( &'a self, room_id: Option<&'a RoomId>, user_id: &'a UserId, - since: u64, + since: Option, to: Option, ) -> impl Stream + Send + 'a { type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore); // Skip the data that's exactly at since, because we sent that last time - let first_possible = (room_id, user_id, since.saturating_add(1)); + // ...unless this is an initial sync, in which case send everything + let first_possible = + (room_id, user_id, since.map(|since| since.saturating_add(1)).unwrap_or(0)); self.db .roomuserdataid_accountdata diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index 64081a2c..df66d942 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -104,16 +104,16 @@ impl Service { Ok(Raw::from_json(event)) } - /// Returns an iterator over the most recent read_receipts in a room that - /// happened after the event with id `since`. + /// Returns an iterator over the most recent read_receipts in a room, + /// optionally after the event with id `since`. #[inline] #[tracing::instrument(skip(self), level = "debug")] pub fn readreceipts_since<'a>( &'a self, room_id: &'a RoomId, - since: u64, + since: Option, ) -> impl Stream> + Send + 'a { - self.db.readreceipts_since(room_id, since) + self.db.readreceipts_since(room_id, since.unwrap_or(0)) } /// Sets a private read marker at PDU `count`. diff --git a/src/service/rooms/state_accessor/state.rs b/src/service/rooms/state_accessor/state.rs index a46ce380..66c26d76 100644 --- a/src/service/rooms/state_accessor/state.rs +++ b/src/service/rooms/state_accessor/state.rs @@ -1,7 +1,7 @@ use std::{borrow::Borrow, ops::Deref, sync::Arc}; use conduwuit::{ - Result, at, err, implement, + Pdu, Result, at, err, implement, matrix::{Event, StateKey}, pair_of, utils::{ @@ -125,7 +125,7 @@ pub async fn state_get( shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, -) -> Result { +) -> Result { self.state_get_id(shortstatehash, event_type, state_key) .and_then(async |event_id: OwnedEventId| self.services.timeline.get_pdu(&event_id).await) .await diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 33b0c1c3..757e0d07 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -423,7 +423,7 @@ impl Service { let keys_changed = self .services .users - .room_keys_changed(room_id, since.0, None) + .room_keys_changed(room_id, Some(since.0), None) .ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id)); pin_mut!(keys_changed); @@ -520,7 +520,7 @@ impl Service { let receipts = self .services .read_receipt - .readreceipts_since(room_id, since.0); + .readreceipts_since(room_id, Some(since.0)); pin_mut!(receipts); let mut read = BTreeMap::::new(); diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index b74b3885..51ae2ed5 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -790,7 +790,7 @@ impl Service { pub fn keys_changed<'a>( &'a self, user_id: &'a UserId, - from: u64, + from: Option, to: Option, ) -> impl Stream + Send + 'a { self.keys_changed_user_or_room(user_id.as_str(), from, to) @@ -801,7 +801,7 @@ impl Service { pub fn room_keys_changed<'a>( &'a self, room_id: &'a RoomId, - from: u64, + from: Option, to: Option, ) -> impl Stream + Send + 'a { self.keys_changed_user_or_room(room_id.as_str(), from, to) @@ -810,11 +810,12 @@ impl Service { fn keys_changed_user_or_room<'a>( &'a self, user_or_room_id: &'a str, - from: u64, + from: Option, to: Option, ) -> impl Stream + Send + 'a { type KeyVal<'a> = ((&'a str, u64), &'a UserId); + let from = from.unwrap_or(0); let to = to.unwrap_or(u64::MAX); let start = (user_or_room_id, from.saturating_add(1)); self.db