fix(sync/v3): Cleanup part 1: mostly fix redundant data in state

parent 33c3d23d60
commit 26fa73841b

13 changed files with 466 additions and 426 deletions
@@ -41,7 +41,7 @@ async fn changes_since(
     let results: Vec<_> = self
         .services
         .account_data
-        .changes_since(room_id.as_deref(), &user_id, since, None)
+        .changes_since(room_id.as_deref(), &user_id, Some(since), None)
         .collect()
         .await;
     let query_time = timer.elapsed();

@@ -389,7 +389,7 @@ pub(crate) async fn get_key_changes_route(
     device_list_updates.extend(
         services
             .users
-            .keys_changed(sender_user, from, Some(to))
+            .keys_changed(sender_user, Some(from), Some(to))
             .map(ToOwned::to_owned)
             .collect::<Vec<_>>()
             .await,

@@ -401,7 +401,7 @@ pub(crate) async fn get_key_changes_route(
     device_list_updates.extend(
         services
             .users
-            .room_keys_changed(room_id, from, Some(to))
+            .room_keys_changed(room_id, Some(from), Some(to))
             .map(|(user_id, _)| user_id)
             .map(ToOwned::to_owned)
             .collect::<Vec<_>>()

@@ -213,7 +213,7 @@ where
     let receipts = services
         .rooms
         .read_receipt
-        .readreceipts_since(lazy_loading_context.room_id, oldest.into_unsigned());
+        .readreceipts_since(lazy_loading_context.room_id, Some(oldest.into_unsigned()));

     pin_mut!(receipts);
     let witness: Witness = events

@@ -3,12 +3,13 @@ mod v4;
 mod v5;

 use conduwuit::{
-    Error, PduCount, Result,
+    PduCount, Result,
     matrix::pdu::PduEvent,
+    trace,
     utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
 };
 use conduwuit_service::Services;
-use futures::{StreamExt, pin_mut};
+use futures::StreamExt;
 use ruma::{
     RoomId, UserId,
     events::TimelineEventType::{

@@ -23,43 +24,75 @@ pub(crate) use self::{
 pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
     &[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker];

+#[derive(Default)]
+pub(crate) struct TimelinePdus {
+    pub pdus: Vec<(PduCount, PduEvent)>,
+    pub limited: bool,
+}
+
 async fn load_timeline(
     services: &Services,
     sender_user: &UserId,
     room_id: &RoomId,
-    roomsincecount: PduCount,
-    next_batch: Option<PduCount>,
+    starting_count: Option<PduCount>,
+    ending_count: Option<PduCount>,
     limit: usize,
-) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
+) -> Result<TimelinePdus> {
     let last_timeline_count = services
         .rooms
         .timeline
         .last_timeline_count(Some(sender_user), room_id)
         .await?;

-    if last_timeline_count <= roomsincecount {
-        return Ok((Vec::new(), false));
+    let mut pdus_between_counts = match starting_count {
+        | Some(starting_count) => {
+            if last_timeline_count <= starting_count {
+                return Ok(TimelinePdus::default());
+            }
+
+            // Stream from the DB all PDUs which were sent after `starting_count` but before
+            // `ending_count`, including both endpoints
+            services
+                .rooms
+                .timeline
+                .pdus(Some(sender_user), room_id, Some(starting_count))
+                .ignore_err()
+                .ready_take_while(|&(pducount, _)| {
+                    pducount <= ending_count.unwrap_or_else(PduCount::max)
+                })
+                .boxed()
+        },
+        | None => {
+            // For initial sync, stream from the DB all PDUs before and including
+            // `ending_count` in reverse order
+            services
+                .rooms
+                .timeline
+                .pdus_rev(Some(sender_user), room_id, ending_count)
+                .ignore_err()
+                .boxed()
+        },
+    };
+
+    // Return at most `limit` PDUs from the stream
+    let mut pdus: Vec<_> = pdus_between_counts.by_ref().take(limit).collect().await;
+    if starting_count.is_none() {
+        // `pdus_rev` returns PDUs in reverse order. fix that here
+        pdus.reverse();
+    }
+    // The timeline is limited if more than `limit` PDUs exist in the DB after
+    // `starting_count`
+    let limited = pdus_between_counts.next().await.is_some();

-    let non_timeline_pdus = services
-        .rooms
-        .timeline
-        .pdus_rev(Some(sender_user), room_id, None)
-        .ignore_err()
-        .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
-        .ready_take_while(|&(pducount, _)| pducount > roomsincecount);
+    trace!(
+        "syncing {:?} timeline pdus from {:?} to {:?} (limited = {:?})",
+        pdus.len(),
+        starting_count,
+        ending_count,
+        limited,
+    );

-    // Take the last events for the timeline
-    pin_mut!(non_timeline_pdus);
-    let timeline_pdus: Vec<_> = non_timeline_pdus.by_ref().take(limit).collect().await;
-
-    let timeline_pdus: Vec<_> = timeline_pdus.into_iter().rev().collect();
-
-    // They /sync response doesn't always return all messages, so we say the output
-    // is limited unless there are events in non_timeline_pdus
-    let limited = non_timeline_pdus.next().await.is_some();
-
-    Ok((timeline_pdus, limited))
+    Ok(TimelinePdus { pdus, limited })
 }

 async fn share_encrypted_room(

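Note: the new load_timeline above derives the `limited` flag by pulling at most `limit` PDUs from the stream and then probing for one more. A minimal, std-only sketch of that pattern (illustrative only, not part of this commit; the real code operates on async streams of (PduCount, PduEvent)):

fn take_with_limited<I: Iterator<Item = u64>>(mut source: I, limit: usize) -> (Vec<u64>, bool) {
    // Take at most `limit` items from the source.
    let items: Vec<u64> = source.by_ref().take(limit).collect();
    // If anything is left over, the result is "limited": the caller did not see everything.
    let limited = source.next().is_some();
    (items, limited)
}

fn main() {
    let (items, limited) = take_with_limited(1..=10u64, 3);
    assert_eq!(items, vec![1, 2, 3]);
    assert!(limited); // seven more items remained in the source
}
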
@@ -11,13 +11,13 @@ use conduwuit::{
         Event,
         pdu::{EventHash, PduCount, PduEvent},
     },
-    pair_of, ref_at,
+    ref_at,
     result::FlatOk,
     utils::{
         self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
         future::{OptionStream, ReadyEqExt},
         math::ruma_from_u64,
-        stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
+        stream::{BroadbandExt, Tools, WidebandExt},
     },
     warn,
 };

@@ -30,8 +30,8 @@ use conduwuit_service::{
     },
 };
 use futures::{
-    FutureExt, StreamExt, TryFutureExt, TryStreamExt,
-    future::{OptionFuture, join, join3, join4, join5, try_join, try_join4},
+    FutureExt, StreamExt, TryFutureExt,
+    future::{OptionFuture, join, join3, join4, join5, try_join4},
     pin_mut,
 };
 use ruma::{

@@ -58,21 +58,50 @@ use ruma::{
     uint,
 };
 use service::rooms::short::{ShortEventId, ShortStateKey};
 use tracing::trace;

 use super::{load_timeline, share_encrypted_room};
 use crate::{
     Ruma, RumaResponse,
-    client::{ignored_filter, is_ignored_invite},
+    client::{TimelinePdus, ignored_filter, is_ignored_invite},
 };

 #[derive(Default)]
-struct StateChanges {
-    heroes: Option<Vec<OwnedUserId>>,
-    joined_member_count: Option<u64>,
-    invited_member_count: Option<u64>,
-    state_events: Vec<PduEvent>,
-    device_list_updates: HashSet<OwnedUserId>,
-    left_encrypted_users: HashSet<OwnedUserId>,
+struct DeviceListUpdates {
+    changed: HashSet<OwnedUserId>,
+    left: HashSet<OwnedUserId>,
 }

+impl DeviceListUpdates {
+    fn new() -> Self {
+        DeviceListUpdates {
+            changed: HashSet::new(),
+            left: HashSet::new(),
+        }
+    }
+
+    fn merge(&mut self, other: DeviceListUpdates) {
+        self.changed.extend(other.changed);
+        self.left.extend(other.left);
+    }
+}
+
+impl Into<DeviceLists> for DeviceListUpdates {
+    fn into(self) -> DeviceLists {
+        DeviceLists {
+            changed: self.changed.into_iter().collect(),
+            left: self.left.into_iter().collect(),
+        }
+    }
+}
+
+#[derive(Clone, Copy)]
+struct SyncContext<'a> {
+    sender_user: &'a UserId,
+    sender_device: &'a DeviceId,
+    since: Option<u64>,
+    next_batch: u64,
+    full_state: bool,
+    filter: &'a Box<FilterDefinition>,
+}
+
 type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;

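For context, a std-only sketch of the accumulation pattern the new DeviceListUpdates type enables: per-room results are folded into one accumulator instead of threading two separate HashSets through every caller (illustrative only, not part of this commit; String stands in for OwnedUserId):

use std::collections::HashSet;

#[derive(Default)]
struct DeviceListUpdates {
    changed: HashSet<String>,
    left: HashSet<String>,
}

impl DeviceListUpdates {
    // Merge another room's updates into this accumulator.
    fn merge(&mut self, other: DeviceListUpdates) {
        self.changed.extend(other.changed);
        self.left.extend(other.left);
    }
}

fn main() {
    let per_room = vec![
        DeviceListUpdates {
            changed: HashSet::from(["@alice:example.org".to_owned()]),
            left: HashSet::new(),
        },
        DeviceListUpdates {
            changed: HashSet::new(),
            left: HashSet::from(["@bob:example.org".to_owned()]),
        },
    ];

    // Fold all per-room updates into one response-wide set of device list changes.
    let all = per_room.into_iter().fold(DeviceListUpdates::default(), |mut acc, updates| {
        acc.merge(updates);
        acc
    });

    assert!(all.changed.contains("@alice:example.org"));
    assert!(all.left.contains("@bob:example.org"));
}
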
@@ -169,11 +198,12 @@ pub(crate) async fn build_sync_events(
         .body
         .since
         .as_ref()
-        .and_then(|string| string.parse().ok())
-        .unwrap_or(0);
+        .and_then(|string| string.parse().ok());

     let full_state = body.body.full_state;
-    let filter = match body.body.filter.as_ref() {
+
+    // FilterDefinition is very large (0x1000 bytes), let's put it on the heap
+    let filter = Box::new(match body.body.filter.as_ref() {
         | None => FilterDefinition::default(),
         | Some(Filter::FilterDefinition(filter)) => filter.clone(),
         | Some(Filter::FilterId(filter_id)) => services

@@ -181,6 +211,15 @@ pub(crate) async fn build_sync_events(
         .get_filter(sender_user, filter_id)
         .await
         .unwrap_or_default(),
     });

+    let context = SyncContext {
+        sender_user,
+        sender_device,
+        since,
+        next_batch,
+        full_state,
+        filter: &filter,
+    };
+
     let joined_rooms = services

@@ -189,30 +228,20 @@ pub(crate) async fn build_sync_events(
         .rooms_joined(sender_user)
         .map(ToOwned::to_owned)
         .broad_filter_map(|room_id| {
-            load_joined_room(
-                services,
-                sender_user,
-                sender_device,
-                room_id.clone(),
-                since,
-                next_batch,
-                full_state,
-                &filter,
-            )
-            .map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu))
-            .ok()
+            load_joined_room(services, context, room_id.clone())
+                .map_ok(move |(joined_room, updates)| (room_id, joined_room, updates))
+                .ok()
         })
         .ready_fold(
-            (BTreeMap::new(), HashSet::new(), HashSet::new()),
-            |(mut joined_rooms, mut device_list_updates, mut left_encrypted_users),
-             (room_id, joined_room, dlu, leu)| {
-                device_list_updates.extend(dlu);
-                left_encrypted_users.extend(leu);
+            (BTreeMap::new(), DeviceListUpdates::new()),
+            |(mut joined_rooms, mut all_updates), (room_id, joined_room, updates)| {
+                all_updates.merge(updates);

                 if !joined_room.is_empty() {
                     joined_rooms.insert(room_id, joined_room);
                 }

-                (joined_rooms, device_list_updates, left_encrypted_users)
+                (joined_rooms, all_updates)
             },
         );

@@ -221,18 +250,9 @@ pub(crate) async fn build_sync_events(
         .state_cache
         .rooms_left(sender_user)
         .broad_filter_map(|(room_id, _)| {
-            handle_left_room(
-                services,
-                since,
-                room_id.clone(),
-                sender_user,
-                next_batch,
-                full_state,
-                filter.room.include_leave,
-                &filter,
-            )
-            .map_ok(move |left_room| (room_id, left_room))
-            .ok()
+            handle_left_room(services, context, room_id.clone())
+                .map_ok(move |left_room| (room_id, left_room))
+                .ok()
         })
         .ready_filter_map(|(room_id, left_room)| left_room.map(|left_room| (room_id, left_room)))
         .collect();

@@ -256,16 +276,14 @@ pub(crate) async fn build_sync_events(
             .await
             .ok();

-        // Invited before last sync
-        if Some(since) >= invite_count {
-            return invited_rooms;
+        // only sync this invite if it was sent after the last /sync call
+        if since < invite_count {
+            let invited_room = InvitedRoom {
+                invite_state: InviteState { events: invite_state },
+            };
+
+            invited_rooms.insert(room_id, invited_room);
         }

-        let invited_room = InvitedRoom {
-            invite_state: InviteState { events: invite_state },
-        };
-
-        invited_rooms.insert(room_id, invited_room);
         invited_rooms
     });

@@ -281,16 +299,14 @@ pub(crate) async fn build_sync_events(
             .await
             .ok();

-        // Knocked before last sync
-        if Some(since) >= knock_count {
-            return knocked_rooms;
+        // only sync this knock if it was sent after the last /sync call
+        if since < knock_count {
+            let knocked_room = KnockedRoom {
+                knock_state: KnockState { events: knock_state },
+            };
+
+            knocked_rooms.insert(room_id, knocked_room);
         }

-        let knocked_room = KnockedRoom {
-            knock_state: KnockState { events: knock_state },
-        };
-
-        knocked_rooms.insert(room_id, knocked_room);
         knocked_rooms
     });

@@ -315,7 +331,7 @@ pub(crate) async fn build_sync_events(

     let to_device_events = services
         .users
-        .get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
+        .get_to_device_events(sender_user, sender_device, since, Some(next_batch))
         .collect::<Vec<_>>();

     let device_one_time_keys_count = services

@@ -337,29 +353,12 @@ pub(crate) async fn build_sync_events(
     let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top;
     let ((), to_device_events, presence_updates) = ephemeral;
     let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms;
-    let (joined_rooms, mut device_list_updates, left_encrypted_users) = joined_rooms;
-    device_list_updates.extend(keys_changed);
-
-    // If the user doesn't share an encrypted room with the target anymore, we need
-    // to tell them
-    let device_list_left: HashSet<_> = left_encrypted_users
-        .into_iter()
-        .stream()
-        .broad_filter_map(|user_id| async move {
-            share_encrypted_room(services, sender_user, &user_id, None)
-                .await
-                .eq(&false)
-                .then_some(user_id)
-        })
-        .collect()
-        .await;
+    let (joined_rooms, mut device_list_updates) = joined_rooms;
+    device_list_updates.changed.extend(keys_changed);

     let response = sync_events::v3::Response {
         account_data: GlobalAccountData { events: account_data },
-        device_lists: DeviceLists {
-            changed: device_list_updates.into_iter().collect(),
-            left: device_list_left.into_iter().collect(),
-        },
+        device_lists: device_list_updates.into(),
         device_one_time_keys_count,
         // Fallback keys are not yet supported
         device_unused_fallback_key_types: None,

@@ -388,12 +387,12 @@ pub(crate) async fn build_sync_events(
 #[tracing::instrument(name = "presence", level = "debug", skip_all)]
 async fn process_presence_updates(
     services: &Services,
-    since: u64,
+    since: Option<u64>,
     syncing_user: &UserId,
 ) -> PresenceUpdates {
     services
         .presence
-        .presence_since(since)
+        .presence_since(since.unwrap_or(0)) // send all presences on initial sync
         .filter(|(user_id, ..)| {
             services
                 .rooms

@@ -424,13 +423,15 @@ async fn process_presence_updates(
 #[allow(clippy::too_many_arguments)]
 async fn handle_left_room(
     services: &Services,
-    since: u64,
+    SyncContext {
+        sender_user,
+        since,
+        next_batch,
+        full_state,
+        filter,
+        ..
+    }: SyncContext<'_>,
     ref room_id: OwnedRoomId,
-    sender_user: &UserId,
-    next_batch: u64,
-    full_state: bool,
-    include_leave: bool,
-    filter: &FilterDefinition,
 ) -> Result<Option<LeftRoom>> {
     let left_count = services
         .rooms

@@ -440,7 +441,8 @@ async fn handle_left_room(
         .ok();

     // Left before last sync
-    if (Some(since) >= left_count && !include_leave) || Some(next_batch) < left_count {
+    let include_leave = filter.room.include_leave;
+    if (since >= left_count && !include_leave) || Some(next_batch) < left_count {
         return Ok(None);
     }

@@ -489,20 +491,24 @@ async fn handle_left_room(

     let mut left_state_events = Vec::new();

-    let since_shortstatehash = services.rooms.user.get_token_shortstatehash(room_id, since);
+    let since_state_ids = async {
+        let since_shortstatehash = services
+            .rooms
+            .user
+            .get_token_shortstatehash(room_id, since?)
+            .ok()
+            .await?;

-    let since_state_ids: HashMap<_, OwnedEventId> = since_shortstatehash
-        .map_ok(|since_shortstatehash| {
-            services
-                .rooms
-                .state_accessor
-                .state_full_ids(since_shortstatehash)
-                .map(Ok)
-        })
-        .try_flatten_stream()
-        .try_collect()
-        .await
-        .unwrap_or_default();
+        services
+            .rooms
+            .state_accessor
+            .state_full_ids(since_shortstatehash)
+            .collect::<HashMap<_, OwnedEventId>>()
+            .map(Some)
+            .await
+    }
+    .await
+    .unwrap_or_default();

     let Ok(left_event_id): Result<OwnedEventId> = services
         .rooms

@@ -594,29 +600,38 @@ async fn handle_left_room(
 #[allow(clippy::too_many_arguments)]
 async fn load_joined_room(
     services: &Services,
-    sender_user: &UserId,
-    sender_device: &DeviceId,
+    SyncContext {
+        sender_user,
+        sender_device,
+        since,
+        next_batch,
+        full_state,
+        filter,
+        ..
+    }: SyncContext<'_>,
     ref room_id: OwnedRoomId,
-    since: u64,
-    next_batch: u64,
-    full_state: bool,
-    filter: &FilterDefinition,
-) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
-    let sincecount = PduCount::Normal(since);
+) -> Result<(JoinedRoom, DeviceListUpdates)> {
+    let mut device_list_updates = DeviceListUpdates::new();
+    let sincecount = since.map(PduCount::Normal);
     let next_batchcount = PduCount::Normal(next_batch);

+    // the shortstatehash of the room's state right now
     let current_shortstatehash = services
         .rooms
         .state
         .get_room_shortstatehash(room_id)
         .map_err(|_| err!(Database(error!("Room {room_id} has no state"))));

-    let since_shortstatehash = services
-        .rooms
-        .user
-        .get_token_shortstatehash(room_id, since)
-        .ok()
-        .map(Ok);
+    // the shortstatehash of what the room's state was when the `since` token was
+    // issued
+    let since_shortstatehash = OptionFuture::from(since.map(|since| {
+        services
+            .rooms
+            .user
+            .get_token_shortstatehash(room_id, since)
+            .ok()
+    }))
+    .map(|v| Ok(v.flatten()));

     let timeline = load_timeline(
         services,

@@ -646,41 +661,23 @@ async fn load_joined_room(
     .boxed()
     .await?;

-    let (timeline_pdus, limited) = timeline;
-    let initial = since_shortstatehash.is_none();
-    let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
-        || filter.room.timeline.lazy_load_options.is_enabled();
+    let TimelinePdus { pdus: timeline_pdus, limited } = timeline;
+    let is_initial_sync = since_shortstatehash.is_none();

-    let lazy_loading_context = &lazy_loading::Context {
-        user_id: sender_user,
-        device_id: Some(sender_device),
-        room_id,
-        token: Some(since),
-        options: Some(&filter.room.state.lazy_load_options),
-    };
-
-    // Reset lazy loading because this is an initial sync
-    let lazy_load_reset: OptionFuture<_> = initial
-        .then(|| services.rooms.lazy_loading.reset(lazy_loading_context))
-        .into();
-
-    lazy_load_reset.await;
-    let witness: OptionFuture<_> = lazy_loading_enabled
-        .then(|| {
-            let witness: Witness = timeline_pdus
-                .iter()
-                .map(ref_at!(1))
-                .map(Event::sender)
-                .map(Into::into)
-                .chain(receipt_events.keys().map(Into::into))
-                .collect();
-
-            services
+    let timeline_start_shortstatehash = async {
+        if let Some((_, pdu)) = timeline_pdus.first() {
+            if let Ok(shortstatehash) = services
                 .rooms
-                .lazy_loading
-                .witness_retain(witness, lazy_loading_context)
-        })
-        .into();
+                .state_accessor
+                .pdu_shortstatehash(&pdu.event_id)
+                .await
+            {
+                return shortstatehash;
+            }
+        }
+
+        return current_shortstatehash;
+    };

     let last_notification_read: OptionFuture<_> = timeline_pdus
         .is_empty()

@@ -702,8 +699,24 @@ async fn load_joined_room(
         })
         .into();

-    let (last_notification_read, since_sender_member, witness) =
-        join3(last_notification_read, since_sender_member, witness).await;
+    let is_encrypted_room = services
+        .rooms
+        .state_accessor
+        .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
+        .is_ok();
+
+    let (
+        last_notification_read,
+        since_sender_member,
+        timeline_start_shortstatehash,
+        is_encrypted_room,
+    ) = join4(
+        last_notification_read,
+        since_sender_member,
+        timeline_start_shortstatehash,
+        is_encrypted_room,
+    )
+    .await;

     let joined_since_last_sync =
         since_sender_member

@@ -712,26 +725,147 @@ async fn load_joined_room(
             content.membership != MembershipState::Join
         });

-    let StateChanges {
-        heroes,
-        joined_member_count,
-        invited_member_count,
-        mut state_events,
-        mut device_list_updates,
-        left_encrypted_users,
-    } = calculate_state_changes(
-        services,
-        sender_user,
+    let lazy_loading_enabled = (filter.room.state.lazy_load_options.is_enabled()
+        || filter.room.timeline.lazy_load_options.is_enabled())
+        && !full_state;
+
+    let lazy_loading_context = &lazy_loading::Context {
+        user_id: sender_user,
+        device_id: Some(sender_device),
         room_id,
-        full_state,
-        filter,
-        since_shortstatehash,
-        current_shortstatehash,
-        joined_since_last_sync,
-        witness.as_ref(),
-    )
-    .boxed()
-    .await?;
+        token: since,
+        options: Some(&filter.room.state.lazy_load_options),
+    };
+
+    let lazy_loading_witness = OptionFuture::from(lazy_loading_enabled.then(|| {
+        let witness: Witness = timeline_pdus
+            .iter()
+            .map(ref_at!(1))
+            .map(Event::sender)
+            .map(Into::into)
+            .chain(receipt_events.keys().map(Into::into))
+            .collect();
+
+        services
+            .rooms
+            .lazy_loading
+            .witness_retain(witness, lazy_loading_context)
+    }))
+    .await;
+
+    /* replace with multiple steps
+    glossary for my own sanity:
+    - full state: every state event from the start of the room to the start of the timeline
+    - incremental state: state events from `since` to the start of the timeline
+    - state_events: the `state` key on the JSON object we return
+
+    if initial sync or full_state:
+        get full state
+        use full state as state_events
+    else if TL is limited:
+        get incremental state
+        use incremental state as state_events
+        if encryption is enabled:
+            use incremental state to extend device list
+    else:
+        state_events is empty
+    compute counts and heroes from state_events
+    */
+    let mut state_events = if is_initial_sync || full_state {
+        // reset lazy loading state on initial sync
+        if is_initial_sync {
+            services
+                .rooms
+                .lazy_loading
+                .reset(lazy_loading_context)
+                .await;
+        };
+
+        let state_initial = calculate_state_initial(
+            services,
+            sender_user,
+            timeline_start_shortstatehash,
+            lazy_loading_witness.as_ref(),
+        )
+        .boxed()
+        .await?;
+
+        state_initial
+    } else if limited {
+        let state_incremental = calculate_state_incremental(
+            services,
+            sender_user,
+            since_shortstatehash,
+            timeline_start_shortstatehash,
+            lazy_loading_witness.as_ref(),
+        )
+        .boxed()
+        .await?;
+
+        // calculate device list updates for E2EE
+        if is_encrypted_room {
+            // add users with changed keys to the `changed` list
+            services
+                .users
+                .room_keys_changed(room_id, since, Some(next_batch))
+                .map(at!(0))
+                .map(ToOwned::to_owned)
+                .ready_for_each(|user_id| {
+                    device_list_updates.changed.insert(user_id);
+                })
+                .await;
+
+            // add users who now share encrypted rooms to `changed` and
+            // users who no longer share encrypted rooms to `left`
+            for state_event in &state_incremental {
+                if state_event.kind == RoomMember {
+                    let Some(content): Option<RoomMemberEventContent> =
+                        state_event.get_content().ok()
+                    else {
+                        continue;
+                    };
+
+                    let Some(user_id): Option<OwnedUserId> = state_event
+                        .state_key
+                        .as_ref()
+                        .and_then(|key| key.parse().ok())
+                    else {
+                        continue;
+                    };
+
+                    use MembershipState::*;
+
+                    if matches!(content.membership, Leave | Join) {
+                        let shares_encrypted_room =
+                            share_encrypted_room(services, sender_user, &user_id, Some(room_id))
+                                .await;
+                        match content.membership {
+                            | Leave if !shares_encrypted_room => {
+                                device_list_updates.left.insert(user_id);
+                            },
+                            | Join if joined_since_last_sync || shares_encrypted_room => {
+                                device_list_updates.changed.insert(user_id);
+                            },
+                            | _ => (),
+                        };
+                    }
+                }
+            }
+        }
+
+        state_incremental
+    } else {
+        vec![]
+    };
+
+    // only compute room counts and heroes (aka the summary) if the room's members
+    // changed since the last sync
+    let (joined_member_count, invited_member_count, heroes) =
+        if state_events.iter().any(|event| event.kind == RoomMember) {
+            calculate_counts(services, room_id, sender_user).await?
+        } else {
+            (None, None, None)
+        };

     let is_sender_membership = |pdu: &PduEvent| {
         pdu.kind == StateEventType::RoomMember.into()

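The glossary comment in the hunk above describes a three-way split for the `state` key. A tiny, std-only sketch of that branching (illustrative only, not part of this commit):

#[derive(Debug, PartialEq)]
enum StateSource {
    Full,        // initial sync or ?full_state=true: state from the start of the room
    Incremental, // limited timeline: only state changed between `since` and the timeline start
    Empty,       // otherwise: no state events in the response
}

fn choose_state_source(is_initial_sync: bool, full_state: bool, limited: bool) -> StateSource {
    if is_initial_sync || full_state {
        StateSource::Full
    } else if limited {
        StateSource::Incremental
    } else {
        StateSource::Empty
    }
}

fn main() {
    assert_eq!(choose_state_source(true, false, false), StateSource::Full);
    assert_eq!(choose_state_source(false, false, true), StateSource::Incremental);
    assert_eq!(choose_state_source(false, false, false), StateSource::Empty);
}
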
@@ -753,11 +887,11 @@ async fn load_joined_room(
     let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| {
         joined_sender_member
             .is_some()
-            .then_some(since)
+            .and_then(|| since)
             .map(Into::into)
     });

-    let room_events = timeline_pdus
+    let timeline_pdus = timeline_pdus
         .into_iter()
         .stream()
         .wide_filter_map(|item| ignored_filter(services, item, sender_user))

@@ -772,15 +906,8 @@ async fn load_joined_room(
         .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
         .collect();

-    // Look for device list updates in this room
-    let device_updates = services
-        .users
-        .room_keys_changed(room_id, since, Some(next_batch))
-        .map(|(user_id, _)| user_id)
-        .map(ToOwned::to_owned)
-        .collect::<Vec<_>>();
-
-    let send_notification_counts = last_notification_read.is_none_or(|count| count > since);
+    let send_notification_counts =
+        last_notification_read.is_none_or(|count| since.is_none_or(|since| count > since));

     let notification_count: OptionFuture<_> = send_notification_counts
         .then(|| {

@@ -809,7 +936,7 @@ async fn load_joined_room(
         .typing
         .last_typing_update(room_id)
         .and_then(|count| async move {
-            if count <= since {
+            if since.is_some_and(|since| count <= since) {
                 return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
             }

@@ -824,22 +951,21 @@ async fn load_joined_room(
         .unwrap_or(Vec::new());

     let unread_notifications = join(notification_count, highlight_count);
-    let events = join3(room_events, account_data_events, typing_events);
-    let (unread_notifications, events, device_updates) =
-        join3(unread_notifications, events, device_updates)
-            .boxed()
-            .await;
+    let events = join3(timeline_pdus, account_data_events, typing_events);
+    let (unread_notifications, events) = join(unread_notifications, events).boxed().await;

     let (room_events, account_data_events, typing_events) = events;
     let (notification_count, highlight_count) = unread_notifications;

-    device_list_updates.extend(device_updates);
-
-    let last_privateread_update = services
-        .rooms
-        .read_receipt
-        .last_privateread_update(sender_user, room_id)
-        .await > since;
+    let last_privateread_update = if let Some(since) = since {
+        services
+            .rooms
+            .read_receipt
+            .last_privateread_update(sender_user, room_id)
+            .await > since
+    } else {
+        true
+    };

     let private_read_event = if last_privateread_update {
         services

@@ -880,7 +1006,7 @@ async fn load_joined_room(
         },
         unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
         timeline: Timeline {
-            limited: limited || joined_since_last_sync,
+            limited,
             prev_batch: prev_batch.as_ref().map(ToString::to_string),
             events: room_events,
         },

@@ -891,69 +1017,25 @@ async fn load_joined_room(
         unread_thread_notifications: BTreeMap::new(),
     };

-    Ok((joined_room, device_list_updates, left_encrypted_users))
+    Ok((joined_room, device_list_updates))
 }

+/// Calculate the "initial state", or all events from the start of the room up
+/// to (but not including) the `current_shortstatehash`. If
+/// `lazy_loading_witness` is `None`, lazy loading will be disabled.
 #[tracing::instrument(
-    name = "state",
+    name = "initial",
     level = "trace",
     skip_all,
-    fields(
-        full = %full_state,
-        cs = %current_shortstatehash,
-        ss = ?since_shortstatehash,
-    )
+    fields(current_shortstatehash)
 )]
-#[allow(clippy::too_many_arguments)]
-async fn calculate_state_changes(
-    services: &Services,
-    sender_user: &UserId,
-    room_id: &RoomId,
-    full_state: bool,
-    filter: &FilterDefinition,
-    since_shortstatehash: Option<ShortStateHash>,
-    current_shortstatehash: ShortStateHash,
-    joined_since_last_sync: bool,
-    witness: Option<&Witness>,
-) -> Result<StateChanges> {
-    if since_shortstatehash.is_none() {
-        calculate_state_initial(
-            services,
-            sender_user,
-            room_id,
-            full_state,
-            filter,
-            current_shortstatehash,
-            witness,
-        )
-        .await
-    } else {
-        calculate_state_incremental(
-            services,
-            sender_user,
-            room_id,
-            full_state,
-            filter,
-            since_shortstatehash,
-            current_shortstatehash,
-            joined_since_last_sync,
-            witness,
-        )
-        .await
-    }
-}
-
-#[tracing::instrument(name = "initial", level = "trace", skip_all)]
-#[allow(clippy::too_many_arguments)]
 async fn calculate_state_initial(
     services: &Services,
     sender_user: &UserId,
-    room_id: &RoomId,
-    full_state: bool,
-    _filter: &FilterDefinition,
     current_shortstatehash: ShortStateHash,
-    witness: Option<&Witness>,
-) -> Result<StateChanges> {
+    lazy_loading_witness: Option<&Witness>,
+) -> Result<Vec<PduEvent>> {
     let (shortstatekeys, event_ids): (Vec<_>, Vec<_>) = services
         .rooms
         .state_accessor

@@ -961,19 +1043,21 @@ async fn calculate_state_initial(
         .unzip()
         .await;

-    let state_events = services
+    trace!("event ids for initial sync @ {:?}: {:?}", current_shortstatehash, event_ids);
+
+    services
         .rooms
         .short
         .multi_get_statekey_from_short(shortstatekeys.into_iter().stream())
         .zip(event_ids.into_iter().stream())
         .ready_filter_map(|item| Some((item.0.ok()?, item.1)))
         .ready_filter_map(|((event_type, state_key), event_id)| {
-            let lazy = !full_state
-                && event_type == StateEventType::RoomMember
-                && state_key.as_str().try_into().is_ok_and(|user_id: &UserId| {
-                    sender_user != user_id
-                        && witness.is_some_and(|witness| !witness.contains(user_id))
-                });
+            let lazy = lazy_loading_witness.is_some_and(|witness| {
+                event_type == StateEventType::RoomMember
+                    && state_key.as_str().try_into().is_ok_and(|user_id: &UserId| {
+                        sender_user != user_id && !witness.contains(user_id)
+                    })
+            });

             lazy.or_some(event_id)
         })

@@ -981,46 +1065,25 @@ async fn calculate_state_initial(
             services.rooms.timeline.get_pdu(&event_id).await.ok()
         })
         .collect()
-        .map(Ok);
-
-    let counts = calculate_counts(services, room_id, sender_user);
-    let ((joined_member_count, invited_member_count, heroes), state_events) =
-        try_join(counts, state_events).boxed().await?;
-
-    // The state_events above should contain all timeline_users, let's mark them as
-    // lazy loaded.
-
-    Ok(StateChanges {
-        heroes,
-        joined_member_count,
-        invited_member_count,
-        state_events,
-        ..Default::default()
-    })
+        .map(Ok)
+        .await
 }

+/// Calculate the "incremental state", or all events from the
+/// `since_shortstatehash` up to (but not including)
+/// the `current_shortstatehash`. If `lazy_loading_witness` is `None`, lazy
+/// loading will be disabled.
 #[tracing::instrument(name = "incremental", level = "trace", skip_all)]
-#[allow(clippy::too_many_arguments)]
 async fn calculate_state_incremental<'a>(
     services: &Services,
     sender_user: &'a UserId,
-    room_id: &RoomId,
-    full_state: bool,
-    _filter: &FilterDefinition,
     since_shortstatehash: Option<ShortStateHash>,
     current_shortstatehash: ShortStateHash,
-    joined_since_last_sync: bool,
-    witness: Option<&'a Witness>,
-) -> Result<StateChanges> {
+    lazy_loading_witness: Option<&'a Witness>,
+) -> Result<Vec<PduEvent>> {
     let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash);

-    let encrypted_room = services
-        .rooms
-        .state_accessor
-        .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
-        .is_ok()
-        .await;
-
     let state_get_shorteventid = |user_id: &'a UserId| {
         services
             .rooms

@@ -1033,8 +1096,7 @@ async fn calculate_state_incremental<'a>(
             .ok()
     };

-    let lazy_state_ids: OptionFuture<_> = witness
-        .filter(|_| !full_state && !encrypted_room)
+    let lazy_state_ids: OptionFuture<_> = lazy_loading_witness
         .map(|witness| {
             StreamExt::into_future(
                 witness

@@ -1045,36 +1107,15 @@ async fn calculate_state_incremental<'a>(
         })
         .into();

-    let state_diff_ids: OptionFuture<_> = (!full_state)
-        .then(|| {
-            StreamExt::into_future(
-                services
-                    .rooms
-                    .state_accessor
-                    .state_added((since_shortstatehash, current_shortstatehash))
-                    .boxed(),
-            )
-        })
-        .into();
+    let state_diff_shortids = services
+        .rooms
+        .state_accessor
+        .state_added((since_shortstatehash, current_shortstatehash))
+        .boxed();

-    let current_state_ids: OptionFuture<_> = full_state
-        .then(|| {
-            StreamExt::into_future(
-                services
-                    .rooms
-                    .state_accessor
-                    .state_full_shortids(current_shortstatehash)
-                    .expect_ok()
-                    .boxed(),
-            )
-        })
-        .into();
-
-    let state_events = current_state_ids
-        .stream()
-        .chain(state_diff_ids.stream())
+    state_diff_shortids
         .broad_filter_map(|(shortstatekey, shorteventid)| async move {
-            if witness.is_none() || encrypted_room {
+            if lazy_loading_witness.is_none() {
                 return Some(shorteventid);
             }

@@ -1092,52 +1133,8 @@ async fn calculate_state_incremental<'a>(
             services.rooms.timeline.get_pdu(&event_id).await.ok()
         })
         .collect::<Vec<_>>()
-        .await;
-
-    let (device_list_updates, left_encrypted_users) = state_events
-        .iter()
-        .stream()
-        .ready_filter(|_| encrypted_room)
-        .ready_filter(|state_event| state_event.kind == RoomMember)
-        .ready_filter_map(|state_event| {
-            let content: RoomMemberEventContent = state_event.get_content().ok()?;
-            let user_id: OwnedUserId = state_event.state_key.as_ref()?.parse().ok()?;
-
-            Some((content, user_id))
-        })
-        .fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move {
-            use MembershipState::*;
-
-            let shares_encrypted_room =
-                |user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id));
-
-            match content.membership {
-                | Leave => leu.insert(user_id),
-                | Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await =>
-                    dlu.insert(user_id),
-                | _ => false,
-            };
-
-            (dlu, leu)
-        })
-        .await;
-
-    let send_member_count = state_events.iter().any(|event| event.kind == RoomMember);
-
-    let (joined_member_count, invited_member_count, heroes) = if send_member_count {
-        calculate_counts(services, room_id, sender_user).await?
-    } else {
-        (None, None, None)
-    };
-
-    Ok(StateChanges {
-        heroes,
-        joined_member_count,
-        invited_member_count,
-        state_events,
-        device_list_updates,
-        left_encrypted_users,
-    })
+        .map(Ok)
+        .await
 }

 async fn lazy_filter(

@@ -40,7 +40,7 @@ use ruma::{
 use super::{load_timeline, share_encrypted_room};
 use crate::{
     Ruma,
-    client::{DEFAULT_BUMP_TYPES, ignored_filter, is_ignored_invite},
+    client::{DEFAULT_BUMP_TYPES, TimelinePdus, ignored_filter, is_ignored_invite},
 };

 type TodoRooms = BTreeMap<OwnedRoomId, (BTreeSet<TypeStateKey>, usize, u64)>;

@@ -155,7 +155,7 @@ pub(crate) async fn sync_events_v4_route(
     if body.extensions.account_data.enabled.unwrap_or(false) {
         account_data.global = services
             .account_data
-            .changes_since(None, sender_user, globalsince, Some(next_batch))
+            .changes_since(None, sender_user, Some(globalsince), Some(next_batch))
             .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
             .collect()
             .await;

@@ -166,7 +166,12 @@ pub(crate) async fn sync_events_v4_route(
                 room.clone(),
                 services
                     .account_data
-                    .changes_since(Some(&room), sender_user, globalsince, Some(next_batch))
+                    .changes_since(
+                        Some(&room),
+                        sender_user,
+                        Some(globalsince),
+                        Some(next_batch),
+                    )
                     .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
                     .collect()
                     .await,

@@ -180,7 +185,7 @@ pub(crate) async fn sync_events_v4_route(
         device_list_changes.extend(
             services
                 .users
-                .keys_changed(sender_user, globalsince, None)
+                .keys_changed(sender_user, Some(globalsince), None)
                 .map(ToOwned::to_owned)
                 .collect::<Vec<_>>()
                 .await,

@@ -318,7 +323,7 @@ pub(crate) async fn sync_events_v4_route(
                 device_list_changes.extend(
                     services
                         .users
-                        .room_keys_changed(room_id, globalsince, None)
+                        .room_keys_changed(room_id, Some(globalsince), None)
                         .map(|(user_id, _)| user_id)
                         .map(ToOwned::to_owned)
                         .collect::<Vec<_>>()

@@ -514,11 +519,11 @@ pub(crate) async fn sync_events_v4_route(

             (timeline_pdus, limited) = (Vec::new(), true);
         } else {
-            (timeline_pdus, limited) = match load_timeline(
+            TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline(
                 &services,
                 sender_user,
                 room_id,
-                roomsincecount,
+                Some(roomsincecount),
                 None,
                 *timeline_limit,
             )

@@ -536,7 +541,7 @@ pub(crate) async fn sync_events_v4_route(
                 room_id.to_owned(),
                 services
                     .account_data
-                    .changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
+                    .changes_since(Some(room_id), sender_user, Some(*roomsince), Some(next_batch))
                     .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
                     .collect()
                     .await,

@@ -562,7 +567,7 @@ pub(crate) async fn sync_events_v4_route(
             let mut vector: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
                 .rooms
                 .read_receipt
-                .readreceipts_since(room_id, *roomsince)
+                .readreceipts_since(room_id, Some(*roomsince))
                 .filter_map(|(read_user, _ts, v)| async move {
                     services
                         .users

@@ -39,7 +39,9 @@ use ruma::{
 use super::share_encrypted_room;
 use crate::{
     Ruma,
-    client::{DEFAULT_BUMP_TYPES, ignored_filter, is_ignored_invite, sync::load_timeline},
+    client::{
+        DEFAULT_BUMP_TYPES, TimelinePdus, ignored_filter, is_ignored_invite, sync::load_timeline,
+    },
 };

 type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request);

@@ -411,11 +413,11 @@ where

         (timeline_pdus, limited) = (Vec::new(), true);
     } else {
-        (timeline_pdus, limited) = match load_timeline(
+        TimelinePdus { pdus: timeline_pdus, limited } = match load_timeline(
             services,
             sender_user,
             room_id,
-            roomsincecount,
+            Some(roomsincecount),
             Some(PduCount::from(next_batch)),
             *timeline_limit,
         )

@@ -434,7 +436,7 @@ where
                 room_id.to_owned(),
                 services
                     .account_data
-                    .changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch))
+                    .changes_since(Some(room_id), sender_user, Some(*roomsince), Some(next_batch))
                     .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
                     .collect()
                     .await,

@@ -460,7 +462,7 @@ where
         let mut receipts: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
             .rooms
             .read_receipt
-            .readreceipts_since(room_id, *roomsince)
+            .readreceipts_since(room_id, Some(*roomsince))
             .filter_map(|(read_user, _ts, v)| async move {
                 services
                     .users

@@ -687,7 +689,7 @@ async fn collect_account_data(

     account_data.global = services
         .account_data
-        .changes_since(None, sender_user, globalsince, None)
+        .changes_since(None, sender_user, Some(globalsince), None)
         .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
         .collect()
         .await;

@@ -698,7 +700,7 @@ async fn collect_account_data(
                 room.clone(),
                 services
                     .account_data
-                    .changes_since(Some(room), sender_user, globalsince, None)
+                    .changes_since(Some(room), sender_user, Some(globalsince), None)
                     .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
                     .collect()
                     .await,

@@ -732,7 +734,7 @@ where
         device_list_changes.extend(
             services
                 .users
-                .keys_changed(sender_user, globalsince, None)
+                .keys_changed(sender_user, Some(globalsince), None)
                 .map(ToOwned::to_owned)
                 .collect::<Vec<_>>()
                 .await,

@@ -868,7 +870,7 @@ where
             device_list_changes.extend(
                 services
                     .users
-                    .room_keys_changed(room_id, globalsince, None)
+                    .room_keys_changed(room_id, Some(globalsince), None)
                     .map(|(user_id, _)| user_id)
                     .map(ToOwned::to_owned)
                     .collect::<Vec<_>>()

@@ -36,7 +36,7 @@ where
     ) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> U, impl FnOnce(Self::Error) -> U>
     where
         F: FnOnce(Self::Ok) -> U,
-        Self: Send + Sized;
+        Self: Sized;

     fn ok(
         self,

@@ -100,7 +100,7 @@ where
     ) -> MapOkOrElse<Self, impl FnOnce(Self::Ok) -> U, impl FnOnce(Self::Error) -> U>
     where
         F: FnOnce(Self::Ok) -> U,
-        Self: Send + Sized,
+        Self: Sized,
     {
         self.map_ok_or_else(|_| default, f)
     }

@@ -129,13 +129,15 @@ pub fn changes_since<'a>(
     &'a self,
     room_id: Option<&'a RoomId>,
     user_id: &'a UserId,
-    since: u64,
+    since: Option<u64>,
     to: Option<u64>,
 ) -> impl Stream<Item = AnyRawAccountDataEvent> + Send + 'a {
     type Key<'a> = (Option<&'a RoomId>, &'a UserId, u64, Ignore);

     // Skip the data that's exactly at since, because we sent that last time
-    let first_possible = (room_id, user_id, since.saturating_add(1));
+    // ...unless this is an initial sync, in which case send everything
+    let first_possible =
+        (room_id, user_id, since.map(|since| since.saturating_add(1)).unwrap_or(0));

     self.db
         .roomuserdataid_accountdata

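The comment pair above encodes the new convention for `since: Option<u64>`: skip anything at or below `since` on an incremental sync, send everything on an initial sync. A std-only sketch of the lower-bound computation (illustrative only, not part of this commit):

fn first_possible_count(since: Option<u64>) -> u64 {
    // Incremental sync: start strictly after `since` (that count was already sent last time).
    // Initial sync (None): start from 0 and send everything.
    since.map(|since| since.saturating_add(1)).unwrap_or(0)
}

fn main() {
    assert_eq!(first_possible_count(None), 0);
    assert_eq!(first_possible_count(Some(41)), 42);
    assert_eq!(first_possible_count(Some(u64::MAX)), u64::MAX); // saturating, no overflow
}
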
@@ -104,16 +104,16 @@ impl Service {
         Ok(Raw::from_json(event))
     }

-    /// Returns an iterator over the most recent read_receipts in a room that
-    /// happened after the event with id `since`.
+    /// Returns an iterator over the most recent read_receipts in a room,
+    /// optionally after the event with id `since`.
     #[inline]
     #[tracing::instrument(skip(self), level = "debug")]
     pub fn readreceipts_since<'a>(
         &'a self,
         room_id: &'a RoomId,
-        since: u64,
+        since: Option<u64>,
     ) -> impl Stream<Item = ReceiptItem<'a>> + Send + 'a {
-        self.db.readreceipts_since(room_id, since)
+        self.db.readreceipts_since(room_id, since.unwrap_or(0))
     }

     /// Sets a private read marker at PDU `count`.

@@ -1,7 +1,7 @@
 use std::{borrow::Borrow, ops::Deref, sync::Arc};

 use conduwuit::{
-    Result, at, err, implement,
+    Pdu, Result, at, err, implement,
     matrix::{Event, StateKey},
     pair_of,
     utils::{

@@ -125,7 +125,7 @@ pub async fn state_get(
     shortstatehash: ShortStateHash,
     event_type: &StateEventType,
     state_key: &str,
-) -> Result<impl Event> {
+) -> Result<Pdu> {
     self.state_get_id(shortstatehash, event_type, state_key)
         .and_then(async |event_id: OwnedEventId| self.services.timeline.get_pdu(&event_id).await)
         .await

@@ -423,7 +423,7 @@ impl Service {
         let keys_changed = self
             .services
             .users
-            .room_keys_changed(room_id, since.0, None)
+            .room_keys_changed(room_id, Some(since.0), None)
             .ready_filter(|(user_id, _)| self.services.globals.user_is_local(user_id));

         pin_mut!(keys_changed);

@@ -520,7 +520,7 @@ impl Service {
         let receipts = self
             .services
             .read_receipt
-            .readreceipts_since(room_id, since.0);
+            .readreceipts_since(room_id, Some(since.0));

         pin_mut!(receipts);
         let mut read = BTreeMap::<OwnedUserId, ReceiptData>::new();

@@ -790,7 +790,7 @@ impl Service {
     pub fn keys_changed<'a>(
         &'a self,
         user_id: &'a UserId,
-        from: u64,
+        from: Option<u64>,
         to: Option<u64>,
     ) -> impl Stream<Item = &'a UserId> + Send + 'a {
         self.keys_changed_user_or_room(user_id.as_str(), from, to)

@@ -801,7 +801,7 @@ impl Service {
     pub fn room_keys_changed<'a>(
         &'a self,
         room_id: &'a RoomId,
-        from: u64,
+        from: Option<u64>,
         to: Option<u64>,
     ) -> impl Stream<Item = (&'a UserId, u64)> + Send + 'a {
         self.keys_changed_user_or_room(room_id.as_str(), from, to)

@@ -810,11 +810,12 @@ impl Service {
     fn keys_changed_user_or_room<'a>(
         &'a self,
         user_or_room_id: &'a str,
-        from: u64,
+        from: Option<u64>,
         to: Option<u64>,
     ) -> impl Stream<Item = (&'a UserId, u64)> + Send + 'a {
         type KeyVal<'a> = ((&'a str, u64), &'a UserId);

+        let from = from.unwrap_or(0);
         let to = to.unwrap_or(u64::MAX);
         let start = (user_or_room_id, from.saturating_add(1));
         self.db