From 1c21e4af6ecec8b9a6e3add6868728f23186e298 Mon Sep 17 00:00:00 2001 From: Ginger Date: Mon, 27 Oct 2025 17:24:02 -0400 Subject: [PATCH] fix: Properly sync left rooms - Remove most usages of `update_membership` in favor of directly calling the `mark_as_*` functions - Store the leave membership event as the value in the `userroomid_leftstate` table - Use the `userroomid_leftstate` table to synchronize the timeline and state for left rooms if possible --- src/api/client/membership/knock.rs | 33 +-- src/api/client/membership/leave.rs | 203 ++++++------- src/api/client/sync/mod.rs | 16 +- src/api/client/sync/v3/joined.rs | 316 ++------------------ src/api/client/sync/v3/left.rs | 372 +++++++++++++++--------- src/api/client/sync/v3/mod.rs | 58 +++- src/api/client/sync/v3/state.rs | 268 +++++++++++++++++ src/api/server/invite.rs | 17 +- src/core/matrix/state_res/event_auth.rs | 5 +- src/service/migrations.rs | 6 +- src/service/rooms/state/mod.rs | 16 +- src/service/rooms/state_cache/mod.rs | 28 +- src/service/rooms/state_cache/update.rs | 61 ++-- src/service/rooms/timeline/append.rs | 25 +- 14 files changed, 759 insertions(+), 665 deletions(-) create mode 100644 src/api/client/sync/v3/state.rs diff --git a/src/api/client/membership/knock.rs b/src/api/client/membership/knock.rs index 4cbc6ec1..39db96d0 100644 --- a/src/api/client/membership/knock.rs +++ b/src/api/client/membership/knock.rs @@ -5,7 +5,7 @@ use axum_client_ip::InsecureClientIp; use conduwuit::{ Err, Result, debug, debug_info, debug_warn, err, info, matrix::{ - event::{Event, gen_event_id}, + event::gen_event_id, pdu::{PduBuilder, PduEvent}, }, result::FlatOk, @@ -458,7 +458,7 @@ async fn knock_room_helper_local( .await, }; - let send_knock_response = services + services .sending .send_federation_request(&remote_server, send_knock_request) .await?; @@ -477,20 +477,14 @@ async fn knock_room_helper_local( .map_err(|e| err!(BadServerResponse("Invalid knock event PDU: {e:?}")))?; info!("Updating membership locally to knock state with provided stripped state events"); + // TODO: this call does not appear to do anything because `update_membership` + // doesn't call `mark_as_knock`. investigate further, ideally with the aim of + // removing this call entirely -- Ginger thinks `update_membership` should only + // be called from `force_state` and `append_pdu`. services .rooms .state_cache - .update_membership( - room_id, - sender_user, - parsed_knock_pdu - .get_content::() - .expect("we just created this"), - sender_user, - Some(send_knock_response.knock_room_state), - None, - false, - ) + .update_membership(room_id, sender_user, &parsed_knock_pdu, false) .await?; info!("Appending room knock event locally"); @@ -677,20 +671,11 @@ async fn knock_room_helper_remote( .await?; info!("Updating membership locally to knock state with provided stripped state events"); + // TODO: see TODO on the other call to `update_membership` services .rooms .state_cache - .update_membership( - room_id, - sender_user, - parsed_knock_pdu - .get_content::() - .expect("we just created this"), - sender_user, - Some(send_knock_response.knock_room_state), - None, - false, - ) + .update_membership(room_id, sender_user, &parsed_knock_pdu, false) .await?; info!("Appending room knock event locally"); diff --git a/src/api/client/membership/leave.rs b/src/api/client/membership/leave.rs index 41ff8157..b439182c 100644 --- a/src/api/client/membership/leave.rs +++ b/src/api/client/membership/leave.rs @@ -2,12 +2,12 @@ use std::collections::HashSet; use axum::extract::State; use conduwuit::{ - Err, Result, debug_info, debug_warn, err, + Err, Pdu, Result, debug_info, debug_warn, err, matrix::{event::gen_event_id, pdu::PduBuilder}, utils::{self, FutureBoolExt, future::ReadyEqExt}, warn, }; -use futures::{FutureExt, StreamExt, TryFutureExt, pin_mut}; +use futures::{FutureExt, StreamExt, pin_mut}; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, RoomId, RoomVersionId, UserId, api::{ @@ -81,42 +81,9 @@ pub async fn leave_room( room_id: &RoomId, reason: Option, ) -> Result { - let default_member_content = RoomMemberEventContent { - membership: MembershipState::Leave, - reason: reason.clone(), - join_authorized_via_users_server: None, - is_direct: None, - avatar_url: None, - displayname: None, - third_party_invite: None, - blurhash: None, - redact_events: None, - }; - let is_banned = services.rooms.metadata.is_banned(room_id); let is_disabled = services.rooms.metadata.is_disabled(room_id); - pin_mut!(is_banned, is_disabled); - if is_banned.or(is_disabled).await { - // the room is banned/disabled, the room must be rejected locally since we - // cant/dont want to federate with this server - services - .rooms - .state_cache - .update_membership( - room_id, - user_id, - default_member_content, - user_id, - None, - None, - true, - ) - .await?; - - return Ok(()); - } - let dont_have_room = services .rooms .state_cache @@ -129,44 +96,41 @@ pub async fn leave_room( .is_knocked(user_id, room_id) .eq(&false); - // Ask a remote server if we don't have this room and are not knocking on it - if dont_have_room.and(not_knocked).await { - if let Err(e) = - remote_leave_room(services, user_id, room_id, reason.clone(), HashSet::new()) - .boxed() - .await - { - warn!(%user_id, "Failed to leave room {room_id} remotely: {e}"); - // Don't tell the client about this error - } + pin_mut!(is_banned, is_disabled); - let last_state = services - .rooms - .state_cache - .invite_state(user_id, room_id) - .or_else(|_| services.rooms.state_cache.knock_state(user_id, room_id)) - .or_else(|_| services.rooms.state_cache.left_state(user_id, room_id)) + /* + there are three possible cases when leaving a room: + 1. the room is banned or disabled, so we're not federating with it. + 2. nobody on the homeserver is in the room, which can happen if the user is rejecting an invite + to a room that we don't have any members in. + 3. someone else on the homeserver is in the room. in this case we can leave like normal by sending a PDU over federation. + + in cases 1 and 2, we have to update the state cache using `mark_as_left` directly. + otherwise `build_and_append_pdu` will take care of updating the state cache for us. + */ + + // `leave_pdu` is the outlier `m.room.member` event which will be synced to the + // user. if it's None the sync handler will create a dummy PDU. + let leave_pdu = if is_banned.or(is_disabled).await { + // case 1: the room is banned/disabled. we don't want to federate with another + // server to leave, so we can't create an outlier PDU. + None + } else if dont_have_room.and(not_knocked).await { + // case 2: ask a remote server to assist us with leaving + // we always mark the room as left locally, regardless of if the federated leave + // failed + + remote_leave_room(services, user_id, room_id, reason.clone(), HashSet::new()) .await - .ok(); - - // We always drop the invite, we can't rely on other servers - services - .rooms - .state_cache - .update_membership( - room_id, - user_id, - default_member_content, - user_id, - last_state, - None, - true, - ) - .await?; + .inspect_err(|err| { + warn!(%user_id, "Failed to leave room {room_id} remotely: {err}"); + }) + .ok() } else { + // case 3: we can leave by sending a PDU. let state_lock = services.rooms.state.mutex.lock(room_id).await; - let Ok(event) = services + let user_member_event_content = services .rooms .state_accessor .room_state_get_content::( @@ -174,44 +138,61 @@ pub async fn leave_room( &StateEventType::RoomMember, user_id.as_str(), ) - .await - else { - debug_warn!( - "Trying to leave a room you are not a member of, marking room as left locally." - ); + .await; - return services - .rooms - .state_cache - .update_membership( - room_id, - user_id, - default_member_content, - user_id, - None, - None, - true, - ) - .await; - }; + match user_member_event_content { + | Ok(content) => { + services + .rooms + .timeline + .build_and_append_pdu( + PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { + membership: MembershipState::Leave, + reason, + join_authorized_via_users_server: None, + is_direct: None, + ..content + }), + user_id, + Some(room_id), + &state_lock, + ) + .await?; - services - .rooms - .timeline - .build_and_append_pdu( - PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { - membership: MembershipState::Leave, - reason, - join_authorized_via_users_server: None, - is_direct: None, - ..event - }), - user_id, - Some(room_id), - &state_lock, - ) - .await?; - } + // `build_and_append_pdu` calls `mark_as_left` internally, so we return early. + return Ok(()); + }, + | Err(_) => { + // an exception to case 3 is if the user isn't even in the room they're trying + // to leave. this can happen if the client's caching is wrong. + debug_warn!( + "Trying to leave a room you are not a member of, marking room as left \ + locally." + ); + + // return the existing leave state, if one exists. `mark_as_left` will then + // update the `roomuserid_leftcount` table, making the leave come down sync + // again. + services + .rooms + .state_cache + .left_state(user_id, room_id) + .await + }, + } + }; + + services + .rooms + .state_cache + .mark_as_left(user_id, room_id, leave_pdu) + .await; + + services + .rooms + .state_cache + .update_joined_count(room_id) + .await; Ok(()) } @@ -222,7 +203,7 @@ pub async fn remote_leave_room( room_id: &RoomId, reason: Option, mut servers: HashSet, -) -> Result<()> { +) -> Result { let mut make_leave_response_and_server = Err!(BadServerResponse("No remote server available to assist in leaving {room_id}.")); @@ -393,7 +374,7 @@ pub async fn remote_leave_room( &remote_server, federation::membership::create_leave_event::v2::Request { room_id: room_id.to_owned(), - event_id, + event_id: event_id.clone(), pdu: services .sending .convert_to_outgoing_federation_event(leave_event.clone()) @@ -402,5 +383,13 @@ pub async fn remote_leave_room( ) .await?; - Ok(()) + services + .rooms + .outlier + .add_pdu_outlier(&event_id, &leave_event); + + let leave_pdu = Pdu::from_id_val(&event_id, leave_event) + .map_err(|e| err!(BadServerResponse("Invalid join event PDU: {e:?}")))?; + + Ok(leave_pdu) } diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 5b853ec2..18041232 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -4,15 +4,15 @@ mod v5; use std::collections::VecDeque; use conduwuit::{ - PduCount, Result, + Event, PduCount, Result, matrix::pdu::PduEvent, - trace, + ref_at, trace, utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, }; use conduwuit_service::Services; use futures::StreamExt; use ruma::{ - RoomId, UserId, + OwnedUserId, RoomId, UserId, events::TimelineEventType::{ self, Beacon, CallInvite, PollStart, RoomEncrypted, RoomMessage, Sticker, }, @@ -29,6 +29,16 @@ pub(crate) struct TimelinePdus { pub limited: bool, } +impl TimelinePdus { + fn senders(&self) -> impl Iterator { + self.pdus + .iter() + .map(ref_at!(1)) + .map(Event::sender) + .map(Into::into) + } +} + async fn load_timeline( services: &Services, sender_user: &UserId, diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index 3f929951..8a790229 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, - ops::ControlFlow, -}; +use std::collections::{BTreeMap, HashMap}; use conduwuit::{ Result, at, err, extract_variant, is_equal_to, @@ -9,29 +6,20 @@ use conduwuit::{ Event, pdu::{PduCount, PduEvent}, }, - ref_at, result::FlatOk, utils::{ BoolExt, IterStream, ReadyExt, TryFutureExtExt, math::ruma_from_u64, - stream::{BroadbandExt, Tools, TryIgnore, WidebandExt}, - }, -}; -use conduwuit_service::{ - Services, - rooms::{ - lazy_loading, - lazy_loading::{MemberSet, Options}, - short::ShortStateHash, + stream::{Tools, WidebandExt}, }, }; +use conduwuit_service::{Services, rooms::lazy_loading::MemberSet}; use futures::{ FutureExt, StreamExt, TryFutureExt, future::{OptionFuture, join, join3, join4, try_join}, }; -use itertools::Itertools; use ruma::{ - OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, + OwnedRoomId, OwnedUserId, RoomId, UserId, api::client::sync::sync_events::{ UnreadNotificationsCount, v3::{Ephemeral, JoinedRoom, RoomAccountData, RoomSummary, State as RoomState, Timeline}, @@ -44,13 +32,14 @@ use ruma::{ serde::Raw, uint, }; -use service::rooms::short::ShortEventId; -use tracing::trace; use super::{load_timeline, share_encrypted_room}; use crate::client::{ - TimelinePdus, ignored_filter, - sync::v3::{DeviceListUpdates, SyncContext}, + ignored_filter, + sync::v3::{ + DeviceListUpdates, SyncContext, + state::{calculate_state_incremental, calculate_state_initial}, + }, }; /// Generate the sync response for a room the user is joined to. @@ -80,11 +69,10 @@ pub(super) async fn load_joined_room( let SyncContext { sender_user, - sender_device, since, next_batch, full_state, - filter, + .. } = sync_context; // the global count as of the end of the last sync. @@ -211,38 +199,23 @@ pub(super) async fn load_joined_room( |content: RoomMemberEventContent| content.membership != MembershipState::Join, ); - // lazy loading is only enabled if the filter allows for it and we aren't - // requesting the full state - let lazy_loading_enabled = (filter.room.state.lazy_load_options.is_enabled() - || filter.room.timeline.lazy_load_options.is_enabled()) - && !full_state; - - let lazy_loading_context = &lazy_loading::Context { - user_id: sender_user, - device_id: Some(sender_device), - room_id, - token: since, - options: Some(&filter.room.state.lazy_load_options), - }; + let lazy_loading_context = &sync_context.lazy_loading_context(room_id); // the user IDs of members whose membership needs to be sent to the client, if // lazy-loading is enabled. - let lazily_loaded_members = OptionFuture::from(lazy_loading_enabled.then(|| { - let witness: MemberSet = timeline - .pdus - .iter() - .map(ref_at!(1)) - .map(Event::sender) - .map(Into::into) - .chain(receipt_events.keys().map(Into::into)) - .collect(); + let lazily_loaded_members = + OptionFuture::from(sync_context.lazy_loading_enabled().then(|| { + let timeline_and_receipt_members: MemberSet = timeline + .senders() + .chain(receipt_events.keys().map(Into::into)) + .collect(); - services - .rooms - .lazy_loading - .retain_lazy_members(witness, lazy_loading_context) - })) - .await; + services + .rooms + .lazy_loading + .retain_lazy_members(timeline_and_receipt_members, lazy_loading_context) + })) + .await; // reset lazy loading state on initial sync if previous_sync_end_count.is_none() { @@ -473,249 +446,6 @@ pub(super) async fn load_joined_room( Ok((joined_room, device_list_updates)) } -/// Calculate the state events to include in an initial sync response. -/// -/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned -/// Vec will include the membership events of exclusively the members in -/// `lazily_loaded_members`. -#[tracing::instrument( - name = "initial", - level = "trace", - skip_all, - fields(current_shortstatehash) -)] -#[allow(clippy::too_many_arguments)] -async fn calculate_state_initial( - services: &Services, - sender_user: &UserId, - timeline_start_shortstatehash: ShortStateHash, - lazily_loaded_members: Option<&MemberSet>, -) -> Result> { - // load the keys and event IDs of the state events at the start of the timeline - let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services - .rooms - .state_accessor - .state_full_ids(timeline_start_shortstatehash) - .unzip() - .await; - - trace!("performing initial sync of {} state events", event_ids.len()); - - services - .rooms - .short - // look up the full state keys - .multi_get_statekey_from_short(shortstatekeys.into_iter().stream()) - .zip(event_ids.into_iter().stream()) - .ready_filter_map(|item| Some((item.0.ok()?, item.1))) - .ready_filter_map(|((event_type, state_key), event_id)| { - if let Some(lazily_loaded_members) = lazily_loaded_members { - /* - if lazy loading is enabled, filter out membership events which aren't for a user - included in `lazily_loaded_members` or for the user requesting the sync. - */ - let event_is_redundant = event_type == StateEventType::RoomMember - && state_key.as_str().try_into().is_ok_and(|user_id: &UserId| { - sender_user != user_id && !lazily_loaded_members.contains(user_id) - }); - - event_is_redundant.or_some(event_id) - } else { - Some(event_id) - } - }) - .broad_filter_map(|event_id: OwnedEventId| async move { - services.rooms.timeline.get_pdu(&event_id).await.ok() - }) - .collect() - .map(Ok) - .await -} - -/// Calculate the state events to include in an incremental sync response. -/// -/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned -/// Vec will include the membership events of all the members in -/// `lazily_loaded_members`. -#[tracing::instrument(name = "incremental", level = "trace", skip_all)] -#[allow(clippy::too_many_arguments)] -async fn calculate_state_incremental<'a>( - services: &Services, - sender_user: &'a UserId, - room_id: &RoomId, - previous_sync_end_count: PduCount, - previous_sync_end_shortstatehash: ShortStateHash, - timeline_start_shortstatehash: ShortStateHash, - timeline_end_shortstatehash: ShortStateHash, - timeline: &TimelinePdus, - lazily_loaded_members: Option<&'a MemberSet>, -) -> Result> { - // NB: a limited sync is one where `timeline.limited == true`. Synapse calls - // this a "gappy" sync internally. - - /* - the state events returned from an incremental sync which isn't limited are usually empty. - however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`), - the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state - at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline - merged a split in the DAG. - - see: https://github.com/element-hq/synapse/issues/16941 - */ - - let timeline_is_linear = timeline.pdus.is_empty() || { - let last_pdu_of_last_sync = services - .rooms - .timeline - .pdus_rev(Some(sender_user), room_id, Some(previous_sync_end_count.saturating_add(1))) - .boxed() - .next() - .await - .transpose() - .expect("last sync should have had some PDUs") - .map(at!(1)); - - // make sure the prev_events of each pdu in the timeline refer only to the - // previous pdu - timeline - .pdus - .iter() - .try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| { - if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() { - if prev_event_id - .as_ref() - .is_none_or(is_equal_to!(pdu_prev_event_id)) - { - return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned())); - } - } - - trace!( - "pdu {:?} has split prev_events (expected {:?}): {:?}", - pdu.event_id, prev_event_id, pdu.prev_events - ); - ControlFlow::Break(()) - }) - .is_continue() - }; - - if timeline_is_linear && !timeline.limited { - // if there are no splits in the DAG and the timeline isn't limited, then - // `state` will always be empty unless lazy loading is enabled. - - if let Some(lazily_loaded_members) = lazily_loaded_members - && !timeline.pdus.is_empty() - { - // lazy loading is enabled, so we return the membership events which were - // requested by the caller. - let lazy_membership_events: Vec<_> = lazily_loaded_members - .iter() - .stream() - .broad_filter_map(|user_id| async move { - if user_id == sender_user { - return None; - } - - services - .rooms - .state_accessor - .state_get( - timeline_start_shortstatehash, - &StateEventType::RoomMember, - user_id.as_str(), - ) - .ok() - .await - }) - .collect() - .await; - - if !lazy_membership_events.is_empty() { - trace!( - "syncing lazy membership events for members: {:?}", - lazy_membership_events - .iter() - .map(|pdu| pdu.state_key().unwrap()) - ); - } - return Ok(lazy_membership_events); - } - - // lazy loading is disabled, `state` is empty. - return Ok(vec![]); - } - - /* - at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates - computing the incremental state (which may be empty). - - NOTE: this code path does not apply lazy-load filtering to membership state events. the spec forbids lazy-load filtering - if the timeline is `limited`, and DAG splits which require sending extra membership state events are (probably) uncommon - enough that the performance penalty is acceptable. - */ - - trace!(?timeline_is_linear, ?timeline.limited, "computing state for incremental sync"); - - // fetch the shorteventids of state events in the timeline - let state_events_in_timeline: BTreeSet = services - .rooms - .short - .multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| { - if pdu.state_key().is_some() { - Some(pdu.event_id.as_ref()) - } else { - None - } - })) - .collect() - .await; - - trace!("{} state events in timeline", state_events_in_timeline.len()); - - /* - fetch the state events which were added since the last sync. - - specifically we fetch the difference between the state at the last sync and the state at the _end_ - of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched. - this is necessary to account for splits in the DAG, as explained above. - */ - let state_diff = services - .rooms - .short - .multi_get_eventid_from_short::<'_, OwnedEventId, _>( - services - .rooms - .state_accessor - .state_added((previous_sync_end_shortstatehash, timeline_end_shortstatehash)) - .await? - .stream() - .ready_filter_map(|(_, shorteventid)| { - if state_events_in_timeline.contains(&shorteventid) { - None - } else { - Some(shorteventid) - } - }), - ) - .ignore_err(); - - // finally, fetch the PDU contents and collect them into a vec - let state_diff_pdus = state_diff - .broad_filter_map(|event_id| async move { - services - .rooms - .timeline - .get_non_outlier_pdu(&event_id) - .await - .ok() - }) - .collect::>() - .await; - - trace!(?state_diff_pdus, "collected state PDUs for incremental sync"); - Ok(state_diff_pdus) -} - async fn calculate_device_list_updates( services: &Services, SyncContext { sender_user, since, next_batch, .. }: SyncContext<'_>, diff --git a/src/api/client/sync/v3/left.rs b/src/api/client/sync/v3/left.rs index 4e6378b4..30986ab7 100644 --- a/src/api/client/sync/v3/left.rs +++ b/src/api/client/sync/v3/left.rs @@ -1,21 +1,29 @@ -use std::collections::HashMap; - use conduwuit::{ - Event, PduEvent, Result, error, + Event, PduCount, PduEvent, Result, at, debug_warn, pdu::EventHash, - utils::{self, FutureBoolExt, TryFutureExtExt, future::ReadyEqExt}, - warn, + trace, + utils::{self, IterStream, future::ReadyEqExt, stream::WidebandExt as _}, }; -use futures::{FutureExt, StreamExt, pin_mut}; +use futures::{StreamExt, future::join}; use ruma::{ - EventId, OwnedEventId, OwnedRoomId, UserId, + EventId, OwnedRoomId, RoomId, api::client::sync::sync_events::v3::{LeftRoom, RoomAccountData, State, Timeline}, - events::{StateEventType, TimelineEventType::*}, + events::{ + StateEventType, TimelineEventType, + room::member::{MembershipChange, RoomMemberEventContent}, + }, uint, }; -use service::{Services, rooms::lazy_loading::Options}; +use serde_json::value::RawValue; +use service::Services; -use crate::client::sync::v3::SyncContext; +use crate::client::{ + TimelinePdus, ignored_filter, + sync::{ + load_timeline, + v3::{SyncContext, prepare_lazily_loaded_members, state::calculate_state_initial}, + }, +}; #[tracing::instrument( name = "left", @@ -23,174 +31,248 @@ use crate::client::sync::v3::SyncContext; skip_all, fields( room_id = %room_id, - full = %full_state, ), )] #[allow(clippy::too_many_arguments)] pub(super) async fn load_left_room( services: &Services, - SyncContext { - sender_user, - since, - next_batch, - full_state, - filter, - .. - }: SyncContext<'_>, + sync_context: SyncContext<'_>, ref room_id: OwnedRoomId, + leave_pdu: Option, ) -> Result> { - let left_count = services + let SyncContext { + sender_user, since, next_batch, filter, .. + } = sync_context; + + // the global count as of the moment the user left the room + let Some(left_count) = services .rooms .state_cache .get_left_count(room_id, sender_user) .await - .ok(); + .ok() + else { + // if we get here, the membership cache is incorrect, likely due to a state + // reset + debug_warn!("attempting to sync left room but no left count exists"); + return Ok(None); + }; - // Left before last sync let include_leave = filter.room.include_leave; - if (since >= left_count && !include_leave) || Some(next_batch) < left_count { + + // return early if we haven't gotten to this leave yet. + // this can happen if the user leaves while a sync response is being generated + if next_batch < left_count { return Ok(None); } - let is_not_found = services.rooms.metadata.exists(room_id).eq(&false); - - let is_disabled = services.rooms.metadata.is_disabled(room_id); - - let is_banned = services.rooms.metadata.is_banned(room_id); - - pin_mut!(is_not_found, is_disabled, is_banned); - if is_not_found.or(is_disabled).or(is_banned).await { - // This is just a rejected invite, not a room we know - // Insert a leave event anyways for the client - let event = PduEvent { - event_id: EventId::new(services.globals.server_name()), - sender: sender_user.to_owned(), - origin: None, - origin_server_ts: utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - kind: RoomMember, - content: serde_json::from_str(r#"{"membership":"leave"}"#) - .expect("this is valid JSON"), - state_key: Some(sender_user.as_str().into()), - unsigned: None, - // The following keys are dropped on conversion - room_id: Some(room_id.clone()), - prev_events: vec![], - depth: uint!(1), - auth_events: vec![], - redacts: None, - hashes: EventHash { sha256: String::new() }, - signatures: None, - }; - - return Ok(Some(LeftRoom { - account_data: RoomAccountData { events: Vec::new() }, - timeline: Timeline { - limited: false, - prev_batch: Some(next_batch.to_string()), - events: Vec::new(), - }, - state: State { events: vec![event.into_format()] }, - })); + // return early if this is an incremental sync, and we've already synced this + // leave to the user, and `include_leave` isn't set on the filter. + if !include_leave && since.is_some_and(|since| since >= left_count) { + return Ok(None); } - let mut left_state_events = Vec::new(); - - let since_state_ids = async { - let since_shortstatehash = services - .rooms - .user - .get_token_shortstatehash(room_id, since?) - .ok() - .await?; - - services - .rooms - .state_accessor - .state_full_ids(since_shortstatehash) - .collect::>() - .map(Some) - .await + if let Some(ref leave_pdu) = leave_pdu { + debug_assert_eq!(leave_pdu.kind, TimelineEventType::RoomMember); } - .await - .unwrap_or_default(); - let Ok(left_event_id): Result = services - .rooms - .state_accessor - .room_state_get_id(room_id, &StateEventType::RoomMember, sender_user.as_str()) - .await - else { - warn!("Left {room_id} but no left state event"); - return Ok(None); - }; + let does_not_exist = services.rooms.metadata.exists(room_id).eq(&false).await; - let Ok(left_shortstatehash) = services - .rooms - .state_accessor - .pdu_shortstatehash(&left_event_id) - .await - else { - warn!(event_id = %left_event_id, "Leave event has no state in {room_id}"); - return Ok(None); - }; + let (timeline, state_events) = match leave_pdu { + | Some(leave_pdu) if does_not_exist => { + /* + we have none PDUs with left beef for this room, likely because it was a rejected invite to a room + which nobody on this homeserver is in. `leave_pdu` is the remote-assisted outlier leave event for the room, + which is all we can send to the client. + */ + trace!("syncing remote-assisted leave PDU"); + (TimelinePdus::default(), vec![leave_pdu]) + }, + | Some(leave_pdu) => { + // we have this room in our DB, and can fetch the state and timeline from when + // the user left if they're allowed to see it. - let mut left_state_ids: HashMap<_, _> = services - .rooms - .state_accessor - .state_full_ids(left_shortstatehash) - .collect() - .await; + let leave_state_key = sender_user; + debug_assert_eq!(Some(leave_state_key.as_str()), leave_pdu.state_key()); - let leave_shortstatekey = services - .rooms - .short - .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str()) - .await; - - left_state_ids.insert(leave_shortstatekey, left_event_id); - - for (shortstatekey, event_id) in left_state_ids { - if full_state || since_state_ids.get(&shortstatekey) != Some(&event_id) { - let (event_type, state_key) = services + let leave_shortstatehash = services .rooms - .short - .get_statekey_from_short(shortstatekey) + .state_accessor + .pdu_shortstatehash(&leave_pdu.event_id) .await?; - if filter.room.state.lazy_load_options.is_enabled() - && event_type == StateEventType::RoomMember - && !full_state - && state_key - .as_str() - .try_into() - .is_ok_and(|user_id: &UserId| sender_user != user_id) - { - continue; + let prev_member_event = services + .rooms + .state_accessor + .state_get( + leave_shortstatehash, + &StateEventType::RoomMember, + leave_state_key.as_str(), + ) + .await?; + let current_membership: RoomMemberEventContent = leave_pdu.get_content()?; + let prev_membership: RoomMemberEventContent = prev_member_event.get_content()?; + + match current_membership.membership_change( + Some(prev_membership.details()), + &leave_pdu.sender, + leave_state_key, + ) { + | MembershipChange::Left => { + // if the user went from `join` to `leave`, they should be able to view the + // timeline. + + let timeline_start_count = if let Some(since) = since { + // for incremental syncs, start the timeline after `since` + PduCount::Normal(since) + } else { + // for initial syncs, start the timeline at the previous membership event + services + .rooms + .timeline + .get_pdu_count(&prev_member_event.event_id) + .await? + .saturating_sub(1) + }; + let timeline_end_count = services + .rooms + .timeline + .get_pdu_count(leave_pdu.event_id()) + .await?; + + let timeline = load_timeline( + services, + sender_user, + room_id, + Some(timeline_start_count), + Some(timeline_end_count), + 10_usize, + ) + .await?; + + let timeline_start_shortstatehash = async { + if let Some((_, pdu)) = timeline.pdus.front() { + if let Ok(shortstatehash) = services + .rooms + .state_accessor + .pdu_shortstatehash(&pdu.event_id) + .await + { + return shortstatehash; + } + } + + leave_shortstatehash + }; + + let lazily_loaded_members = prepare_lazily_loaded_members( + services, + sync_context, + room_id, + timeline.senders(), + ); + + let (timeline_start_shortstatehash, lazily_loaded_members) = + join(timeline_start_shortstatehash, lazily_loaded_members).await; + + // TODO: calculate incremental state for incremental syncs. + // always calculating initial state _works_ but returns more data and does + // more processing than strictly necessary. + let state = calculate_state_initial( + services, + sender_user, + timeline_start_shortstatehash, + lazily_loaded_members.as_ref(), + ) + .await?; + + trace!( + ?timeline_start_count, + ?timeline_end_count, + "syncing {} timeline events (limited = {}) and {} state events", + timeline.pdus.len(), + timeline.limited, + state.len() + ); + + (timeline, state) + }, + | other_membership => { + // otherwise, the user should not be able to view the timeline. + // only return their leave event. + trace!( + ?other_membership, + "user did not leave happily, only syncing leave event" + ); + (TimelinePdus::default(), vec![leave_pdu]) + }, } + }, + | None => { + /* + no leave event was actually sent in this room, but we still need to pretend + like the user left it. this is usually because the room was banned by a server admin. + generate a fake leave event to placate the client. + */ + trace!("syncing dummy leave event"); + (TimelinePdus::default(), vec![create_dummy_leave_event( + services, + sync_context, + room_id, + )]) + }, + }; - let Ok(pdu) = services.rooms.timeline.get_pdu(&event_id).await else { - error!("Pdu in state not found: {event_id}"); - continue; - }; - - if !include_leave && pdu.sender == sender_user { - continue; - } - - left_state_events.push(pdu.into_format()); - } - } + let raw_timeline_pdus = timeline + .pdus + .into_iter() + .stream() + // filter out ignored events from the timeline + .wide_filter_map(|item| ignored_filter(services, item, sender_user)) + .map(at!(1)) + .map(Event::into_format) + .collect::>() + .await; Ok(Some(LeftRoom { account_data: RoomAccountData { events: Vec::new() }, timeline: Timeline { - // TODO: support left timeline events so we dont need to set limited to true - limited: true, + limited: timeline.limited, prev_batch: Some(next_batch.to_string()), - events: Vec::new(), // and so we dont need to set this to empty vec + events: raw_timeline_pdus, + }, + state: State { + events: state_events.into_iter().map(Event::into_format).collect(), }, - state: State { events: left_state_events }, })) } + +fn create_dummy_leave_event( + services: &Services, + SyncContext { sender_user, .. }: SyncContext<'_>, + room_id: &RoomId, +) -> PduEvent { + // TODO: because this event ID is random, it could cause caching issues with + // clients. perhaps a database table could be created to hold these dummy + // events, or they could be stored as outliers? + PduEvent { + event_id: EventId::new(services.globals.server_name()), + sender: sender_user.to_owned(), + origin: None, + origin_server_ts: utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + kind: TimelineEventType::RoomMember, + content: RawValue::from_string(r#"{"membership": "leave"}"#.to_owned()).unwrap(), + state_key: Some(sender_user.as_str().into()), + unsigned: None, + // The following keys are dropped on conversion + room_id: Some(room_id.to_owned()), + prev_events: vec![], + depth: uint!(1), + auth_events: vec![], + redacts: None, + hashes: EventHash { sha256: String::new() }, + signatures: None, + } +} diff --git a/src/api/client/sync/v3/mod.rs b/src/api/client/sync/v3/mod.rs index 44c57fd5..4814c8b3 100644 --- a/src/api/client/sync/v3/mod.rs +++ b/src/api/client/sync/v3/mod.rs @@ -1,5 +1,6 @@ mod joined; mod left; +mod state; use std::{ cmp::{self}, @@ -22,7 +23,7 @@ use futures::{ future::{OptionFuture, join3, join4, join5}, }; use ruma::{ - DeviceId, OwnedUserId, UserId, + DeviceId, OwnedUserId, RoomId, UserId, api::client::{ filter::FilterDefinition, sync::sync_events::{ @@ -40,6 +41,7 @@ use ruma::{ }, serde::Raw, }; +use service::rooms::lazy_loading::{self, MemberSet, Options as _}; use super::{load_timeline, share_encrypted_room}; use crate::{ @@ -88,6 +90,25 @@ struct SyncContext<'a> { filter: &'a FilterDefinition, } +impl<'a> SyncContext<'a> { + fn lazy_loading_context(&self, room_id: &'a RoomId) -> lazy_loading::Context<'a> { + lazy_loading::Context { + user_id: self.sender_user, + device_id: Some(self.sender_device), + room_id, + token: self.since, + options: Some(&self.filter.room.state.lazy_load_options), + } + } + + #[inline] + fn lazy_loading_enabled(&self) -> bool { + (self.filter.room.state.lazy_load_options.is_enabled() + || self.filter.room.timeline.lazy_load_options.is_enabled()) + && !self.full_state + } +} + type PresenceUpdates = HashMap; /// # `GET /_matrix/client/r0/sync` @@ -239,8 +260,8 @@ pub(crate) async fn build_sync_events( .rooms .state_cache .rooms_left(sender_user) - .broad_filter_map(|(room_id, _)| { - load_left_room(services, context, room_id.clone()) + .broad_filter_map(|(room_id, leave_pdu)| { + load_left_room(services, context, room_id.clone(), leave_pdu) .map_ok(move |left_room| (room_id, left_room)) .ok() }) @@ -400,3 +421,34 @@ async fn process_presence_updates( .collect() .await } + +async fn prepare_lazily_loaded_members( + services: &Services, + sync_context: SyncContext<'_>, + room_id: &RoomId, + timeline_members: impl Iterator, +) -> Option { + let lazy_loading_context = &sync_context.lazy_loading_context(room_id); + + // the user IDs of members whose membership needs to be sent to the client, if + // lazy-loading is enabled. + let lazily_loaded_members = + OptionFuture::from(sync_context.lazy_loading_enabled().then(|| { + services + .rooms + .lazy_loading + .retain_lazy_members(timeline_members.collect(), lazy_loading_context) + })) + .await; + + // reset lazy loading state on initial sync + if sync_context.since.is_none() { + services + .rooms + .lazy_loading + .reset(lazy_loading_context) + .await; + } + + lazily_loaded_members +} diff --git a/src/api/client/sync/v3/state.rs b/src/api/client/sync/v3/state.rs new file mode 100644 index 00000000..53574ca8 --- /dev/null +++ b/src/api/client/sync/v3/state.rs @@ -0,0 +1,268 @@ +use std::{collections::BTreeSet, ops::ControlFlow}; + +use conduwuit::{ + Result, at, is_equal_to, + matrix::{ + Event, + pdu::{PduCount, PduEvent}, + }, + utils::{ + BoolExt, IterStream, ReadyExt, TryFutureExtExt, + stream::{BroadbandExt, TryIgnore}, + }, +}; +use conduwuit_service::{ + Services, + rooms::{lazy_loading::MemberSet, short::ShortStateHash}, +}; +use futures::{FutureExt, StreamExt}; +use itertools::Itertools; +use ruma::{OwnedEventId, RoomId, UserId, events::StateEventType}; +use service::rooms::short::ShortEventId; +use tracing::trace; + +use crate::client::TimelinePdus; + +/// Calculate the state events to include in an initial sync response. +/// +/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned +/// Vec will include the membership events of exclusively the members in +/// `lazily_loaded_members`. +#[tracing::instrument( + name = "initial", + level = "trace", + skip_all, + fields(current_shortstatehash) +)] +#[allow(clippy::too_many_arguments)] +pub(super) async fn calculate_state_initial( + services: &Services, + sender_user: &UserId, + timeline_start_shortstatehash: ShortStateHash, + lazily_loaded_members: Option<&MemberSet>, +) -> Result> { + // load the keys and event IDs of the state events at the start of the timeline + let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services + .rooms + .state_accessor + .state_full_ids(timeline_start_shortstatehash) + .unzip() + .await; + + trace!("performing initial sync of {} state events", event_ids.len()); + + services + .rooms + .short + // look up the full state keys + .multi_get_statekey_from_short(shortstatekeys.into_iter().stream()) + .zip(event_ids.into_iter().stream()) + .ready_filter_map(|item| Some((item.0.ok()?, item.1))) + .ready_filter_map(|((event_type, state_key), event_id)| { + if let Some(lazily_loaded_members) = lazily_loaded_members { + /* + if lazy loading is enabled, filter out membership events which aren't for a user + included in `lazily_loaded_members` or for the user requesting the sync. + */ + let event_is_redundant = event_type == StateEventType::RoomMember + && state_key.as_str().try_into().is_ok_and(|user_id: &UserId| { + sender_user != user_id && !lazily_loaded_members.contains(user_id) + }); + + event_is_redundant.or_some(event_id) + } else { + Some(event_id) + } + }) + .broad_filter_map(|event_id: OwnedEventId| async move { + services.rooms.timeline.get_pdu(&event_id).await.ok() + }) + .collect() + .map(Ok) + .await +} + +/// Calculate the state events to include in an incremental sync response. +/// +/// If lazy-loading is enabled (`lazily_loaded_members` is Some), the returned +/// Vec will include the membership events of all the members in +/// `lazily_loaded_members`. +#[tracing::instrument(name = "incremental", level = "trace", skip_all)] +#[allow(clippy::too_many_arguments)] +pub(super) async fn calculate_state_incremental<'a>( + services: &Services, + sender_user: &'a UserId, + room_id: &RoomId, + previous_sync_end_count: PduCount, + previous_sync_end_shortstatehash: ShortStateHash, + timeline_start_shortstatehash: ShortStateHash, + timeline_end_shortstatehash: ShortStateHash, + timeline: &TimelinePdus, + lazily_loaded_members: Option<&'a MemberSet>, +) -> Result> { + // NB: a limited sync is one where `timeline.limited == true`. Synapse calls + // this a "gappy" sync internally. + + /* + the state events returned from an incremental sync which isn't limited are usually empty. + however, if an event in the timeline (`timeline.pdus`) merges a split in the room's DAG (i.e. has multiple `prev_events`), + the state at the _end_ of the timeline may include state events which were merged in and don't exist in the state + at the _start_ of the timeline. because this is uncommon, we check here to see if any events in the timeline + merged a split in the DAG. + + see: https://github.com/element-hq/synapse/issues/16941 + */ + + let timeline_is_linear = timeline.pdus.is_empty() || { + let last_pdu_of_last_sync = services + .rooms + .timeline + .pdus_rev(Some(sender_user), room_id, Some(previous_sync_end_count.saturating_add(1))) + .boxed() + .next() + .await + .transpose() + .expect("last sync should have had some PDUs") + .map(at!(1)); + + // make sure the prev_events of each pdu in the timeline refer only to the + // previous pdu + timeline + .pdus + .iter() + .try_fold(last_pdu_of_last_sync.map(|pdu| pdu.event_id), |prev_event_id, (_, pdu)| { + if let Ok(pdu_prev_event_id) = pdu.prev_events.iter().exactly_one() { + if prev_event_id + .as_ref() + .is_none_or(is_equal_to!(pdu_prev_event_id)) + { + return ControlFlow::Continue(Some(pdu_prev_event_id.to_owned())); + } + } + + trace!( + "pdu {:?} has split prev_events (expected {:?}): {:?}", + pdu.event_id, prev_event_id, pdu.prev_events + ); + ControlFlow::Break(()) + }) + .is_continue() + }; + + if timeline_is_linear && !timeline.limited { + // if there are no splits in the DAG and the timeline isn't limited, then + // `state` will always be empty unless lazy loading is enabled. + + if let Some(lazily_loaded_members) = lazily_loaded_members + && !timeline.pdus.is_empty() + { + // lazy loading is enabled, so we return the membership events which were + // requested by the caller. + let lazy_membership_events: Vec<_> = lazily_loaded_members + .iter() + .stream() + .broad_filter_map(|user_id| async move { + if user_id == sender_user { + return None; + } + + services + .rooms + .state_accessor + .state_get( + timeline_start_shortstatehash, + &StateEventType::RoomMember, + user_id.as_str(), + ) + .ok() + .await + }) + .collect() + .await; + + if !lazy_membership_events.is_empty() { + trace!( + "syncing lazy membership events for members: {:?}", + lazy_membership_events + .iter() + .map(|pdu| pdu.state_key().unwrap()) + .collect::>() + ); + } + return Ok(lazy_membership_events); + } + + // lazy loading is disabled, `state` is empty. + return Ok(vec![]); + } + + /* + at this point, either the timeline is `limited` or the DAG has a split in it. this necessitates + computing the incremental state (which may be empty). + + NOTE: this code path does not apply lazy-load filtering to membership state events. the spec forbids lazy-load filtering + if the timeline is `limited`, and DAG splits which require sending extra membership state events are (probably) uncommon + enough that the performance penalty is acceptable. + */ + + trace!(?timeline_is_linear, ?timeline.limited, "computing state for incremental sync"); + + // fetch the shorteventids of state events in the timeline + let state_events_in_timeline: BTreeSet = services + .rooms + .short + .multi_get_or_create_shorteventid(timeline.pdus.iter().filter_map(|(_, pdu)| { + if pdu.state_key().is_some() { + Some(pdu.event_id.as_ref()) + } else { + None + } + })) + .collect() + .await; + + trace!("{} state events in timeline", state_events_in_timeline.len()); + + /* + fetch the state events which were added since the last sync. + + specifically we fetch the difference between the state at the last sync and the state at the _end_ + of the timeline, and then we filter out state events in the timeline itself using the shorteventids we fetched. + this is necessary to account for splits in the DAG, as explained above. + */ + let state_diff = services + .rooms + .short + .multi_get_eventid_from_short::<'_, OwnedEventId, _>( + services + .rooms + .state_accessor + .state_added((previous_sync_end_shortstatehash, timeline_end_shortstatehash)) + .await? + .stream() + .ready_filter_map(|(_, shorteventid)| { + if state_events_in_timeline.contains(&shorteventid) { + None + } else { + Some(shorteventid) + } + }), + ) + .ignore_err(); + + // finally, fetch the PDU contents and collect them into a vec + let state_diff_pdus = state_diff + .broad_filter_map(|event_id| async move { + services + .rooms + .timeline + .get_non_outlier_pdu(&event_id) + .await + .ok() + }) + .collect::>() + .await; + + trace!(?state_diff_pdus, "collected state PDUs for incremental sync"); + Ok(state_diff_pdus) +} diff --git a/src/api/server/invite.rs b/src/api/server/invite.rs index 78a65fe8..1c12aa8d 100644 --- a/src/api/server/invite.rs +++ b/src/api/server/invite.rs @@ -10,7 +10,6 @@ use conduwuit::{ use ruma::{ CanonicalJsonValue, OwnedUserId, UserId, api::{client::error::ErrorKind, federation::membership::create_invite}, - events::room::member::{MembershipState, RoomMemberEventContent}, serde::JsonObject, }; @@ -133,16 +132,20 @@ pub(crate) async fn create_invite_route( services .rooms .state_cache - .update_membership( - &body.room_id, + .mark_as_invited( &recipient_user, - RoomMemberEventContent::new(MembershipState::Invite), - sender_user, + &body.room_id, + &sender_user, Some(invite_state), body.via.clone(), - true, ) - .await?; + .await; + + services + .rooms + .state_cache + .update_joined_count(&body.room_id) + .await; for appservice in services.appservice.read().await.values() { if appservice.is_user_match(&recipient_user) { diff --git a/src/core/matrix/state_res/event_auth.rs b/src/core/matrix/state_res/event_auth.rs index 7f6c2b98..f99d4a5e 100644 --- a/src/core/matrix/state_res/event_auth.rs +++ b/src/core/matrix/state_res/event_auth.rs @@ -908,7 +908,7 @@ where false } }, - | JoinRule::Restricted(_) => + | JoinRule::Restricted(_) => { if membership_allows_join || user_for_join_auth_is_valid { trace!( %sender, @@ -928,7 +928,8 @@ where valid authorising user given to permit the join" ); false - }, + } + }, | JoinRule::Public => { trace!(%sender, "join rule is public, allowing join"); true diff --git a/src/service/migrations.rs b/src/service/migrations.rs index 42a622c2..2f462d31 100644 --- a/src/service/migrations.rs +++ b/src/service/migrations.rs @@ -456,7 +456,11 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) for user_id in &non_joined_members { debug_info!("User is left or banned, marking as left"); - services.rooms.state_cache.mark_as_left(user_id, room_id); + services + .rooms + .state_cache + .mark_as_left(user_id, room_id, None) + .await; } } diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 07d51dc0..554dae84 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -20,7 +20,7 @@ use ruma::{ EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId, events::{ AnyStrippedStateEvent, StateEventType, TimelineEventType, - room::{create::RoomCreateEventContent, member::RoomMemberEventContent}, + room::create::RoomCreateEventContent, }, serde::Raw, }; @@ -126,21 +126,9 @@ impl Service { continue; }; - let Ok(membership_event) = pdu.get_content::() else { - continue; - }; - self.services .state_cache - .update_membership( - room_id, - user_id, - membership_event, - &pdu.sender, - None, - None, - false, - ) + .update_membership(room_id, user_id, &pdu, false) .await?; }, | TimelineEventType::SpaceChild => { diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index 6874cc80..eaf1aeae 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -4,7 +4,7 @@ mod via; use std::{collections::HashMap, sync::Arc}; use conduwuit::{ - Result, SyncRwLock, implement, + Pdu, Result, SyncRwLock, implement, result::LogErr, utils::{ReadyExt, stream::TryIgnore}, warn, @@ -13,7 +13,7 @@ use database::{Deserialized, Ignore, Interfix, Map}; use futures::{Stream, StreamExt, future::join5, pin_mut}; use ruma::{ OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId, - events::{AnyStrippedStateEvent, AnySyncStateEvent, room::member::MembershipState}, + events::{AnyStrippedStateEvent, room::member::MembershipState}, serde::Raw, }; @@ -54,7 +54,6 @@ struct Data { type AppServiceInRoomCache = SyncRwLock>>; type StrippedStateEventItem = (OwnedRoomId, Vec>); -type SyncStateEventItem = (OwnedRoomId, Vec>); impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { @@ -431,18 +430,16 @@ pub async fn knock_state( #[implement(Service)] #[tracing::instrument(skip(self), level = "trace")] -pub async fn left_state( - &self, - user_id: &UserId, - room_id: &RoomId, -) -> Result>> { +pub async fn left_state(&self, user_id: &UserId, room_id: &RoomId) -> Option { let key = (user_id, room_id); self.db .userroomid_leftstate .qry(&key) .await .deserialized() - .and_then(|val: Raw>| val.deserialize_as().map_err(Into::into)) + // old databases may have garbage data as values in the `userroomid_leftstate` table from before + // the leave event was stored there. they still need to be included, so we return Ok(None) for deserialization failures. + .unwrap_or(None) } /// Returns an iterator over all rooms a user left. @@ -451,8 +448,8 @@ pub async fn left_state( pub fn rooms_left<'a>( &'a self, user_id: &'a UserId, -) -> impl Stream + Send + 'a { - type KeyVal<'a> = (Key<'a>, Raw>>); +) -> impl Stream)> + Send + 'a { + type KeyVal<'a> = (Key<'a>, Raw>); type Key<'a> = (&'a UserId, &'a RoomId); let prefix = (user_id, Interfix); @@ -461,8 +458,13 @@ pub fn rooms_left<'a>( .stream_prefix(&prefix) .ignore_err() .map(|((_, room_id), state): KeyVal<'_>| (room_id.to_owned(), state)) - .map(|(room_id, state)| Ok((room_id, state.deserialize_as()?))) - .ignore_err() + .ready_filter_map(|(room_id, state)| { + // deserialization errors need to be ignored. see comment in `left_state` + match state.deserialize() { + | Ok(state) => Some((room_id, state)), + | Err(_) => Some((room_id, None)), + } + }) } #[implement(Service)] diff --git a/src/service/rooms/state_cache/update.rs b/src/service/rooms/state_cache/update.rs index 295bd9e7..9fb2f29a 100644 --- a/src/service/rooms/state_cache/update.rs +++ b/src/service/rooms/state_cache/update.rs @@ -1,13 +1,13 @@ use std::collections::HashSet; -use conduwuit::{Err, Result, implement, is_not_empty, utils::ReadyExt, warn}; +use conduwuit::{Err, Event, Pdu, Result, implement, is_not_empty, utils::ReadyExt, warn}; use database::{Json, serialize_key}; use futures::StreamExt; use ruma::{ OwnedServerName, RoomId, UserId, events::{ - AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType, - RoomAccountDataEventType, StateEventType, + AnyStrippedStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, + StateEventType, direct::DirectEvent, invite_permission_config::FilterLevel, room::{ @@ -26,8 +26,7 @@ use ruma::{ fields( %room_id, %user_id, - %sender, - ?membership_event, + ?pdu, ), )] #[allow(clippy::too_many_arguments)] @@ -35,13 +34,10 @@ pub async fn update_membership( &self, room_id: &RoomId, user_id: &UserId, - membership_event: RoomMemberEventContent, - sender: &UserId, - last_state: Option>>, - invite_via: Option>, + pdu: &Pdu, update_joined_count: bool, ) -> Result { - let membership = membership_event.membership; + let membership = pdu.get_content::()?; // Keep track what remote users exist by adding them as "deactivated" users // @@ -54,7 +50,7 @@ pub async fn update_membership( } } - match &membership { + match &membership.membership { | MembershipState::Join => { // Check if the user never joined this room if !self.once_joined(user_id, room_id).await { @@ -125,6 +121,7 @@ pub async fn update_membership( // return an error for blocked invites. ignored invites aren't handled here // since the recipient's membership should still be changed to `invite`. // they're filtered out in the individual /sync handlers + let sender = pdu.sender(); if matches!( self.services .users @@ -136,19 +133,15 @@ pub async fn update_membership( "{user_id} has blocked invites from {sender}." ))); } - self.mark_as_invited(user_id, room_id, sender, last_state, invite_via) + + // TODO: make sure that passing None for `last_state` is correct behavior. + // the call from `append_pdu` used to use `services.state.summary_stripped` + // to fill that parameter. + self.mark_as_invited(user_id, room_id, sender, None, None) .await; }, | MembershipState::Leave | MembershipState::Ban => { - self.mark_as_left(user_id, room_id); - - if self.services.globals.user_is_local(user_id) - && (self.services.config.forget_forced_upon_leave - || self.services.metadata.is_banned(room_id).await - || self.services.metadata.is_disabled(room_id).await) - { - self.forget(room_id, user_id); - } + self.mark_as_left(user_id, room_id, Some(pdu.clone())).await; }, | _ => {}, } @@ -252,24 +245,24 @@ pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) { self.db.roomid_inviteviaservers.remove(room_id); } -/// Direct DB function to directly mark a user as left. It is not -/// recommended to use this directly. You most likely should use -/// `update_membership` instead +/// Mark a user as having left a room. +/// +/// `leave_pdu` represents the m.room.member event which the user sent to leave +/// the room. If this is None, no event was actually sent, but we must still +/// behave as if the user is no longer in the room. This may occur, for example, +/// if the room being left has been server-banned by an administrator. #[implement(super::Service)] #[tracing::instrument(skip(self), level = "debug")] -pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { +pub async fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId, leave_pdu: Option) { let userroom_id = (user_id, room_id); let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id"); let roomuser_id = (room_id, user_id); let roomuser_id = serialize_key(roomuser_id).expect("failed to serialize roomuser_id"); - // (timo) TODO - let leftstate = Vec::>::new(); - self.db .userroomid_leftstate - .raw_put(&userroom_id, Json(leftstate)); + .raw_put(&userroom_id, Json(leave_pdu)); self.db .roomuserid_leftcount .raw_aput::<8, _, _>(&roomuser_id, self.services.globals.next_count().unwrap()); @@ -285,6 +278,14 @@ pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { self.db.roomuserid_knockedcount.remove(&roomuser_id); self.db.roomid_inviteviaservers.remove(room_id); + + if self.services.globals.user_is_local(user_id) + && (self.services.config.forget_forced_upon_leave + || self.services.metadata.is_banned(room_id).await + || self.services.metadata.is_disabled(room_id).await) + { + self.forget(room_id, user_id); + } } /// Direct DB function to directly mark a user as knocked. It is not @@ -366,7 +367,7 @@ pub async fn mark_as_invited( .raw_aput::<8, _, _>(&roomuser_id, self.services.globals.next_count().unwrap()); self.db .userroomid_invitesender - .raw_put(&userroom_id, sender_user); + .insert(&userroom_id, sender_user); self.db.userroomid_joined.remove(&userroom_id); self.db.roomuserid_joined.remove(&roomuser_id); diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index 32605c10..8a2b4782 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -19,9 +19,7 @@ use ruma::{ GlobalAccountDataEventType, StateEventType, TimelineEventType, push_rules::PushRulesEvent, room::{ - encrypted::Relation, - member::{MembershipState, RoomMemberEventContent}, - power_levels::RoomPowerLevelsEventContent, + encrypted::Relation, power_levels::RoomPowerLevelsEventContent, redaction::RoomRedactionEventContent, }, }, @@ -323,31 +321,12 @@ where let target_user_id = UserId::parse(state_key).expect("This state_key was previously validated"); - let content: RoomMemberEventContent = pdu.get_content()?; - let stripped_state = match content.membership { - | MembershipState::Invite | MembershipState::Knock => self - .services - .state - .summary_stripped(pdu, room_id) - .await - .into(), - | _ => None, - }; - // Update our membership info, we do this here incase a user is invited or // knocked and immediately leaves we need the DB to record the invite or // knock event for auth self.services .state_cache - .update_membership( - room_id, - target_user_id, - content, - pdu.sender(), - stripped_state, - None, - true, - ) + .update_membership(room_id, target_user_id, &pdu, true) .await?; } },