refactor: Promote handling unsigned data out of timeline
Also fixes: - Transaction IDs leaking in event route - Age not being set for event relations or threads - Both of the above for search results Notes down concern with relations table
This commit is contained in:
parent
7fa4fa9862
commit
987c5eeb03
18 changed files with 111 additions and 75 deletions
|
|
@ -31,7 +31,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
|
|||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.last_timeline_count(None, &room_id)
|
||||
.last_timeline_count(&room_id)
|
||||
.await?;
|
||||
|
||||
self.write_str(&format!("{result:#?}")).await
|
||||
|
|
@ -52,7 +52,7 @@ pub(super) async fn pdus(
|
|||
.services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(None, &room_id, from)
|
||||
.pdus_rev(&room_id, from)
|
||||
.try_take(limit.unwrap_or(3))
|
||||
.try_collect()
|
||||
.await?;
|
||||
|
|
|
|||
|
|
@ -82,11 +82,18 @@ pub(crate) async fn get_context_route(
|
|||
|
||||
let base_event = ignored_filter(&services, (base_count, base_pdu), sender_user);
|
||||
|
||||
// PDUs are used to get seen user IDs and then returned in response.
|
||||
|
||||
let events_before = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, Some(base_count))
|
||||
.pdus_rev(room_id, Some(base_count))
|
||||
.ignore_err()
|
||||
.then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
// TODO: bundled aggregations
|
||||
pdu
|
||||
})
|
||||
.ready_filter_map(|item| event_filter(item, filter))
|
||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||
|
|
@ -96,8 +103,13 @@ pub(crate) async fn get_context_route(
|
|||
let events_after = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus(Some(sender_user), room_id, Some(base_count))
|
||||
.pdus(room_id, Some(base_count))
|
||||
.ignore_err()
|
||||
.then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
// TODO: bundled aggregations
|
||||
pdu
|
||||
})
|
||||
.ready_filter_map(|item| event_filter(item, filter))
|
||||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||
|
|
|
|||
|
|
@ -122,14 +122,14 @@ pub(crate) async fn get_message_events_route(
|
|||
| Direction::Forward => services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus(Some(sender_user), room_id, Some(from))
|
||||
.pdus(room_id, Some(from))
|
||||
.ignore_err()
|
||||
.boxed(),
|
||||
|
||||
| Direction::Backward => services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, Some(from))
|
||||
.pdus_rev(room_id, Some(from))
|
||||
.ignore_err()
|
||||
.boxed(),
|
||||
};
|
||||
|
|
@ -140,6 +140,11 @@ pub(crate) async fn get_message_events_route(
|
|||
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
|
||||
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
|
||||
.take(limit)
|
||||
.then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
// TODO: bundled aggregations
|
||||
pdu
|
||||
})
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
|
|
|
|||
|
|
@ -212,6 +212,10 @@ async fn paginate_relations_with_filter(
|
|||
})
|
||||
}
|
||||
|
||||
// TODO: Can we move the visibility filter lower down, to avoid checking events
|
||||
// that won't be sent? At the moment this also results in getting events that
|
||||
// appear to have no relation because intermediaries are not visible to the
|
||||
// user.
|
||||
async fn visibility_filter<Pdu: Event + Send + Sync>(
|
||||
services: &Services,
|
||||
sender_user: &UserId,
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ pub(crate) async fn get_room_event_route(
|
|||
return Err!(Request(Forbidden("You don't have permission to view this event.")));
|
||||
}
|
||||
|
||||
event.add_age().ok();
|
||||
event.set_unsigned(body.sender_user.as_deref());
|
||||
|
||||
Ok(get_room_event::v3::Response { event: event.into_format() })
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,12 +40,19 @@ pub(crate) async fn room_initial_sync_route(
|
|||
.map_ok(Event::into_format)
|
||||
.try_collect::<Vec<_>>();
|
||||
|
||||
// Events are returned in body
|
||||
|
||||
let limit = LIMIT_MAX;
|
||||
let events = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(None, room_id, None)
|
||||
.pdus_rev(room_id, None)
|
||||
.try_take(limit)
|
||||
.and_then(async |mut pdu| {
|
||||
pdu.1.set_unsigned(body.sender_user.as_deref());
|
||||
// TODO: bundled aggregations
|
||||
Ok(pdu)
|
||||
})
|
||||
.try_collect::<Vec<_>>();
|
||||
|
||||
let (membership, visibility, state, events) =
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ async fn load_timeline(
|
|||
let last_timeline_count = services
|
||||
.rooms
|
||||
.timeline
|
||||
.last_timeline_count(Some(sender_user), room_id)
|
||||
.last_timeline_count(room_id)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
err!(Database(warn!("Failed to fetch end of room timeline: {}", err)))
|
||||
|
|
@ -71,12 +71,13 @@ async fn load_timeline(
|
|||
services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(
|
||||
Some(sender_user),
|
||||
room_id,
|
||||
ending_count.map(|count| count.saturating_add(1)),
|
||||
)
|
||||
.pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1)))
|
||||
.ignore_err()
|
||||
.map(move |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
// TODO: bundled aggregations
|
||||
pdu
|
||||
})
|
||||
.ready_take_while(move |&(pducount, _)| pducount > starting_count)
|
||||
.boxed()
|
||||
},
|
||||
|
|
@ -86,12 +87,13 @@ async fn load_timeline(
|
|||
services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(
|
||||
Some(sender_user),
|
||||
room_id,
|
||||
ending_count.map(|count| count.saturating_add(1)),
|
||||
)
|
||||
.pdus_rev(room_id, ending_count.map(|count| count.saturating_add(1)))
|
||||
.ignore_err()
|
||||
.map(move |mut pdu| {
|
||||
pdu.1.set_unsigned(Some(sender_user));
|
||||
// TODO: bundled aggregations
|
||||
pdu
|
||||
})
|
||||
.boxed()
|
||||
},
|
||||
};
|
||||
|
|
|
|||
|
|
@ -127,7 +127,7 @@ pub(super) async fn build_state_incremental<'a>(
|
|||
let last_pdu_of_last_sync = services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(Some(sender_user), room_id, Some(last_sync_end_count.saturating_add(1)))
|
||||
.pdus_rev(room_id, Some(last_sync_end_count.saturating_add(1)))
|
||||
.boxed()
|
||||
.next()
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -31,6 +31,9 @@ pub(crate) async fn get_threads_route(
|
|||
.transpose()?
|
||||
.unwrap_or_else(PduCount::max);
|
||||
|
||||
// TODO: bundled aggregation
|
||||
// TODO: user_can_see_event and set_unsigned should be at the same level /
|
||||
// function, so unsigned is only set for seen events.
|
||||
let threads: Vec<(PduCount, PduEvent)> = services
|
||||
.rooms
|
||||
.threads
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ use std::cmp;
|
|||
use axum::extract::State;
|
||||
use conduwuit::{
|
||||
Event, PduCount, Result,
|
||||
result::LogErr,
|
||||
utils::{IterStream, ReadyExt, stream::TryTools},
|
||||
};
|
||||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||
|
|
@ -62,7 +63,7 @@ pub(crate) async fn get_backfill_route(
|
|||
pdus: services
|
||||
.rooms
|
||||
.timeline
|
||||
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
|
||||
.pdus_rev(&body.room_id, Some(from.saturating_add(1)))
|
||||
.try_take(limit)
|
||||
.try_filter_map(|(_, pdu)| async move {
|
||||
Ok(services
|
||||
|
|
@ -72,6 +73,15 @@ pub(crate) async fn get_backfill_route(
|
|||
.await
|
||||
.then_some(pdu))
|
||||
})
|
||||
.and_then(async |mut pdu| {
|
||||
// Strip the transaction ID, as that is private
|
||||
pdu.remove_transaction_id().log_err().ok();
|
||||
// Add age, as this is specified
|
||||
pdu.add_age().log_err().ok();
|
||||
// It's not clear if we should strip or add any more data, leave as is.
|
||||
// In particular: Redaction?
|
||||
Ok(pdu)
|
||||
})
|
||||
.try_filter_map(|pdu| async move {
|
||||
Ok(services
|
||||
.rooms
|
||||
|
|
|
|||
|
|
@ -1,10 +1,23 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::{borrow::Borrow, collections::BTreeMap};
|
||||
|
||||
use ruma::MilliSecondsSinceUnixEpoch;
|
||||
use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value};
|
||||
|
||||
use super::Pdu;
|
||||
use crate::{Result, err, implement};
|
||||
use crate::{Result, err, implement, result::LogErr};
|
||||
|
||||
/// Set the `unsigned` field of the PDU using only information in the PDU.
|
||||
/// Some unsigned data is already set within the database (eg. prev events,
|
||||
/// threads). Once this is done, other data must be calculated from the database
|
||||
/// (eg. relations) This is for server-to-client events.
|
||||
/// Backfill handles this itself.
|
||||
#[implement(Pdu)]
|
||||
pub fn set_unsigned(&mut self, user_id: Option<&ruma::UserId>) {
|
||||
if Some(self.sender.borrow()) != user_id {
|
||||
self.remove_transaction_id().log_err().ok();
|
||||
}
|
||||
self.add_age().log_err().ok();
|
||||
}
|
||||
|
||||
#[implement(Pdu)]
|
||||
pub fn remove_transaction_id(&mut self) -> Result {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ use std::{mem::size_of, sync::Arc};
|
|||
use conduwuit::{
|
||||
arrayvec::ArrayVec,
|
||||
matrix::{Event, PduCount},
|
||||
result::LogErr,
|
||||
utils::{
|
||||
ReadyExt,
|
||||
stream::{TryIgnore, WidebandExt},
|
||||
|
|
@ -92,9 +91,7 @@ impl Data {
|
|||
|
||||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||
|
||||
if pdu.sender() != user_id {
|
||||
pdu.as_mut_pdu().remove_transaction_id().log_err().ok();
|
||||
}
|
||||
pdu.as_mut_pdu().set_unsigned(Some(user_id));
|
||||
|
||||
Some((shorteventid, pdu))
|
||||
})
|
||||
|
|
|
|||
|
|
@ -129,7 +129,12 @@ pub async fn search_pdus<'a>(
|
|||
.then_some(pdu)
|
||||
})
|
||||
.skip(query.skip)
|
||||
.take(query.limit);
|
||||
.take(query.limit)
|
||||
.map(move |mut pdu| {
|
||||
pdu.set_unsigned(query.user_id);
|
||||
// TODO: bundled aggregation
|
||||
pdu
|
||||
});
|
||||
|
||||
Ok((count, pdus))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -163,9 +163,7 @@ impl Service {
|
|||
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
|
||||
|
||||
let pdu_id: PduId = pdu_id.into();
|
||||
if pdu.sender() != user_id {
|
||||
pdu.as_mut_pdu().remove_transaction_id().ok();
|
||||
}
|
||||
pdu.as_mut_pdu().set_unsigned(Some(user_id));
|
||||
|
||||
Some((pdu_id.shorteventid, pdu))
|
||||
});
|
||||
|
|
|
|||
|
|
@ -347,6 +347,10 @@ where
|
|||
| _ => {},
|
||||
}
|
||||
|
||||
// CONCERN: If we receive events with a relation out-of-order, we never write
|
||||
// their relation / thread. We need some kind of way to trigger when we receive
|
||||
// this event, and potentially a way to rebuild the table entirely.
|
||||
|
||||
if let Ok(content) = pdu.get_content::<ExtractRelatesToEventId>() {
|
||||
if let Ok(related_pducount) = self.get_pdu_count(&content.relates_to.event_id).await {
|
||||
self.services
|
||||
|
|
|
|||
|
|
@ -1,13 +1,13 @@
|
|||
use std::{borrow::Borrow, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use conduwuit::{
|
||||
Err, PduCount, PduEvent, Result, at, err,
|
||||
result::{LogErr, NotFound},
|
||||
result::NotFound,
|
||||
utils::{self, stream::TryReadyExt},
|
||||
};
|
||||
use database::{Database, Deserialized, Json, KeyVal, Map};
|
||||
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt, future::select_ok, pin_mut};
|
||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
|
||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, api::Direction};
|
||||
|
||||
use super::{PduId, RawPduId};
|
||||
use crate::{Dep, rooms, rooms::short::ShortRoomId};
|
||||
|
|
@ -45,12 +45,8 @@ impl Data {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub(super) async fn last_timeline_count(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduCount> {
|
||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
||||
pub(super) async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||
|
||||
pin_mut!(pdus_rev);
|
||||
let last_count = pdus_rev
|
||||
|
|
@ -64,12 +60,8 @@ impl Data {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub(super) async fn latest_pdu_in_room(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduEvent> {
|
||||
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
|
||||
pub(super) async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
|
||||
let pdus_rev = self.pdus_rev(room_id, PduCount::max());
|
||||
|
||||
pin_mut!(pdus_rev);
|
||||
pdus_rev
|
||||
|
|
@ -221,7 +213,6 @@ impl Data {
|
|||
/// order.
|
||||
pub(super) fn pdus_rev<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
until: PduCount,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
|
|
@ -231,14 +222,13 @@ impl Data {
|
|||
self.pduid_pdu
|
||||
.rev_raw_stream_from(¤t)
|
||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
||||
.ready_and_then(Self::from_json_slice)
|
||||
})
|
||||
.try_flatten_stream()
|
||||
}
|
||||
|
||||
pub(super) fn pdus<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
from: PduCount,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
|
|
@ -248,21 +238,15 @@ impl Data {
|
|||
self.pduid_pdu
|
||||
.raw_stream_from(¤t)
|
||||
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
|
||||
.ready_and_then(move |item| Self::each_pdu(item, user_id))
|
||||
.ready_and_then(Self::from_json_slice)
|
||||
})
|
||||
.try_flatten_stream()
|
||||
}
|
||||
|
||||
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
|
||||
fn from_json_slice((pdu_id, pdu): KeyVal<'_>) -> Result<PdusIterItem> {
|
||||
let pdu_id: RawPduId = pdu_id.into();
|
||||
|
||||
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
||||
|
||||
if Some(pdu.sender.borrow()) != user_id {
|
||||
pdu.remove_transaction_id().log_err().ok();
|
||||
}
|
||||
|
||||
pdu.add_age().log_err().ok();
|
||||
let pdu = serde_json::from_slice::<PduEvent>(pdu)?;
|
||||
|
||||
Ok((pdu_id.pdu_count(), pdu))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ use conduwuit_core::{
|
|||
};
|
||||
use futures::{Future, Stream, TryStreamExt, pin_mut};
|
||||
use ruma::{
|
||||
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
|
||||
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId,
|
||||
events::room::encrypted::Relation,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
|
|
@ -138,7 +138,7 @@ impl Service {
|
|||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> {
|
||||
let pdus = self.pdus(None, room_id, None);
|
||||
let pdus = self.pdus(room_id, None);
|
||||
|
||||
pin_mut!(pdus);
|
||||
pdus.try_next()
|
||||
|
|
@ -148,16 +148,12 @@ impl Service {
|
|||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
|
||||
self.db.latest_pdu_in_room(None, room_id).await
|
||||
self.db.latest_pdu_in_room(room_id).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub async fn last_timeline_count(
|
||||
&self,
|
||||
sender_user: Option<&UserId>,
|
||||
room_id: &RoomId,
|
||||
) -> Result<PduCount> {
|
||||
self.db.last_timeline_count(sender_user, room_id).await
|
||||
pub async fn last_timeline_count(&self, room_id: &RoomId) -> Result<PduCount> {
|
||||
self.db.last_timeline_count(room_id).await
|
||||
}
|
||||
|
||||
/// Returns the `count` of this pdu's id.
|
||||
|
|
@ -235,33 +231,29 @@ impl Service {
|
|||
#[inline]
|
||||
pub fn all_pdus<'a>(
|
||||
&'a self,
|
||||
user_id: &'a UserId,
|
||||
room_id: &'a RoomId,
|
||||
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
|
||||
self.pdus(Some(user_id), room_id, None).ignore_err()
|
||||
self.pdus(room_id, None).ignore_err()
|
||||
}
|
||||
|
||||
/// Reverse iteration starting after `until`.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus_rev<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
until: Option<PduCount>,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
self.db
|
||||
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
|
||||
.pdus_rev(room_id, until.unwrap_or_else(PduCount::max))
|
||||
}
|
||||
|
||||
/// Forward iteration starting after `from`.
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn pdus<'a>(
|
||||
&'a self,
|
||||
user_id: Option<&'a UserId>,
|
||||
room_id: &'a RoomId,
|
||||
from: Option<PduCount>,
|
||||
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
|
||||
self.db
|
||||
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
|
||||
self.db.pdus(room_id, from.unwrap_or_else(PduCount::min))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -781,7 +781,7 @@ impl Service {
|
|||
|
||||
for pdu in pdus {
|
||||
// Redacted events are not notification targets (we don't send push for them)
|
||||
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
|
||||
if pdu.is_redacted() {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue