continuwuity/src/api/client/message.rs
Jade Ellis 3a95585f0e
fix: Disambiguate appservices in lazy loading context
In the previous commit, app services would all appear to be the same
device when accessing the same user. This sets the device ID to be the
appservice ID when available to avoid possible clobbering.
2025-04-18 14:00:31 +01:00

318 lines
6.9 KiB
Rust

use axum::extract::State;
use conduwuit::{
Err, Result, at,
matrix::{
Event,
pdu::{PduCount, PduEvent},
},
utils::{
IterStream, ReadyExt,
result::{FlatOk, LogErr},
stream::{BroadbandExt, TryIgnore, WidebandExt},
},
};
use conduwuit_service::{
Services,
rooms::{
lazy_loading,
lazy_loading::{Options, Witness},
timeline::PdusIterItem,
},
};
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture, pin_mut};
use ruma::{
DeviceId, RoomId, UserId,
api::{
Direction,
client::{filter::RoomEventFilter, message::get_message_events},
},
events::{
AnyStateEvent, StateEventType,
TimelineEventType::{self, *},
},
serde::Raw,
};
use crate::Ruma;
/// list of safe and common non-state events to ignore if the user is ignored
const IGNORED_MESSAGE_TYPES: &[TimelineEventType] = &[
Audio,
CallInvite,
Emote,
File,
Image,
KeyVerificationStart,
Location,
PollStart,
UnstablePollStart,
Beacon,
Reaction,
RoomEncrypted,
RoomMessage,
Sticker,
Video,
Voice,
CallNotify,
];
const LIMIT_MAX: usize = 100;
const LIMIT_DEFAULT: usize = 10;
/// # `GET /_matrix/client/r0/rooms/{roomId}/messages`
///
/// Allows paginating through room history.
///
/// - Only works if the user is joined (TODO: always allow, but only show events
/// where the user was joined, depending on `history_visibility`)
pub(crate) async fn get_message_events_route(
State(services): State<crate::State>,
body: Ruma<get_message_events::v3::Request>,
) -> Result<get_message_events::v3::Response> {
debug_assert!(IGNORED_MESSAGE_TYPES.is_sorted(), "IGNORED_MESSAGE_TYPES is not sorted");
let sender_user = body.sender_user();
let sender_device = body.sender_device.as_ref();
let room_id = &body.room_id;
let filter = &body.filter;
if !services.rooms.metadata.exists(room_id).await {
return Err!(Request(Forbidden("Room does not exist to this server")));
}
let from: PduCount = body
.from
.as_deref()
.map(str::parse)
.transpose()?
.unwrap_or_else(|| match body.dir {
| Direction::Forward => PduCount::min(),
| Direction::Backward => PduCount::max(),
});
let to: Option<PduCount> = body.to.as_deref().map(str::parse).flat_ok();
let limit: usize = body
.limit
.try_into()
.unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX);
if matches!(body.dir, Direction::Backward) {
services
.rooms
.timeline
.backfill_if_required(room_id, from)
.boxed()
.await
.log_err()
.ok();
}
let it = match body.dir {
| Direction::Forward => services
.rooms
.timeline
.pdus(Some(sender_user), room_id, Some(from))
.ignore_err()
.boxed(),
| Direction::Backward => services
.rooms
.timeline
.pdus_rev(Some(sender_user), room_id, Some(from))
.ignore_err()
.boxed(),
};
let events: Vec<_> = it
.ready_take_while(|(count, _)| Some(*count) != to)
.ready_filter_map(|item| event_filter(item, filter))
.wide_filter_map(|item| ignored_filter(&services, item, sender_user))
.wide_filter_map(|item| visibility_filter(&services, item, sender_user))
.take(limit)
.collect()
.await;
// let appservice_id = body.appservice_info.map(|appservice|
// appservice.registration.id);
let lazy_loading_context = lazy_loading::Context {
user_id: sender_user,
device_id: match sender_device {
| Some(device_id) => device_id,
| None =>
if let Some(registration) = body.appservice_info.as_ref() {
<&DeviceId>::from(registration.registration.id.as_str())
} else {
<&DeviceId>::from("")
},
},
room_id,
token: Some(from.into_unsigned()),
options: Some(&filter.lazy_load_options),
};
let witness: OptionFuture<_> = filter
.lazy_load_options
.is_enabled()
.then(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter()))
.into();
let state = witness
.map(Option::into_iter)
.map(|option| option.flat_map(Witness::into_iter))
.map(IterStream::stream)
.into_stream()
.flatten()
.broad_filter_map(|user_id| async move {
get_member_event(&services, room_id, &user_id).await
})
.collect()
.await;
let next_token = events.last().map(at!(0));
let chunk = events
.into_iter()
.map(at!(1))
.map(PduEvent::into_room_event)
.collect();
Ok(get_message_events::v3::Response {
start: from.to_string(),
end: next_token.as_ref().map(ToString::to_string),
chunk,
state,
})
}
pub(crate) async fn lazy_loading_witness<'a, I>(
services: &Services,
lazy_loading_context: &lazy_loading::Context<'_>,
events: I,
) -> Witness
where
I: Iterator<Item = &'a PdusIterItem> + Clone + Send,
{
let oldest = events
.clone()
.map(|(count, _)| count)
.copied()
.min()
.unwrap_or_else(PduCount::max);
let newest = events
.clone()
.map(|(count, _)| count)
.copied()
.max()
.unwrap_or_else(PduCount::max);
let receipts = services
.rooms
.read_receipt
.readreceipts_since(lazy_loading_context.room_id, oldest.into_unsigned());
pin_mut!(receipts);
let witness: Witness = events
.stream()
.map(|(_, pdu)| pdu.sender.clone())
.chain(
receipts
.ready_take_while(|(_, c, _)| *c <= newest.into_unsigned())
.map(|(user_id, ..)| user_id.to_owned()),
)
.collect()
.await;
services
.rooms
.lazy_loading
.witness_retain(witness, lazy_loading_context)
.await
}
async fn get_member_event(
services: &Services,
room_id: &RoomId,
user_id: &UserId,
) -> Option<Raw<AnyStateEvent>> {
services
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())
.map_ok(PduEvent::into_state_event)
.await
.ok()
}
#[inline]
pub(crate) async fn ignored_filter(
services: &Services,
item: PdusIterItem,
user_id: &UserId,
) -> Option<PdusIterItem> {
let (_, ref pdu) = item;
is_ignored_pdu(services, pdu, user_id)
.await
.eq(&false)
.then_some(item)
}
#[inline]
pub(crate) async fn is_ignored_pdu(
services: &Services,
pdu: &PduEvent,
user_id: &UserId,
) -> bool {
// exclude Synapse's dummy events from bloating up response bodies. clients
// don't need to see this.
if pdu.kind.to_cow_str() == "org.matrix.dummy_event" {
return true;
}
let ignored_type = IGNORED_MESSAGE_TYPES.binary_search(&pdu.kind).is_ok();
let ignored_server = services
.config
.forbidden_remote_server_names
.is_match(pdu.sender().server_name().host());
if ignored_type
&& (ignored_server || services.users.user_is_ignored(&pdu.sender, user_id).await)
{
return true;
}
false
}
#[inline]
pub(crate) async fn visibility_filter(
services: &Services,
item: PdusIterItem,
user_id: &UserId,
) -> Option<PdusIterItem> {
let (_, pdu) = &item;
services
.rooms
.state_accessor
.user_can_see_event(user_id, &pdu.room_id, &pdu.event_id)
.await
.then_some(item)
}
#[inline]
pub(crate) fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option<PdusIterItem> {
let (_, pdu) = &item;
pdu.matches(filter).then_some(item)
}
#[cfg_attr(debug_assertions, conduwuit::ctor)]
fn _is_sorted() {
debug_assert!(
IGNORED_MESSAGE_TYPES.is_sorted(),
"IGNORED_MESSAGE_TYPES must be sorted by the developer"
);
}