From be743ec70afe9a87f117344821d19761facda1a2 Mon Sep 17 00:00:00 2001 From: Ginger Date: Wed, 5 Nov 2025 10:01:12 -0500 Subject: [PATCH] chore(sync/v3): Use more descriptive names for SyncContext properties --- src/api/client/sync/v3/joined.rs | 126 ++++++++++++++++--------------- src/api/client/sync/v3/left.rs | 34 +++++---- src/api/client/sync/v3/mod.rs | 95 ++++++++++++++--------- src/api/client/sync/v3/state.rs | 8 +- 4 files changed, 151 insertions(+), 112 deletions(-) diff --git a/src/api/client/sync/v3/joined.rs b/src/api/client/sync/v3/joined.rs index 8864f7fe..251cbf9f 100644 --- a/src/api/client/sync/v3/joined.rs +++ b/src/api/client/sync/v3/joined.rs @@ -58,20 +58,15 @@ pub(super) async fn load_joined_room( ref room_id: OwnedRoomId, ) -> Result<(JoinedRoom, DeviceListUpdates)> { let SyncContext { - sender_user, - since, - next_batch, + syncing_user, + last_sync_end_count, + current_count, full_state, .. } = sync_context; - - // the global count as of the end of the last sync. - // this will be None if we are doing an initial sync. - let previous_sync_end_count = since.map(PduCount::Normal); - let next_batchcount = PduCount::Normal(next_batch); let mut device_list_updates = DeviceListUpdates::new(); - // the room state right now + // the room state as of `next_batch`. let current_shortstatehash = services .rooms .state @@ -81,36 +76,39 @@ pub(super) async fn load_joined_room( // the room state as of the end of the last sync. // this will be None if we are doing an initial sync or if we just joined this // room. - let previous_sync_end_shortstatehash = OptionFuture::from(since.map(|since| { - services - .rooms - .user - .get_token_shortstatehash(room_id, since) - .ok() - })) - .map(Option::flatten) - .map(Ok); + let last_sync_end_shortstatehash = + OptionFuture::from(last_sync_end_count.map(|last_sync_end_count| { + services + .rooms + .user + .get_token_shortstatehash(room_id, last_sync_end_count) + .ok() + })) + .map(Option::flatten) + .map(Ok); - let (current_shortstatehash, previous_sync_end_shortstatehash) = - try_join(current_shortstatehash, previous_sync_end_shortstatehash).await?; + let (current_shortstatehash, last_sync_end_shortstatehash) = + try_join(current_shortstatehash, last_sync_end_shortstatehash).await?; + + // load recent timeline events. let timeline = load_timeline( services, - sender_user, + syncing_user, room_id, - previous_sync_end_count, - Some(next_batchcount), + last_sync_end_count.map(PduCount::Normal), + Some(PduCount::Normal(current_count)), services.config.incremental_sync_max_timeline_size, ); let receipt_events = services .rooms .read_receipt - .readreceipts_since(room_id, since) + .readreceipts_since(room_id, last_sync_end_count) .filter_map(|(read_user, _, edu)| async move { services .users - .user_is_ignored(read_user, sender_user) + .user_is_ignored(read_user, syncing_user) .await .or_some((read_user.to_owned(), edu)) }) @@ -142,13 +140,13 @@ pub(super) async fn load_joined_room( services .rooms .user - .last_notification_read(sender_user, room_id) + .last_notification_read(syncing_user, room_id) }) .into(); // the syncing user's membership event during the last sync. // this will be None if `previous_sync_end_shortstatehash` is None. - let membership_during_previous_sync: OptionFuture<_> = previous_sync_end_shortstatehash + let membership_during_previous_sync: OptionFuture<_> = last_sync_end_shortstatehash .map(|shortstatehash| { services .rooms @@ -156,7 +154,7 @@ pub(super) async fn load_joined_room( .state_get_content( shortstatehash, &StateEventType::RoomMember, - sender_user.as_str(), + syncing_user.as_str(), ) .ok() }) @@ -199,16 +197,16 @@ pub(super) async fn load_joined_room( *or* we just joined this room, `calculate_state_initial` will be used, otherwise `calculate_state_incremental` will be used. */ - let mut state_events = if let Some(previous_sync_end_count) = previous_sync_end_count - && let Some(previous_sync_end_shortstatehash) = previous_sync_end_shortstatehash + let mut state_events = if let Some(last_sync_end_count) = last_sync_end_count + && let Some(last_sync_end_shortstatehash) = last_sync_end_shortstatehash && !full_state { calculate_state_incremental( services, - sender_user, + syncing_user, room_id, - previous_sync_end_count, - previous_sync_end_shortstatehash, + PduCount::Normal(last_sync_end_count), + last_sync_end_shortstatehash, timeline_start_shortstatehash, current_shortstatehash, &timeline, @@ -219,7 +217,7 @@ pub(super) async fn load_joined_room( } else { calculate_state_initial( services, - sender_user, + syncing_user, timeline_start_shortstatehash, lazily_loaded_members.as_ref(), ) @@ -228,7 +226,7 @@ pub(super) async fn load_joined_room( }; // for incremental syncs, calculate updates to E2EE device lists - if previous_sync_end_count.is_some() && is_encrypted_room { + if last_sync_end_count.is_some() && is_encrypted_room { calculate_device_list_updates( services, sync_context, @@ -243,8 +241,9 @@ pub(super) async fn load_joined_room( // only compute room counts and heroes (aka the summary) if the room's members // changed since the last sync let (joined_member_count, invited_member_count, heroes) = + // calculate counts if any of the state events we're syncing are of type `m.room.member` if state_events.iter().any(|event| event.kind == RoomMember) { - calculate_counts(services, room_id, sender_user).await? + calculate_counts(services, room_id, syncing_user).await? } else { (None, None, None) }; @@ -254,7 +253,7 @@ pub(super) async fn load_joined_room( && pdu .state_key .as_deref() - .is_some_and(is_equal_to!(sender_user.as_str())) + .is_some_and(is_equal_to!(syncing_user.as_str())) }; // the membership event of the syncing user, if they joined since the last sync @@ -272,7 +271,7 @@ pub(super) async fn load_joined_room( let prev_batch = timeline.pdus.front().map(at!(0)).or_else(|| { sender_join_membership_event .is_some() - .and(since) + .and(last_sync_end_count) .map(Into::into) }); @@ -281,7 +280,7 @@ pub(super) async fn load_joined_room( .into_iter() .stream() // filter out ignored events from the timeline - .wide_filter_map(|item| ignored_filter(services, item, sender_user)) + .wide_filter_map(|item| ignored_filter(services, item, syncing_user)) .map(at!(1)) // if the syncing user just joined, add their membership event to the timeline .chain(sender_join_membership_event.into_iter().stream()) @@ -290,7 +289,7 @@ pub(super) async fn load_joined_room( let account_data_events = services .account_data - .changes_since(Some(room_id), sender_user, since, Some(next_batch)) + .changes_since(Some(room_id), syncing_user, last_sync_end_count, Some(current_count)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect(); @@ -301,7 +300,8 @@ pub(super) async fn load_joined_room( 3. the last notification the user saw has changed since the last sync */ let send_notification_counts = last_notification_read.is_none_or(|last_notification_read| { - since.is_none_or(|since| last_notification_read > since) + last_sync_end_count + .is_none_or(|last_sync_end_count| last_notification_read > last_sync_end_count) }); let notification_count: OptionFuture<_> = send_notification_counts @@ -309,7 +309,7 @@ pub(super) async fn load_joined_room( services .rooms .user - .notification_count(sender_user, room_id) + .notification_count(syncing_user, room_id) .map(TryInto::try_into) .unwrap_or(uint!(0)) }) @@ -320,7 +320,7 @@ pub(super) async fn load_joined_room( services .rooms .user - .highlight_count(sender_user, room_id) + .highlight_count(syncing_user, room_id) .map(TryInto::try_into) .unwrap_or(uint!(0)) }) @@ -331,14 +331,15 @@ pub(super) async fn load_joined_room( .typing .last_typing_update(room_id) .and_then(|count| async move { - if since.is_some_and(|since| count <= since) { + if last_sync_end_count.is_some_and(|last_sync_end_count| count <= last_sync_end_count) + { return Ok(Vec::>::new()); } let typings = services .rooms .typing - .typings_event_for_user(room_id, sender_user) + .typings_event_for_user(room_id, syncing_user) .await?; Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?]) @@ -352,12 +353,12 @@ pub(super) async fn load_joined_room( let (room_events, account_data_events, typing_events) = events; let (notification_count, highlight_count) = unread_notifications; - let last_privateread_update = if let Some(since) = since { + let last_privateread_update = if let Some(last_sync_end_count) = last_sync_end_count { services .rooms .read_receipt - .last_privateread_update(sender_user, room_id) - .await > since + .last_privateread_update(syncing_user, room_id) + .await > last_sync_end_count } else { true }; @@ -366,7 +367,7 @@ pub(super) async fn load_joined_room( services .rooms .read_receipt - .private_read_get(room_id, sender_user) + .private_read_get(room_id, syncing_user) .await .ok() } else { @@ -383,7 +384,7 @@ pub(super) async fn load_joined_room( services .rooms .user - .associate_token_shortstatehash(room_id, next_batch, current_shortstatehash) + .associate_token_shortstatehash(room_id, current_count, current_shortstatehash) .await; let joined_room = JoinedRoom { @@ -417,7 +418,12 @@ pub(super) async fn load_joined_room( async fn calculate_device_list_updates( services: &Services, - SyncContext { sender_user, since, next_batch, .. }: SyncContext<'_>, + SyncContext { + syncing_user, + last_sync_end_count: since, + current_count, + .. + }: SyncContext<'_>, room_id: &RoomId, device_list_updates: &mut DeviceListUpdates, state_events: &Vec, @@ -426,7 +432,7 @@ async fn calculate_device_list_updates( // add users with changed keys to the `changed` list services .users - .room_keys_changed(room_id, since, Some(next_batch)) + .room_keys_changed(room_id, since, Some(current_count)) .map(at!(0)) .map(ToOwned::to_owned) .ready_for_each(|user_id| { @@ -456,7 +462,7 @@ async fn calculate_device_list_updates( if matches!(content.membership, Leave | Join) { let shares_encrypted_room = - share_encrypted_room(services, sender_user, &user_id, Some(room_id)) + share_encrypted_room(services, syncing_user, &user_id, Some(room_id)) .await; match content.membership { | Leave if !shares_encrypted_room => { @@ -476,7 +482,7 @@ async fn calculate_device_list_updates( async fn calculate_counts( services: &Services, room_id: &RoomId, - sender_user: &UserId, + syncing_user: &UserId, ) -> Result<(Option, Option, Option>)> { let joined_member_count = services .rooms @@ -496,7 +502,7 @@ async fn calculate_counts( let small_room = joined_member_count.saturating_add(invited_member_count) <= 5; let heroes: OptionFuture<_> = small_room - .then(|| calculate_heroes(services, room_id, sender_user)) + .then(|| calculate_heroes(services, room_id, syncing_user)) .into(); Ok((Some(joined_member_count), Some(invited_member_count), heroes.await)) @@ -505,15 +511,15 @@ async fn calculate_counts( async fn calculate_heroes( services: &Services, room_id: &RoomId, - sender_user: &UserId, + syncing_user: &UserId, ) -> Vec { services .rooms .timeline - .all_pdus(sender_user, room_id) + .all_pdus(syncing_user, room_id) .ready_filter(|(_, pdu)| pdu.kind == RoomMember) .fold_default(|heroes: Vec<_>, (_, pdu)| { - fold_hero(heroes, services, room_id, sender_user, pdu) + fold_hero(heroes, services, room_id, syncing_user, pdu) }) .await } @@ -522,7 +528,7 @@ async fn fold_hero( mut heroes: Vec, services: &Services, room_id: &RoomId, - sender_user: &UserId, + syncing_user: &UserId, pdu: PduEvent, ) -> Vec { let Some(user_id): Option<&UserId> = @@ -531,7 +537,7 @@ async fn fold_hero( return heroes; }; - if user_id == sender_user { + if user_id == syncing_user { return heroes; } diff --git a/src/api/client/sync/v3/left.rs b/src/api/client/sync/v3/left.rs index ab5f36d9..6b9c579a 100644 --- a/src/api/client/sync/v3/left.rs +++ b/src/api/client/sync/v3/left.rs @@ -41,14 +41,18 @@ pub(super) async fn load_left_room( leave_pdu: Option, ) -> Result> { let SyncContext { - sender_user, since, next_batch, filter, .. + syncing_user, + last_sync_end_count, + current_count, + filter, + .. } = sync_context; // the global count as of the moment the user left the room let Some(left_count) = services .rooms .state_cache - .get_left_count(room_id, sender_user) + .get_left_count(room_id, syncing_user) .await .ok() else { @@ -62,13 +66,13 @@ pub(super) async fn load_left_room( // return early if we haven't gotten to this leave yet. // this can happen if the user leaves while a sync response is being generated - if next_batch < left_count { + if current_count < left_count { return Ok(None); } // return early if this is an incremental sync, and we've already synced this // leave to the user, and `include_leave` isn't set on the filter. - if !include_leave && since.is_some_and(|since| since >= left_count) { + if !include_leave && last_sync_end_count >= Some(left_count) { return Ok(None); } @@ -96,7 +100,7 @@ pub(super) async fn load_left_room( // we have this room in our DB, and can fetch the state and timeline from when // the user left if they're allowed to see it. - let leave_state_key = sender_user; + let leave_state_key = syncing_user; debug_assert_eq!( Some(leave_state_key.as_str()), leave_pdu.state_key(), @@ -130,9 +134,11 @@ pub(super) async fn load_left_room( // if the user went from `join` to `leave`, they should be able to view the // timeline. - let timeline_start_count = if let Some(since) = since { + let timeline_start_count = if let Some(last_sync_end_count) = + last_sync_end_count + { // for incremental syncs, start the timeline after `since` - PduCount::Normal(since) + PduCount::Normal(last_sync_end_count) } else { // for initial syncs, start the timeline at the previous membership event services @@ -150,7 +156,7 @@ pub(super) async fn load_left_room( let timeline = load_timeline( services, - sender_user, + syncing_user, room_id, Some(timeline_start_count), Some(timeline_end_count), @@ -188,7 +194,7 @@ pub(super) async fn load_left_room( // more processing than strictly necessary. let state = calculate_state_initial( services, - sender_user, + syncing_user, timeline_start_shortstatehash, lazily_loaded_members.as_ref(), ) @@ -236,7 +242,7 @@ pub(super) async fn load_left_room( .into_iter() .stream() // filter out ignored events from the timeline - .wide_filter_map(|item| ignored_filter(services, item, sender_user)) + .wide_filter_map(|item| ignored_filter(services, item, syncing_user)) .map(at!(1)) .map(Event::into_format) .collect::>() @@ -246,7 +252,7 @@ pub(super) async fn load_left_room( account_data: RoomAccountData { events: Vec::new() }, timeline: Timeline { limited: timeline.limited, - prev_batch: Some(next_batch.to_string()), + prev_batch: Some(current_count.to_string()), events: raw_timeline_pdus, }, state: State { @@ -257,7 +263,7 @@ pub(super) async fn load_left_room( fn create_dummy_leave_event( services: &Services, - SyncContext { sender_user, .. }: SyncContext<'_>, + SyncContext { syncing_user, .. }: SyncContext<'_>, room_id: &RoomId, ) -> PduEvent { // TODO: because this event ID is random, it could cause caching issues with @@ -265,14 +271,14 @@ fn create_dummy_leave_event( // events, or they could be stored as outliers? PduEvent { event_id: EventId::new(services.globals.server_name()), - sender: sender_user.to_owned(), + sender: syncing_user.to_owned(), origin: None, origin_server_ts: utils::millis_since_unix_epoch() .try_into() .expect("Timestamp is valid js_int value"), kind: TimelineEventType::RoomMember, content: RawValue::from_string(r#"{"membership": "leave"}"#.to_owned()).unwrap(), - state_key: Some(sender_user.as_str().into()), + state_key: Some(syncing_user.as_str().into()), unsigned: None, // The following keys are dropped on conversion room_id: Some(room_id.to_owned()), diff --git a/src/api/client/sync/v3/mod.rs b/src/api/client/sync/v3/mod.rs index 1c2ee074..1ca9e027 100644 --- a/src/api/client/sync/v3/mod.rs +++ b/src/api/client/sync/v3/mod.rs @@ -52,6 +52,7 @@ use crate::{ }, }; +/// A collection of updates to users' device lists, used for E2EE. struct DeviceListUpdates { changed: HashSet, left: HashSet, @@ -80,23 +81,39 @@ impl From for DeviceLists { } } +/// References to common data needed to calculate the sync response. #[derive(Clone, Copy)] struct SyncContext<'a> { - sender_user: &'a UserId, - sender_device: &'a DeviceId, - since: Option, - next_batch: u64, + /// The ID of the user requesting this sync. + syncing_user: &'a UserId, + /// The ID of the device requesting this sync, which will belong to + /// `syncing_user`. + syncing_device: &'a DeviceId, + /// The global count at the end of the previous sync response. + /// The previous sync's `current_count` will become the next sync's + /// `last_sync_end_count`. This will be None if no `since` query parameter + /// was specified, indicating an initial sync. + last_sync_end_count: Option, + /// The global count as of when we started building the sync response. + /// This is used as an upper bound when querying the database to ensure the + /// response represents a snapshot in time and doesn't include data which + /// appeared while the response was being built. + current_count: u64, + /// The `full_state` query parameter, used when syncing state for joined and + /// left rooms. full_state: bool, + /// The sync filter, which the client uses to specify what data should be + /// included in the sync response. filter: &'a FilterDefinition, } impl<'a> SyncContext<'a> { fn lazy_loading_context(&self, room_id: &'a RoomId) -> lazy_loading::Context<'a> { lazy_loading::Context { - user_id: self.sender_user, - device_id: Some(self.sender_device), + user_id: self.syncing_user, + device_id: Some(self.syncing_device), room_id, - token: self.since, + token: self.last_sync_end_count, options: Some(&self.filter.room.state.lazy_load_options), } } @@ -196,10 +213,12 @@ pub(crate) async fn build_sync_events( services: &Services, body: &Ruma, ) -> Result> { - let (sender_user, sender_device) = body.sender(); + let (syncing_user, syncing_device) = body.sender(); - let next_batch = services.globals.current_count()?; - let since = body + let current_count = services.globals.current_count()?; + + // the `since` token is the last sync end count stringified + let last_sync_end_count = body .body .since .as_ref() @@ -209,20 +228,23 @@ pub(crate) async fn build_sync_events( // FilterDefinition is very large (0x1000 bytes), let's put it on the heap let filter = Box::new(match body.body.filter.as_ref() { + // use the default filter if none was specified | None => FilterDefinition::default(), + // use inline filters directly | Some(Filter::FilterDefinition(filter)) => filter.clone(), + // look up filter IDs from the database | Some(Filter::FilterId(filter_id)) => services .users - .get_filter(sender_user, filter_id) + .get_filter(syncing_user, filter_id) .await .unwrap_or_default(), }); let context = SyncContext { - sender_user, - sender_device, - since, - next_batch, + syncing_user, + syncing_device, + last_sync_end_count, + current_count, full_state, filter: &filter, }; @@ -230,7 +252,7 @@ pub(crate) async fn build_sync_events( let joined_rooms = services .rooms .state_cache - .rooms_joined(sender_user) + .rooms_joined(syncing_user) .map(ToOwned::to_owned) .broad_filter_map(|room_id| async { let joined_room = load_joined_room(services, context, room_id.clone()).await; @@ -259,7 +281,7 @@ pub(crate) async fn build_sync_events( let left_rooms = services .rooms .state_cache - .rooms_left(sender_user) + .rooms_left(syncing_user) .broad_filter_map(|(room_id, leave_pdu)| { load_left_room(services, context, room_id.clone(), leave_pdu) .map_ok(move |left_room| (room_id, left_room)) @@ -271,9 +293,9 @@ pub(crate) async fn build_sync_events( let invited_rooms = services .rooms .state_cache - .rooms_invited(sender_user) + .rooms_invited(syncing_user) .wide_filter_map(async |(room_id, invite_state)| { - if is_ignored_invite(services, sender_user, &room_id).await { + if is_ignored_invite(services, syncing_user, &room_id).await { None } else { Some((room_id, invite_state)) @@ -283,12 +305,12 @@ pub(crate) async fn build_sync_events( let invite_count = services .rooms .state_cache - .get_invite_count(&room_id, sender_user) + .get_invite_count(&room_id, syncing_user) .await .ok(); // only sync this invite if it was sent after the last /sync call - if since < invite_count { + if last_sync_end_count < invite_count { let invited_room = InvitedRoom { invite_state: InviteState { events: invite_state }, }; @@ -301,17 +323,17 @@ pub(crate) async fn build_sync_events( let knocked_rooms = services .rooms .state_cache - .rooms_knocked(sender_user) + .rooms_knocked(syncing_user) .fold_default(|mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| async move { let knock_count = services .rooms .state_cache - .get_knock_count(&room_id, sender_user) + .get_knock_count(&room_id, syncing_user) .await .ok(); // only sync this knock if it was sent after the last /sync call - if since < knock_count { + if last_sync_end_count < knock_count { let knocked_room = KnockedRoom { knock_state: KnockState { events: knock_state }, }; @@ -324,36 +346,41 @@ pub(crate) async fn build_sync_events( let presence_updates: OptionFuture<_> = services .config .allow_local_presence - .then(|| process_presence_updates(services, since, sender_user)) + .then(|| process_presence_updates(services, last_sync_end_count, syncing_user)) .into(); let account_data = services .account_data - .changes_since(None, sender_user, since, Some(next_batch)) + .changes_since(None, syncing_user, last_sync_end_count, Some(current_count)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .collect(); // Look for device list updates of this account let keys_changed = services .users - .keys_changed(sender_user, since, Some(next_batch)) + .keys_changed(syncing_user, last_sync_end_count, Some(current_count)) .map(ToOwned::to_owned) .collect::>(); let to_device_events = services .users - .get_to_device_events(sender_user, sender_device, since, Some(next_batch)) + .get_to_device_events( + syncing_user, + syncing_device, + last_sync_end_count, + Some(current_count), + ) .collect::>(); let device_one_time_keys_count = services .users - .count_one_time_keys(sender_user, sender_device); + .count_one_time_keys(syncing_user, syncing_device); // Remove all to-device events the device received *last time* let remove_to_device_events = services .users - .remove_to_device_events(sender_user, sender_device, since); + .remove_to_device_events(syncing_user, syncing_device, last_sync_end_count); let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms); let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates); @@ -373,7 +400,7 @@ pub(crate) async fn build_sync_events( device_one_time_keys_count, // Fallback keys are not yet supported device_unused_fallback_key_types: None, - next_batch: next_batch.to_string(), + next_batch: current_count.to_string(), presence: Presence { events: presence_updates .into_iter() @@ -398,12 +425,12 @@ pub(crate) async fn build_sync_events( #[tracing::instrument(name = "presence", level = "debug", skip_all)] async fn process_presence_updates( services: &Services, - since: Option, + last_sync_end_count: Option, syncing_user: &UserId, ) -> PresenceUpdates { services .presence - .presence_since(since.unwrap_or(0)) // send all presences on initial sync + .presence_since(last_sync_end_count.unwrap_or(0)) // send all presences on initial sync .filter(|(user_id, ..)| { services .rooms @@ -433,7 +460,7 @@ async fn prepare_lazily_loaded_members( // reset lazy loading state on initial sync. // do this even if lazy loading is disabled so future lazy loads // will have the correct members. - if sync_context.since.is_none() { + if sync_context.last_sync_end_count.is_none() { services .rooms .lazy_loading diff --git a/src/api/client/sync/v3/state.rs b/src/api/client/sync/v3/state.rs index 53574ca8..8838f4f7 100644 --- a/src/api/client/sync/v3/state.rs +++ b/src/api/client/sync/v3/state.rs @@ -93,8 +93,8 @@ pub(super) async fn calculate_state_incremental<'a>( services: &Services, sender_user: &'a UserId, room_id: &RoomId, - previous_sync_end_count: PduCount, - previous_sync_end_shortstatehash: ShortStateHash, + last_sync_end_count: PduCount, + last_sync_end_shortstatehash: ShortStateHash, timeline_start_shortstatehash: ShortStateHash, timeline_end_shortstatehash: ShortStateHash, timeline: &TimelinePdus, @@ -117,7 +117,7 @@ pub(super) async fn calculate_state_incremental<'a>( let last_pdu_of_last_sync = services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, Some(previous_sync_end_count.saturating_add(1))) + .pdus_rev(Some(sender_user), room_id, Some(last_sync_end_count.saturating_add(1))) .boxed() .next() .await @@ -237,7 +237,7 @@ pub(super) async fn calculate_state_incremental<'a>( services .rooms .state_accessor - .state_added((previous_sync_end_shortstatehash, timeline_end_shortstatehash)) + .state_added((last_sync_end_shortstatehash, timeline_end_shortstatehash)) .await? .stream() .ready_filter_map(|(_, shorteventid)| {