From bc426e1bfcbc4e3d7f05f53515cd7144996a1b60 Mon Sep 17 00:00:00 2001 From: timedout Date: Sat, 3 Jan 2026 16:05:05 +0000 Subject: [PATCH] fix: Apply client-requested timeout to federated key queries Also parallelised federation calls in related functions --- changelog.d/1261.bugfix | 2 + src/admin/user/commands.rs | 4 +- src/api/client/keys.rs | 87 +++++++++++++++++++++++++----------- src/api/client/well_known.rs | 2 +- src/api/server/user.rs | 6 ++- src/service/admin/grant.rs | 2 +- src/service/admin/mod.rs | 11 ++++- 7 files changed, 81 insertions(+), 33 deletions(-) create mode 100644 changelog.d/1261.bugfix diff --git a/changelog.d/1261.bugfix b/changelog.d/1261.bugfix new file mode 100644 index 00000000..ffc50358 --- /dev/null +++ b/changelog.d/1261.bugfix @@ -0,0 +1,2 @@ +Client requested timeout parameter is now applied to e2ee key lookups and claims. Related federation requests are now +also concurrent. Contributed by @nex. diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index 0a41e259..4e9c38c6 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -465,7 +465,7 @@ pub(super) async fn force_join_list_of_local_users( if server_admins.is_empty() { return Err!("There are no admins set for this server."); - }; + } let (room_id, servers) = self .services @@ -580,7 +580,7 @@ pub(super) async fn force_join_all_local_users( if server_admins.is_empty() { return Err!("There are no admins set for this server."); - }; + } let (room_id, servers) = self .services diff --git a/src/api/client/keys.rs b/src/api/client/keys.rs index f371d89e..36a5b949 100644 --- a/src/api/client/keys.rs +++ b/src/api/client/keys.rs @@ -1,7 +1,16 @@ -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + ops::Add, + time::Duration, +}; use axum::extract::State; -use conduwuit::{Err, Error, Result, debug, debug_warn, err, result::NotFound, utils}; +use conduwuit::{ + Err, Error, Result, debug, debug_warn, err, + result::NotFound, + utils, + utils::{IterStream, stream::WidebandExt}, +}; use conduwuit_service::{Services, users::parse_master_key}; use futures::{StreamExt, stream::FuturesUnordered}; use ruma::{ @@ -134,6 +143,7 @@ pub(crate) async fn get_keys_route( &body.device_keys, |u| u == sender_user, true, // Always allow local users to see device names of other local users + body.timeout.unwrap_or(Duration::from_secs(10)), ) .await } @@ -145,7 +155,12 @@ pub(crate) async fn claim_keys_route( State(services): State, body: Ruma, ) -> Result { - claim_keys_helper(&services, &body.one_time_keys).await + claim_keys_helper( + &services, + &body.one_time_keys, + body.timeout.unwrap_or(Duration::from_secs(10)), + ) + .await } /// # `POST /_matrix/client/r0/keys/device_signing/upload` @@ -421,10 +436,12 @@ pub(crate) async fn get_keys_helper( device_keys_input: &BTreeMap>, allowed_signatures: F, include_display_names: bool, + timeout: Duration, ) -> Result where F: Fn(&UserId) -> bool + Send + Sync, { + let deadline = tokio::time::Instant::now().add(timeout); let mut master_keys = BTreeMap::new(); let mut self_signing_keys = BTreeMap::new(); let mut user_signing_keys = BTreeMap::new(); @@ -512,9 +529,10 @@ where let mut failures = BTreeMap::new(); - let mut futures: FuturesUnordered<_> = get_over_federation + let mut futures = get_over_federation .into_iter() - .map(|(server, vec)| async move { + .stream() + .wide_filter_map(|(server, vec)| async move { let mut device_keys_input_fed = BTreeMap::new(); for (user_id, keys) in vec { device_keys_input_fed.insert(user_id.to_owned(), keys.clone()); @@ -522,17 +540,23 @@ where let request = federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed }; + let response = tokio::time::timeout_at( + deadline, + services.sending.send_federation_request(server, request), + ) + .await + // Need to flatten the Result, E> into Result + .map_err(|_| err!(Request(Unknown("Timeout when getting keys over federation.")))) + .and_then(|res| res); - let response = services - .sending - .send_federation_request(server, request) - .await; - - (server, response) + Some((server, response)) }) - .collect(); + .then(async |v| v) + .collect::>() + .await + .into_iter(); - while let Some((server, response)) = futures.next().await { + while let Some((server, response)) = futures.next() { match response { | Ok(response) => { for (user, master_key) in response.master_keys { @@ -564,8 +588,9 @@ where self_signing_keys.extend(response.self_signing_keys); device_keys.extend(response.device_keys); }, - | _ => { - failures.insert(server.to_string(), json!({})); + | Err(e) => { + // Extra branch is needed because Elapsed != Error (types) + failures.insert(server.to_string(), json!({ "error": e.to_string() })); }, } } @@ -608,7 +633,9 @@ fn add_unsigned_device_display_name( pub(crate) async fn claim_keys_helper( services: &Services, one_time_keys_input: &BTreeMap>, + timeout: Duration, ) -> Result { + let deadline = tokio::time::Instant::now().add(timeout); let mut one_time_keys = BTreeMap::new(); let mut get_over_federation = BTreeMap::new(); @@ -638,26 +665,34 @@ pub(crate) async fn claim_keys_helper( let mut failures = BTreeMap::new(); - let mut futures: FuturesUnordered<_> = get_over_federation + let mut futures = get_over_federation .into_iter() - .map(|(server, vec)| async move { + .stream() + .wide_filter_map(|(server, vec)| async move { let mut one_time_keys_input_fed = BTreeMap::new(); for (user_id, keys) in vec { one_time_keys_input_fed.insert(user_id.clone(), keys.clone()); } - ( - server, - services - .sending - .send_federation_request(server, federation::keys::claim_keys::v1::Request { + let response = tokio::time::timeout_at( + deadline, + services.sending.send_federation_request( + server, + federation::keys::claim_keys::v1::Request { one_time_keys: one_time_keys_input_fed, - }) - .await, + }, + ), ) + .await + .map_err(|_| err!(Request(Unknown("Timeout when claiming keys over federation.")))) + .and_then(|res| res); + Some((server, response)) }) - .collect(); + .then(async |v| v) + .collect::>() + .await + .into_iter(); - while let Some((server, response)) = futures.next().await { + while let Some((server, response)) = futures.next() { match response { | Ok(keys) => { one_time_keys.extend(keys.one_time_keys); diff --git a/src/api/client/well_known.rs b/src/api/client/well_known.rs index 2eeaa350..ea76b2a4 100644 --- a/src/api/client/well_known.rs +++ b/src/api/client/well_known.rs @@ -72,7 +72,7 @@ pub(crate) async fn well_known_support( if contacts.is_empty() { let admin_users = services.admin.get_admins().await; - for user_id in admin_users.iter() { + for user_id in &admin_users { if *user_id == services.globals.server_user { continue; } diff --git a/src/api/server/user.rs b/src/api/server/user.rs index 80c353ab..3c00bdad 100644 --- a/src/api/server/user.rs +++ b/src/api/server/user.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use axum::extract::State; use conduwuit::{Error, Result}; use futures::{FutureExt, StreamExt, TryFutureExt}; @@ -96,6 +98,7 @@ pub(crate) async fn get_keys_route( &body.device_keys, |u| Some(u.server_name()) == body.origin.as_deref(), services.globals.allow_device_name_federation(), + Duration::from_secs(0), ) .await?; @@ -124,7 +127,8 @@ pub(crate) async fn claim_keys_route( )); } - let result = claim_keys_helper(&services, &body.one_time_keys).await?; + let result = + claim_keys_helper(&services, &body.one_time_keys, Duration::from_secs(0)).await?; Ok(claim_keys::v1::Response { one_time_keys: result.one_time_keys }) } diff --git a/src/service/admin/grant.rs b/src/service/admin/grant.rs index de3e263d..35d8f93c 100644 --- a/src/service/admin/grant.rs +++ b/src/service/admin/grant.rs @@ -188,7 +188,7 @@ pub async fn revoke_admin(&self, user_id: &UserId) -> Result { warn!( "Revoking the admin status of {user_id} will not work correctly as they are within \ the admins_list config." - ) + ); } let Ok(room_id) = self.get_admin_room().await else { diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 59d61225..5d1a1372 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -406,13 +406,20 @@ impl Service { /// Checks whether a given user is an admin of this server pub async fn user_is_admin(&self, user_id: &UserId) -> bool { - if self.services.server.config.admins_list.contains(&user_id.to_owned()) { + if self + .services + .server + .config + .admins_list + .contains(&user_id.to_owned()) + { return true; } if self.services.server.config.admins_from_room { if let Ok(admin_room) = self.get_admin_room().await { - return self.services + return self + .services .state_cache .is_joined(user_id, &admin_room) .await;