From 5e3174493d7dd40c5432eb55a696b8ee75923c22 Mon Sep 17 00:00:00 2001 From: timedout Date: Thu, 26 Feb 2026 16:16:45 +0000 Subject: [PATCH] feat: Add method to flush senders when a server becomes active --- src/api/router/auth.rs | 10 +++++++- src/service/sending/mod.rs | 44 +++++++++++++++++++++++++++++++++-- src/service/sending/sender.rs | 15 +++++++++++- 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/src/api/router/auth.rs b/src/api/router/auth.rs index 8f48166d..8c72cbb4 100644 --- a/src/api/router/auth.rs +++ b/src/api/router/auth.rs @@ -4,7 +4,7 @@ use axum_extra::{ headers::{Authorization, authorization::Bearer}, typed_header::TypedHeaderRejectionReason, }; -use conduwuit::{Err, Error, Result, debug_error, err, warn}; +use conduwuit::{Err, Error, Result, debug_error, debug_info, err, warn}; use futures::{ TryFutureExt, future::{ @@ -329,6 +329,14 @@ async fn auth_server( return Err!(Request(Forbidden("Failed to verify X-Matrix signatures."))); } + if services.sending.server_is_offline(destination).await { + debug_info!(?destination, "server returned from being offline"); + services + .sending + .mark_server_online(destination, false) + .await; + } + Ok(Auth { origin: origin.to_owned().into(), sender_user: None, diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 7997d532..c60700f9 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -5,6 +5,7 @@ mod dest; mod sender; use std::{ + collections::HashSet, fmt::Debug, hash::{DefaultHasher, Hash, Hasher}, iter::once, @@ -19,8 +20,8 @@ use conduwuit::{ warn, }; use futures::{FutureExt, Stream, StreamExt}; -use ruma::{RoomId, ServerName, UserId, api::OutgoingRequest}; -use tokio::{task, task::JoinSet}; +use ruma::{OwnedServerName, RoomId, ServerName, UserId, api::OutgoingRequest}; +use tokio::{sync::RwLock, task, task::JoinSet}; use self::data::Data; pub use self::{ @@ -37,6 +38,7 @@ pub struct Service { server: Arc, services: Services, channels: Vec<(loole::Sender, loole::Receiver)>, + pub offline_servers: RwLock>, } struct Services { @@ -52,6 +54,7 @@ struct Services { account_data: Dep, appservice: Dep, pusher: Dep, + resolver: Dep, federation: Dep, } @@ -96,9 +99,11 @@ impl crate::Service for Service { account_data: args.depend::("account_data"), appservice: args.depend::("appservice"), pusher: args.depend::("pusher"), + resolver: args.depend::("resolver"), federation: args.depend::("federation"), }, channels: (0..num_senders).map(|_| loole::unbounded()).collect(), + offline_servers: RwLock::new(HashSet::new()), })) } @@ -146,6 +151,8 @@ impl crate::Service for Service { fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } fn unconstrained(&self) -> bool { true } + + async fn clear_cache(&self) { self.offline_servers.write().await.clear(); } } impl Service { @@ -379,6 +386,39 @@ impl Service { let chans = self.channels.len().max(1); hash.overflowing_rem(chans).0 } + + /// Marks a server as offline + pub async fn mark_server_offline(&self, server: OwnedServerName) { + self.offline_servers.write().await.insert(server); + } + + /// Marks a server as online again and flushes the senders if it was + /// previously marked as offline + pub async fn mark_server_online(&self, server: &ServerName, skip_flush: bool) { + if self.offline_servers.write().await.remove(server) && !skip_flush { + // Flush the senders if this server was previously offline + self.services.resolver.cache.del_destination(server); + self.services.resolver.cache.del_override(server); + self.dispatch(Msg { + dest: Destination::Federation(server.to_owned()), + event: SendingEvent::Flush, + queue_id: Vec::::new(), + }) + .inspect_err(|e| { + error!( + ?server, + ?e, + "failed to dispatch flush message for server coming back online" + ); + }) + .ok(); + } + } + + /// Checks if a server is currently marked as offline + pub async fn server_is_offline(&self, server: &ServerName) -> bool { + self.offline_servers.read().await.contains(server) + } } fn num_senders(args: &crate::Args<'_>) -> usize { diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 2044dc51..0426a510 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -9,6 +9,7 @@ use std::{ }; use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; +use conduwuit::debug_warn; use conduwuit_core::{ Error, Event, Result, debug, err, error, result::LogErr, @@ -135,7 +136,13 @@ impl Service { ) { match response { | Ok(dest) => self.handle_response_ok(&dest, futures, statuses).await, - | Err((dest, e)) => Self::handle_response_err(dest, statuses, &e), + | Err((dest, e)) => { + Self::handle_response_err(dest.clone(), statuses, &e); + if let Destination::Federation(server_name) = dest { + debug_warn!(?server_name, "marking server offline due to error: {e:?}"); + self.mark_server_offline(server_name).await; + } + }, } } @@ -180,6 +187,12 @@ impl Service { } else { statuses.remove(dest); } + if let Destination::Federation(server_name) = dest { + self.mark_server_online(server_name, true).await; + // We skip the flush here because we were already able to contact + // the server, and have queued any pending events, and the + // resolver cache will be fine. + } } #[allow(clippy::needless_pass_by_ref_mut)]