perf: Throttle frequent device metadata updates & centralise site

This commit is contained in:
timedout 2025-12-02 14:46:13 +00:00
parent ba55dffa0e
commit 393d341f07
No known key found for this signature in database
GPG key ID: 0FA334385D0B689F
8 changed files with 76 additions and 106 deletions

View file

@ -1,4 +1,5 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, at,
matrix::{
@ -22,7 +23,7 @@ use conduwuit_service::{
};
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture, pin_mut};
use ruma::{
DeviceId, MilliSecondsSinceUnixEpoch, RoomId, UserId,
DeviceId, RoomId, UserId,
api::{
Direction,
client::{filter::RoomEventFilter, message::get_message_events},
@ -70,6 +71,7 @@ const LIMIT_DEFAULT: usize = 10;
/// where the user was joined, depending on `history_visibility`)
pub(crate) async fn get_message_events_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<get_message_events::v3::Request>,
) -> Result<get_message_events::v3::Response> {
debug_assert!(IGNORED_MESSAGE_TYPES.is_sorted(), "IGNORED_MESSAGE_TYPES is not sorted");
@ -78,20 +80,10 @@ pub(crate) async fn get_message_events_route(
let room_id = &body.room_id;
let filter = &body.filter;
if sender_device.is_some() {
// Increment the "device last active" metadata
let device_id = body.sender_device();
let mut device = services
.users
.get_device_metadata(sender_user, device_id)
.await
.expect("Device metadata should exist for authenticated device");
device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now());
services
.users
.update_device_last_seen(sender_user, device_id, &device)
.await?;
}
services
.users
.update_device_last_seen(sender_user, sender_device, client_ip)
.await;
if !services.rooms.metadata.exists(room_id).await {
return Err!(Request(Forbidden("Room does not exist to this server")));

View file

@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, PduCount, Result, err};
use ruma::{
MilliSecondsSinceUnixEpoch,
@ -118,23 +119,14 @@ pub(crate) async fn set_read_marker_route(
/// Sets private read marker and public read receipt EDU.
pub(crate) async fn create_receipt_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<create_receipt::v3::Request>,
) -> Result<create_receipt::v3::Response> {
let sender_user = body.sender_user();
if body.sender_device.is_some() {
// Increment the "device last active" metadata
let device_id = body.sender_device();
let mut device = services
.users
.get_device_metadata(sender_user, device_id)
.await
.expect("Device metadata should exist for authenticated device");
device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now());
services
.users
.update_device_last_seen(sender_user, device_id, &device)
.await?;
}
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
if matches!(
&body.receipt_type,

View file

@ -1,8 +1,8 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, matrix::pdu::PduBuilder};
use ruma::{
MilliSecondsSinceUnixEpoch, api::client::redact::redact_event,
events::room::redaction::RoomRedactionEventContent,
api::client::redact::redact_event, events::room::redaction::RoomRedactionEventContent,
};
use crate::Ruma;
@ -14,23 +14,14 @@ use crate::Ruma;
/// - TODO: Handle txn id
pub(crate) async fn redact_event_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<redact_event::v3::Request>,
) -> Result<redact_event::v3::Response> {
let sender_user = body.sender_user();
if body.sender_device.is_some() {
// Increment the "device last active" metadata
let device_id = body.sender_device();
let mut device = services
.users
.get_device_metadata(sender_user, device_id)
.await
.expect("Device metadata should exist for authenticated device");
device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now());
services
.users
.update_device_last_seen(sender_user, device_id, &device)
.await?;
}
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
let body = &body.body;
if services.users.is_suspended(sender_user).await? {
// TODO: Users can redact their own messages while suspended

View file

@ -1,11 +1,9 @@
use std::collections::BTreeMap;
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, err, matrix::pdu::PduBuilder, utils};
use ruma::{
MilliSecondsSinceUnixEpoch, api::client::message::send_message_event,
events::MessageLikeEventType,
};
use ruma::{api::client::message::send_message_event, events::MessageLikeEventType};
use serde_json::from_str;
use crate::Ruma;
@ -21,6 +19,7 @@ use crate::Ruma;
/// allowed
pub(crate) async fn send_message_event_route(
State(services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<send_message_event::v3::Request>,
) -> Result<send_message_event::v3::Response> {
let sender_user = body.sender_user();
@ -30,20 +29,10 @@ pub(crate) async fn send_message_event_route(
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
}
if sender_device.is_some() {
// Increment the "device last active" metadata
let device_id = sender_device.unwrap();
let mut device = services
.users
.get_device_metadata(sender_user, device_id)
.await
.expect("Device metadata should exist for authenticated device");
device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now());
services
.users
.update_device_last_seen(sender_user, device_id, &device)
.await?;
}
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip)
.await;
// Forbid m.room.encrypted if encryption is disabled
if MessageLikeEventType::RoomEncrypted == body.event_type && !services.config.allow_encryption

View file

@ -1,4 +1,5 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Result, err,
matrix::{Event, pdu::PduBuilder},
@ -30,23 +31,14 @@ use crate::{Ruma, RumaResponse};
/// Sends a state event into the room.
pub(crate) async fn send_state_event_for_key_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<send_state_event::v3::Request>,
) -> Result<send_state_event::v3::Response> {
let sender_user = body.sender_user();
if body.sender_device.is_some() {
// Increment the "device last active" metadata
let device_id = body.sender_device();
let mut device = services
.users
.get_device_metadata(sender_user, device_id)
.await
.expect("Device metadata should exist for authenticated device");
device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now());
services
.users
.update_device_last_seen(sender_user, device_id, &device)
.await?;
}
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
.await;
if services.users.is_suspended(sender_user).await? {
return Err!(Request(UserSuspended("You cannot perform this action while suspended.")));
@ -75,9 +67,10 @@ pub(crate) async fn send_state_event_for_key_route(
/// Sends a state event into the room.
pub(crate) async fn send_state_event_for_empty_key_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<send_state_event::v3::Request>,
) -> Result<RumaResponse<send_state_event::v3::Response>> {
send_state_event_for_key_route(State(services), body)
send_state_event_for_key_route(State(services), InsecureClientIp(ip), body)
.boxed()
.await
.map(RumaResponse)

View file

@ -6,6 +6,7 @@ use std::{
};
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{
Err, Error, Result, at, error, extract_variant, is_equal_to,
matrix::{Event, TypeStateKey, pdu::PduCount},
@ -25,7 +26,7 @@ use futures::{
pin_mut,
};
use ruma::{
DeviceId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId,
DeviceId, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId,
api::client::sync::sync_events::{self, DeviceLists, UnreadNotificationsCount},
directory::RoomTypeFilter,
events::{
@ -61,23 +62,17 @@ type KnownRooms = BTreeMap<String, BTreeMap<OwnedRoomId, u64>>;
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186
pub(crate) async fn sync_events_v5_route(
State(ref services): State<crate::State>,
InsecureClientIp(client_ip): InsecureClientIp,
body: Ruma<sync_events::v5::Request>,
) -> Result<sync_events::v5::Response> {
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
// Increment the "device last active" metadata
let mut device = services
.users
.get_device_metadata(sender_user, sender_device)
.await
.expect("Device metadata should exist for authenticated device");
device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now());
services
.users
.update_device_last_seen(sender_user, sender_device, &device)
.await?;
.update_device_last_seen(sender_user, Some(sender_device), client_ip)
.await;
let mut body = body.body;

View file

@ -1,6 +1,7 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, utils, utils::math::Tried};
use ruma::{MilliSecondsSinceUnixEpoch, api::client::typing::create_typing_event};
use ruma::api::client::typing::create_typing_event;
use crate::Ruma;
@ -9,24 +10,15 @@ use crate::Ruma;
/// Sets the typing state of the sender user.
pub(crate) async fn create_typing_event_route(
State(services): State<crate::State>,
InsecureClientIp(ip): InsecureClientIp,
body: Ruma<create_typing_event::v3::Request>,
) -> Result<create_typing_event::v3::Response> {
use create_typing_event::v3::Typing;
let sender_user = body.sender_user();
if body.sender_device.is_some() {
// Increment the "device last active" metadata
let device_id = body.sender_device();
let mut device = services
.users
.get_device_metadata(sender_user, device_id)
.await
.expect("Device metadata should exist for authenticated device");
device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now());
services
.users
.update_device_last_seen(sender_user, device_id, &device)
.await?;
}
services
.users
.update_device_last_seen(sender_user, body.sender_device.as_deref(), ip)
.await;
if sender_user != body.user_id && body.appservice_info.is_none() {
return Err!(Request(Forbidden("You cannot update typing status of other users.")));

View file

@ -1,6 +1,6 @@
#[cfg(feature = "ldap")]
use std::collections::HashMap;
use std::{collections::BTreeMap, mem, sync::Arc};
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
#[cfg(feature = "ldap")]
use conduwuit::result::LogErr;
@ -25,6 +25,7 @@ use ruma::{
invite_permission_config::{FilterLevel, InvitePermissionConfigEvent},
},
serde::Raw,
uint,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
@ -988,7 +989,7 @@ impl Service {
device: &Device,
) -> Result<()> {
increment(&self.db.userid_devicelistversion, user_id.as_bytes());
self.update_device_last_seen(user_id, device_id, device)
self.update_device_metadata_no_increment(user_id, device_id, device)
.await
}
@ -996,7 +997,7 @@ impl Service {
// This is namely used for updating the last_seen_ip and last_seen_ts values,
// as those do not need a device list version bump due to them not being
// relevant to other consumers.
pub async fn update_device_last_seen(
pub async fn update_device_metadata_no_increment(
&self,
user_id: &UserId,
device_id: &DeviceId,
@ -1008,6 +1009,31 @@ impl Service {
Ok(())
}
pub async fn update_device_last_seen(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
ip: IpAddr,
) {
let now = MilliSecondsSinceUnixEpoch::now();
if let Some(device_id) = device_id {
if let Ok(mut device) = self.get_device_metadata(user_id, device_id).await {
device.last_seen_ip = Some(ip.to_string());
// If the last update was less than 10 seconds ago, don't update the timestamp
if let Some(prev) = device.last_seen_ts {
if now.get().saturating_sub(prev.get()) < uint!(10_000) {
return;
}
}
device.last_seen_ts = Some(now);
self.update_device_metadata_no_increment(user_id, device_id, &device)
.await
.ok();
}
}
}
/// Get device metadata.
pub async fn get_device_metadata(
&self,