From ae595dd0d1528165fcb206d126274dc4c1900bcf Mon Sep 17 00:00:00 2001 From: timedout Date: Wed, 19 Nov 2025 19:10:39 +0000 Subject: [PATCH] feat(wip;sender): Centralise status registries --- Cargo.lock | 21 +++++++++++++++++++++ Cargo.toml | 3 --- src/service/Cargo.toml | 1 + src/service/sending/mod.rs | 5 ++++- src/service/sending/sender.rs | 11 ++++++++--- 5 files changed, 34 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 50fb6e69..92592abf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1169,6 +1169,7 @@ dependencies = [ "conduwuit_database", "const-str", "ctor", + "dashmap", "either", "futures", "hickory-resolver", @@ -1525,6 +1526,20 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "817fa642fb0ee7fe42e95783e00e0969927b96091bdd4b9b1af082acd943913b" +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.9.0" @@ -2112,6 +2127,12 @@ version = "0.1.2+12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "647deb1583b14d160f85f3ff626f20b6edd366e3852c9843b06077388f794cb6" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" diff --git a/Cargo.toml b/Cargo.toml index 5665fe8b..91ff4342 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -554,9 +554,6 @@ version = "0.12.0" default-features = false features = ["sync", "tls-rustls", "rustls-provider"] -[workspace.dependencies.resolv-conf] -version = "0.7.5" - # # Patches # diff --git a/src/service/Cargo.toml b/src/service/Cargo.toml index 878190a3..7be09055 100644 --- a/src/service/Cargo.toml +++ b/src/service/Cargo.toml @@ -121,6 +121,7 @@ blurhash.workspace = true blurhash.optional = true recaptcha-verify = { version = "0.1.5", default-features = false } ctor.workspace = true +dashmap = "6.1.0" [target.'cfg(all(unix, target_os = "linux"))'.dependencies] sd-notify.workspace = true diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 08ca7010..7f359233 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -4,10 +4,11 @@ mod dest; mod sender; use std::{ + collections::HashMap, fmt::Debug, hash::{DefaultHasher, Hash, Hasher}, iter::once, - sync::Arc, + sync::{Arc, Mutex}, }; use async_trait::async_trait; @@ -39,6 +40,7 @@ pub struct Service { server: Arc, services: Services, channels: Vec<(loole::Sender, loole::Receiver)>, + statuses: Vec, } struct Services { @@ -101,6 +103,7 @@ impl crate::Service for Service { federation: args.depend::("federation"), }, channels: (0..num_senders).map(|_| loole::unbounded()).collect(), + statuses: vec![sender::CurTransactionStatus::new(); num_senders], })) } diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 33b0c1c3..58923377 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -20,6 +20,7 @@ use conduwuit_core::{ }, warn, }; +use dashmap::DashMap; use futures::{ FutureExt, StreamExt, future::{BoxFuture, OptionFuture}, @@ -55,7 +56,7 @@ use super::{ }; #[derive(Debug)] -enum TransactionStatus { +pub(crate) enum TransactionStatus { Running, Failed(u32, Instant), // number of times failed, time of last failure Retrying(u32), // number of times failed @@ -65,7 +66,7 @@ type SendingError = (Destination, Error); type SendingResult = Result; type SendingFuture<'a> = BoxFuture<'a, SendingResult>; type SendingFutures<'a> = FuturesUnordered>; -type CurTransactionStatus = HashMap; +pub(crate) type CurTransactionStatus = DashMap; const SELECT_PRESENCE_LIMIT: usize = 256; const SELECT_RECEIPT_LIMIT: usize = 256; @@ -78,9 +79,13 @@ pub const EDU_LIMIT: usize = 100; impl Service { #[tracing::instrument(skip(self), level = "debug")] pub(super) async fn sender(self: Arc, id: usize) -> Result { - let mut statuses: CurTransactionStatus = CurTransactionStatus::new(); let mut futures: SendingFutures<'_> = FuturesUnordered::new(); + let mut statuses = self + .statuses + .get_mut(id) + .expect("missing status for worker"); + self.startup_netburst(id, &mut futures, &mut statuses) .boxed() .await;