From 393d341f0787dd8dff7a8338423fa2fabae747b0 Mon Sep 17 00:00:00 2001 From: timedout Date: Tue, 2 Dec 2025 14:46:13 +0000 Subject: [PATCH] perf: Throttle frequent device metadata updates & centralise site --- src/api/client/message.rs | 22 +++++++--------------- src/api/client/read_marker.rs | 20 ++++++-------------- src/api/client/redact.rs | 23 +++++++---------------- src/api/client/send.rs | 25 +++++++------------------ src/api/client/state.rs | 23 ++++++++--------------- src/api/client/sync/v5.rs | 15 +++++---------- src/api/client/typing.rs | 22 +++++++--------------- src/service/users/mod.rs | 32 +++++++++++++++++++++++++++++--- 8 files changed, 76 insertions(+), 106 deletions(-) diff --git a/src/api/client/message.rs b/src/api/client/message.rs index b89361db..a7b364a0 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -1,4 +1,5 @@ use axum::extract::State; +use axum_client_ip::InsecureClientIp; use conduwuit::{ Err, Result, at, matrix::{ @@ -22,7 +23,7 @@ use conduwuit_service::{ }; use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture, pin_mut}; use ruma::{ - DeviceId, MilliSecondsSinceUnixEpoch, RoomId, UserId, + DeviceId, RoomId, UserId, api::{ Direction, client::{filter::RoomEventFilter, message::get_message_events}, @@ -70,6 +71,7 @@ const LIMIT_DEFAULT: usize = 10; /// where the user was joined, depending on `history_visibility`) pub(crate) async fn get_message_events_route( State(services): State, + InsecureClientIp(client_ip): InsecureClientIp, body: Ruma, ) -> Result { debug_assert!(IGNORED_MESSAGE_TYPES.is_sorted(), "IGNORED_MESSAGE_TYPES is not sorted"); @@ -78,20 +80,10 @@ pub(crate) async fn get_message_events_route( let room_id = &body.room_id; let filter = &body.filter; - if sender_device.is_some() { - // Increment the "device last active" metadata - let device_id = body.sender_device(); - let mut device = services - .users - .get_device_metadata(sender_user, device_id) - .await - .expect("Device metadata should exist for authenticated device"); - device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now()); - services - .users - .update_device_last_seen(sender_user, device_id, &device) - .await?; - } + services + .users + .update_device_last_seen(sender_user, sender_device, client_ip) + .await; if !services.rooms.metadata.exists(room_id).await { return Err!(Request(Forbidden("Room does not exist to this server"))); diff --git a/src/api/client/read_marker.rs b/src/api/client/read_marker.rs index e724c014..95c761f9 100644 --- a/src/api/client/read_marker.rs +++ b/src/api/client/read_marker.rs @@ -1,6 +1,7 @@ use std::collections::BTreeMap; use axum::extract::State; +use axum_client_ip::InsecureClientIp; use conduwuit::{Err, PduCount, Result, err}; use ruma::{ MilliSecondsSinceUnixEpoch, @@ -118,23 +119,14 @@ pub(crate) async fn set_read_marker_route( /// Sets private read marker and public read receipt EDU. pub(crate) async fn create_receipt_route( State(services): State, + InsecureClientIp(client_ip): InsecureClientIp, body: Ruma, ) -> Result { let sender_user = body.sender_user(); - if body.sender_device.is_some() { - // Increment the "device last active" metadata - let device_id = body.sender_device(); - let mut device = services - .users - .get_device_metadata(sender_user, device_id) - .await - .expect("Device metadata should exist for authenticated device"); - device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now()); - services - .users - .update_device_last_seen(sender_user, device_id, &device) - .await?; - } + services + .users + .update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip) + .await; if matches!( &body.receipt_type, diff --git a/src/api/client/redact.rs b/src/api/client/redact.rs index 70514d94..c3f42999 100644 --- a/src/api/client/redact.rs +++ b/src/api/client/redact.rs @@ -1,8 +1,8 @@ use axum::extract::State; +use axum_client_ip::InsecureClientIp; use conduwuit::{Err, Result, matrix::pdu::PduBuilder}; use ruma::{ - MilliSecondsSinceUnixEpoch, api::client::redact::redact_event, - events::room::redaction::RoomRedactionEventContent, + api::client::redact::redact_event, events::room::redaction::RoomRedactionEventContent, }; use crate::Ruma; @@ -14,23 +14,14 @@ use crate::Ruma; /// - TODO: Handle txn id pub(crate) async fn redact_event_route( State(services): State, + InsecureClientIp(client_ip): InsecureClientIp, body: Ruma, ) -> Result { let sender_user = body.sender_user(); - if body.sender_device.is_some() { - // Increment the "device last active" metadata - let device_id = body.sender_device(); - let mut device = services - .users - .get_device_metadata(sender_user, device_id) - .await - .expect("Device metadata should exist for authenticated device"); - device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now()); - services - .users - .update_device_last_seen(sender_user, device_id, &device) - .await?; - } + services + .users + .update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip) + .await; let body = &body.body; if services.users.is_suspended(sender_user).await? { // TODO: Users can redact their own messages while suspended diff --git a/src/api/client/send.rs b/src/api/client/send.rs index 0b2373f5..c6ceeeff 100644 --- a/src/api/client/send.rs +++ b/src/api/client/send.rs @@ -1,11 +1,9 @@ use std::collections::BTreeMap; use axum::extract::State; +use axum_client_ip::InsecureClientIp; use conduwuit::{Err, Result, err, matrix::pdu::PduBuilder, utils}; -use ruma::{ - MilliSecondsSinceUnixEpoch, api::client::message::send_message_event, - events::MessageLikeEventType, -}; +use ruma::{api::client::message::send_message_event, events::MessageLikeEventType}; use serde_json::from_str; use crate::Ruma; @@ -21,6 +19,7 @@ use crate::Ruma; /// allowed pub(crate) async fn send_message_event_route( State(services): State, + InsecureClientIp(client_ip): InsecureClientIp, body: Ruma, ) -> Result { let sender_user = body.sender_user(); @@ -30,20 +29,10 @@ pub(crate) async fn send_message_event_route( return Err!(Request(UserSuspended("You cannot perform this action while suspended."))); } - if sender_device.is_some() { - // Increment the "device last active" metadata - let device_id = sender_device.unwrap(); - let mut device = services - .users - .get_device_metadata(sender_user, device_id) - .await - .expect("Device metadata should exist for authenticated device"); - device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now()); - services - .users - .update_device_last_seen(sender_user, device_id, &device) - .await?; - } + services + .users + .update_device_last_seen(sender_user, body.sender_device.as_deref(), client_ip) + .await; // Forbid m.room.encrypted if encryption is disabled if MessageLikeEventType::RoomEncrypted == body.event_type && !services.config.allow_encryption diff --git a/src/api/client/state.rs b/src/api/client/state.rs index 7e9f4f3d..92e155e6 100644 --- a/src/api/client/state.rs +++ b/src/api/client/state.rs @@ -1,4 +1,5 @@ use axum::extract::State; +use axum_client_ip::InsecureClientIp; use conduwuit::{ Err, Result, err, matrix::{Event, pdu::PduBuilder}, @@ -30,23 +31,14 @@ use crate::{Ruma, RumaResponse}; /// Sends a state event into the room. pub(crate) async fn send_state_event_for_key_route( State(services): State, + InsecureClientIp(ip): InsecureClientIp, body: Ruma, ) -> Result { let sender_user = body.sender_user(); - if body.sender_device.is_some() { - // Increment the "device last active" metadata - let device_id = body.sender_device(); - let mut device = services - .users - .get_device_metadata(sender_user, device_id) - .await - .expect("Device metadata should exist for authenticated device"); - device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now()); - services - .users - .update_device_last_seen(sender_user, device_id, &device) - .await?; - } + services + .users + .update_device_last_seen(sender_user, body.sender_device.as_deref(), ip) + .await; if services.users.is_suspended(sender_user).await? { return Err!(Request(UserSuspended("You cannot perform this action while suspended."))); @@ -75,9 +67,10 @@ pub(crate) async fn send_state_event_for_key_route( /// Sends a state event into the room. pub(crate) async fn send_state_event_for_empty_key_route( State(services): State, + InsecureClientIp(ip): InsecureClientIp, body: Ruma, ) -> Result> { - send_state_event_for_key_route(State(services), body) + send_state_event_for_key_route(State(services), InsecureClientIp(ip), body) .boxed() .await .map(RumaResponse) diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index f0fe2bdd..8976fd02 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -6,6 +6,7 @@ use std::{ }; use axum::extract::State; +use axum_client_ip::InsecureClientIp; use conduwuit::{ Err, Error, Result, at, error, extract_variant, is_equal_to, matrix::{Event, TypeStateKey, pdu::PduCount}, @@ -25,7 +26,7 @@ use futures::{ pin_mut, }; use ruma::{ - DeviceId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId, + DeviceId, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId, api::client::sync::sync_events::{self, DeviceLists, UnreadNotificationsCount}, directory::RoomTypeFilter, events::{ @@ -61,23 +62,17 @@ type KnownRooms = BTreeMap>; /// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186 pub(crate) async fn sync_events_v5_route( State(ref services): State, + InsecureClientIp(client_ip): InsecureClientIp, body: Ruma, ) -> Result { debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted"); let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_ref().expect("user is authenticated"); - // Increment the "device last active" metadata - let mut device = services - .users - .get_device_metadata(sender_user, sender_device) - .await - .expect("Device metadata should exist for authenticated device"); - device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now()); services .users - .update_device_last_seen(sender_user, sender_device, &device) - .await?; + .update_device_last_seen(sender_user, Some(sender_device), client_ip) + .await; let mut body = body.body; diff --git a/src/api/client/typing.rs b/src/api/client/typing.rs index b802b5db..e54aef7d 100644 --- a/src/api/client/typing.rs +++ b/src/api/client/typing.rs @@ -1,6 +1,7 @@ use axum::extract::State; +use axum_client_ip::InsecureClientIp; use conduwuit::{Err, Result, utils, utils::math::Tried}; -use ruma::{MilliSecondsSinceUnixEpoch, api::client::typing::create_typing_event}; +use ruma::api::client::typing::create_typing_event; use crate::Ruma; @@ -9,24 +10,15 @@ use crate::Ruma; /// Sets the typing state of the sender user. pub(crate) async fn create_typing_event_route( State(services): State, + InsecureClientIp(ip): InsecureClientIp, body: Ruma, ) -> Result { use create_typing_event::v3::Typing; let sender_user = body.sender_user(); - if body.sender_device.is_some() { - // Increment the "device last active" metadata - let device_id = body.sender_device(); - let mut device = services - .users - .get_device_metadata(sender_user, device_id) - .await - .expect("Device metadata should exist for authenticated device"); - device.last_seen_ts = Some(MilliSecondsSinceUnixEpoch::now()); - services - .users - .update_device_last_seen(sender_user, device_id, &device) - .await?; - } + services + .users + .update_device_last_seen(sender_user, body.sender_device.as_deref(), ip) + .await; if sender_user != body.user_id && body.appservice_info.is_none() { return Err!(Request(Forbidden("You cannot update typing status of other users."))); diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index dadb354d..46edec6f 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -1,6 +1,6 @@ #[cfg(feature = "ldap")] use std::collections::HashMap; -use std::{collections::BTreeMap, mem, sync::Arc}; +use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc}; #[cfg(feature = "ldap")] use conduwuit::result::LogErr; @@ -25,6 +25,7 @@ use ruma::{ invite_permission_config::{FilterLevel, InvitePermissionConfigEvent}, }, serde::Raw, + uint, }; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -988,7 +989,7 @@ impl Service { device: &Device, ) -> Result<()> { increment(&self.db.userid_devicelistversion, user_id.as_bytes()); - self.update_device_last_seen(user_id, device_id, device) + self.update_device_metadata_no_increment(user_id, device_id, device) .await } @@ -996,7 +997,7 @@ impl Service { // This is namely used for updating the last_seen_ip and last_seen_ts values, // as those do not need a device list version bump due to them not being // relevant to other consumers. - pub async fn update_device_last_seen( + pub async fn update_device_metadata_no_increment( &self, user_id: &UserId, device_id: &DeviceId, @@ -1008,6 +1009,31 @@ impl Service { Ok(()) } + pub async fn update_device_last_seen( + &self, + user_id: &UserId, + device_id: Option<&DeviceId>, + ip: IpAddr, + ) { + let now = MilliSecondsSinceUnixEpoch::now(); + if let Some(device_id) = device_id { + if let Ok(mut device) = self.get_device_metadata(user_id, device_id).await { + device.last_seen_ip = Some(ip.to_string()); + // If the last update was less than 10 seconds ago, don't update the timestamp + if let Some(prev) = device.last_seen_ts { + if now.get().saturating_sub(prev.get()) < uint!(10_000) { + return; + } + } + device.last_seen_ts = Some(now); + + self.update_device_metadata_no_increment(user_id, device_id, &device) + .await + .ok(); + } + } + } + /// Get device metadata. pub async fn get_device_metadata( &self,