From 987c5eeb03e696330a15e9693fcd52eac6ae9ce3 Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Tue, 3 Jun 2025 21:31:02 +0100 Subject: [PATCH] refactor: Promote handling unsigned data out of timeline Also fixes: - Transaction IDs leaking in event route - Age not being set for event relations or threads - Both of the above for search results Notes down concern with relations table --- src/admin/query/room_timeline.rs | 4 +-- src/api/client/context.rs | 16 +++++++++-- src/api/client/message.rs | 9 ++++-- src/api/client/relations.rs | 4 +++ src/api/client/room/event.rs | 2 +- src/api/client/room/initial_sync.rs | 9 +++++- src/api/client/sync/mod.rs | 24 ++++++++-------- src/api/client/sync/v3/state.rs | 2 +- src/api/client/threads.rs | 3 ++ src/api/server/backfill.rs | 12 +++++++- src/core/matrix/pdu/unsigned.rs | 17 ++++++++++-- src/service/rooms/pdu_metadata/data.rs | 5 +--- src/service/rooms/search/mod.rs | 7 ++++- src/service/rooms/threads/mod.rs | 4 +-- src/service/rooms/timeline/append.rs | 4 +++ src/service/rooms/timeline/data.rs | 38 ++++++++------------------ src/service/rooms/timeline/mod.rs | 24 ++++++---------- src/service/sending/sender.rs | 2 +- 18 files changed, 111 insertions(+), 75 deletions(-) diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index 0f72b58c..2ca46944 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result { .services .rooms .timeline - .last_timeline_count(None, &room_id) + .last_timeline_count(&room_id) .await?; self.write_str(&format!("{result:#?}")).await @@ -52,7 +52,7 @@ pub(super) async fn pdus( .services .rooms .timeline - .pdus_rev(None, &room_id, from) + .pdus_rev(&room_id, from) .try_take(limit.unwrap_or(3)) .try_collect() .await?; diff --git a/src/api/client/context.rs b/src/api/client/context.rs index ccd77bea..38fe28b2 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -82,11 +82,18 @@ pub(crate) async fn get_context_route( let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user); + // PDUs are used to get seen user IDs and then returned in response. + let events_before = services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, Some(base_count)) + .pdus_rev(room_id, Some(base_count)) .ignore_err() + .then(async |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + // TODO: bundled aggregations + pdu + }) .ready_filter_map(|item| event_filter(item, filter)) .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) @@ -96,8 +103,13 @@ pub(crate) async fn get_context_route( let events_after = services .rooms .timeline - .pdus(Some(sender_user), room_id, Some(base_count)) + .pdus(room_id, Some(base_count)) .ignore_err() + .then(async |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + // TODO: bundled aggregations + pdu + }) .ready_filter_map(|item| event_filter(item, filter)) .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index a7b364a0..5a0b65b9 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -122,14 +122,14 @@ pub(crate) async fn get_message_events_route( | Direction::Forward => services .rooms .timeline - .pdus(Some(sender_user), room_id, Some(from)) + .pdus(room_id, Some(from)) .ignore_err() .boxed(), | Direction::Backward => services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, Some(from)) + .pdus_rev(room_id, Some(from)) .ignore_err() .boxed(), }; @@ -140,6 +140,11 @@ pub(crate) async fn get_message_events_route( .wide_filter_map(|item| ignored_filter(&services, item, sender_user)) .wide_filter_map(|item| visibility_filter(&services, item, sender_user)) .take(limit) + .then(async |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + // TODO: bundled aggregations + pdu + }) .collect() .await; diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index 27e779a3..3586631c 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -212,6 +212,10 @@ async fn paginate_relations_with_filter( }) } +// TODO: Can we move the visibility filter lower down, to avoid checking events +// that won't be sent? At the moment this also results in getting events that +// appear to have no relation because intermediaries are not visible to the +// user. async fn visibility_filter( services: &Services, sender_user: &UserId, diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 75ae0758..baafafd6 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -33,7 +33,7 @@ pub(crate) async fn get_room_event_route( return Err!(Request(Forbidden("You don't have permission to view this event."))); } - event.add_age().ok(); + event.set_unsigned(body.sender_user.as_deref()); Ok(get_room_event::v3::Response { event: event.into_format() }) } diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index d40f6b4f..e5d8a8e8 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -40,12 +40,19 @@ pub(crate) async fn room_initial_sync_route( .map_ok(Event::into_format) .try_collect::>(); + // Events are returned in body + let limit = LIMIT_MAX; let events = services .rooms .timeline - .pdus_rev(None, room_id, None) + .pdus_rev(room_id, None) .try_take(limit) + .and_then(async |mut pdu| { + pdu.1.set_unsigned(body.sender_user.as_deref()); + // TODO: bundled aggregations + Ok(pdu) + }) .try_collect::>(); let (membership, visibility, state, events) = diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index f47b591c..83ff34fd 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -53,7 +53,7 @@ async fn load_timeline( let last_timeline_count = services .rooms .timeline - .last_timeline_count(Some(sender_user), room_id) + .last_timeline_count(room_id) .await .map_err(|err| { err!(Database(warn!("Failed to fetch end of room timeline: {}", err))) @@ -71,12 +71,13 @@ async fn load_timeline( services .rooms .timeline - .pdus_rev( - Some(sender_user), - room_id, - ending_count.map(|count| count.saturating_add(1)), - ) + .pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1))) .ignore_err() + .map(move |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + // TODO: bundled aggregations + pdu + }) .ready_take_while(move |&(pducount, _)| pducount > starting_count) .boxed() }, @@ -86,12 +87,13 @@ async fn load_timeline( services .rooms .timeline - .pdus_rev( - Some(sender_user), - room_id, - ending_count.map(|count| count.saturating_add(1)), - ) + .pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1))) .ignore_err() + .map(move |mut pdu| { + pdu.1.set_unsigned(Some(sender_user)); + // TODO: bundled aggregations + pdu + }) .boxed() }, }; diff --git a/src/api/client/sync/v3/state.rs b/src/api/client/sync/v3/state.rs index e1f016f7..0f20f797 100644 --- a/src/api/client/sync/v3/state.rs +++ b/src/api/client/sync/v3/state.rs @@ -127,7 +127,7 @@ pub(super) async fn build_state_incremental<'a>( let last_pdu_of_last_sync = services .rooms .timeline - .pdus_rev(Some(sender_user), room_id, Some(last_sync_end_count.saturating_add(1))) + .pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1))) .boxed() .next() .await diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index ca176eda..19cca5f7 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -31,6 +31,9 @@ pub(crate) async fn get_threads_route( .transpose()? .unwrap_or_else(PduCount::max); + // TODO: bundled aggregation + // TODO: user_can_see_event and set_unsigned should be at the same level / + // function, so unsigned is only set for seen events. let threads: Vec<(PduCount, PduEvent)> = services .rooms .threads diff --git a/src/api/server/backfill.rs b/src/api/server/backfill.rs index 6b80c098..ba3fb8a4 100644 --- a/src/api/server/backfill.rs +++ b/src/api/server/backfill.rs @@ -3,6 +3,7 @@ use std::cmp; use axum::extract::State; use conduwuit::{ Event, PduCount, Result, + result::LogErr, utils::{IterStream, ReadyExt, stream::TryTools}, }; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route( pdus: services .rooms .timeline - .pdus_rev(None, &body.room_id, Some(from.saturating_add(1))) + .pdus_rev(&body.room_id, Some(from.saturating_add(1))) .try_take(limit) .try_filter_map(|(_, pdu)| async move { Ok(services @@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route( .await .then_some(pdu)) }) + .and_then(async |mut pdu| { + // Strip the transaction ID, as that is private + pdu.remove_transaction_id().log_err().ok(); + // Add age, as this is specified + pdu.add_age().log_err().ok(); + // It's not clear if we should strip or add any more data, leave as is. + // In particular: Redaction? + Ok(pdu) + }) .try_filter_map(|pdu| async move { Ok(services .rooms diff --git a/src/core/matrix/pdu/unsigned.rs b/src/core/matrix/pdu/unsigned.rs index 0c58bb68..d3da1053 100644 --- a/src/core/matrix/pdu/unsigned.rs +++ b/src/core/matrix/pdu/unsigned.rs @@ -1,10 +1,23 @@ -use std::collections::BTreeMap; +use std::{borrow::Borrow, collections::BTreeMap}; use ruma::MilliSecondsSinceUnixEpoch; use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value}; use super::Pdu; -use crate::{Result, err, implement}; +use crate::{Result, err, implement, result::LogErr}; + +/// Set the `unsigned` field of the PDU using only information in the PDU. +/// Some unsigned data is already set within the database (eg. prev events, +/// threads). Once this is done, other data must be calculated from the database +/// (eg. relations) This is for server-to-client events. +/// Backfill handles this itself. +#[implement(Pdu)] +pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) { + if Some(self.sender.borrow()) != user_id { + self.remove_transaction_id().log_err().ok(); + } + self.add_age().log_err().ok(); +} #[implement(Pdu)] pub fn remove_transaction_id(&mut self) -> Result { diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index 854c6ea0..6a765f13 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc}; use conduwuit::{ arrayvec::ArrayVec, matrix::{Event, PduCount}, - result::LogErr, utils::{ ReadyExt, stream::{TryIgnore, WidebandExt}, @@ -92,9 +91,7 @@ impl Data { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; - if pdu.sender() != user_id { - pdu.as_mut_pdu().remove_transaction_id().log_err().ok(); - } + pdu.as_mut_pdu().set_unsigned(Some(user_id)); Some((shorteventid, pdu)) }) diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index 9bea0264..ac59cd81 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -129,7 +129,12 @@ pub async fn search_pdus<'a>( .then_some(pdu) }) .skip(query.skip) - .take(query.limit); + .take(query.limit) + .map(move |mut pdu| { + pdu.set_unsigned(query.user_id); + // TODO: bundled aggregation + pdu + }); Ok((count, pdus)) } diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index 59319ba6..58d11cce 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -163,9 +163,7 @@ impl Service { let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; let pdu_id: PduId = pdu_id.into(); - if pdu.sender() != user_id { - pdu.as_mut_pdu().remove_transaction_id().ok(); - } + pdu.as_mut_pdu().set_unsigned(Some(user_id)); Some((pdu_id.shorteventid, pdu)) }); diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index c141577d..2d405820 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -347,6 +347,10 @@ where | _ => {}, } + // CONCERN: If we receive events with a relation out-of-order, we never write + // their relation / thread. We need some kind of way to trigger when we receive + // this event, and potentially a way to rebuild the table entirely. + if let Ok(content) = pdu.get_content::() { if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await { self.services diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 9064df6c..69f4f4a6 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -1,13 +1,13 @@ -use std::{borrow::Borrow, sync::Arc}; +use std::sync::Arc; use conduwuit::{ Err, PduCount, PduEvent, Result, at, err, - result::{LogErr, NotFound}, + result::NotFound, utils::{self, stream::TryReadyExt}, }; use database::{Database, Deserialized, Json, KeyVal, Map}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut}; -use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction}; +use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction}; use super::{PduId, RawPduId}; use crate::{Dep, rooms, rooms::short::ShortRoomId}; @@ -45,12 +45,8 @@ impl Data { } #[inline] - pub(super) async fn last_timeline_count( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); + pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result { + let pdus_rev = self.pdus_rev(room_id, PduCount::max()); pin_mut!(pdus_rev); let last_count = pdus_rev @@ -64,12 +60,8 @@ impl Data { } #[inline] - pub(super) async fn latest_pdu_in_room( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); + pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { + let pdus_rev = self.pdus_rev(room_id, PduCount::max()); pin_mut!(pdus_rev); pdus_rev @@ -221,7 +213,6 @@ impl Data { /// order. pub(super) fn pdus_rev<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, until: PduCount, ) -> impl Stream> + Send + 'a { @@ -231,14 +222,13 @@ impl Data { self.pduid_pdu .rev_raw_stream_from(¤t) .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(move |item| Self::each_pdu(item, user_id)) + .ready_and_then(Self::from_json_slice) }) .try_flatten_stream() } pub(super) fn pdus<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, from: PduCount, ) -> impl Stream> + Send + 'a { @@ -248,21 +238,15 @@ impl Data { self.pduid_pdu .raw_stream_from(¤t) .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(move |item| Self::each_pdu(item, user_id)) + .ready_and_then(Self::from_json_slice) }) .try_flatten_stream() } - fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result { + fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result { let pdu_id: RawPduId = pdu_id.into(); - let mut pdu = serde_json::from_slice::(pdu)?; - - if Some(pdu.sender.borrow()) != user_id { - pdu.remove_transaction_id().log_err().ok(); - } - - pdu.add_age().log_err().ok(); + let pdu = serde_json::from_slice::(pdu)?; Ok((pdu_id.pdu_count(), pdu)) } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 252e7bb5..a35b502c 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -20,7 +20,7 @@ use conduwuit_core::{ }; use futures::{Future, Stream, TryStreamExt, pin_mut}; use ruma::{ - CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId, + CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, events::room::encrypted::Relation, }; use serde::Deserialize; @@ -138,7 +138,7 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> { - let pdus = self.pdus(None, room_id, None); + let pdus = self.pdus(room_id, None); pin_mut!(pdus); pdus.try_next() @@ -148,16 +148,12 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { - self.db.latest_pdu_in_room(None, room_id).await + self.db.latest_pdu_in_room(room_id).await } #[tracing::instrument(skip(self), level = "debug")] - pub async fn last_timeline_count( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - self.db.last_timeline_count(sender_user, room_id).await + pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result { + self.db.last_timeline_count(room_id).await } /// Returns the `count` of this pdu's id. @@ -235,33 +231,29 @@ impl Service { #[inline] pub fn all_pdus<'a>( &'a self, - user_id: &'a UserId, room_id: &'a RoomId, ) -> impl Stream + Send + 'a { - self.pdus(Some(user_id), room_id, None).ignore_err() + self.pdus(room_id, None).ignore_err() } /// Reverse iteration starting after `until`. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus_rev<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, until: Option, ) -> impl Stream> + Send + 'a { self.db - .pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max)) + .pdus_rev(room_id, until.unwrap_or_else(PduCount::max)) } /// Forward iteration starting after `from`. #[tracing::instrument(skip(self), level = "debug")] pub fn pdus<'a>( &'a self, - user_id: Option<&'a UserId>, room_id: &'a RoomId, from: Option, ) -> impl Stream> + Send + 'a { - self.db - .pdus(user_id, room_id, from.unwrap_or_else(PduCount::min)) + self.db.pdus(room_id, from.unwrap_or_else(PduCount::min)) } } diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 1cf3763c..1b5d734e 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -781,7 +781,7 @@ impl Service { for pdu in pdus { // Redacted events are not notification targets (we don't send push for them) - if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) { + if pdu.is_redacted() { continue; }