Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
timedout
ae595dd0d1 feat(wip;sender): Centralise status registries 2025-11-21 17:11:12 +00:00
5 changed files with 34 additions and 7 deletions

21
Cargo.lock generated
View file

@ -1169,6 +1169,7 @@ dependencies = [
"conduwuit_database",
"const-str",
"ctor",
"dashmap",
"either",
"futures",
"hickory-resolver",
@ -1525,6 +1526,20 @@ version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "817fa642fb0ee7fe42e95783e00e0969927b96091bdd4b9b1af082acd943913b"
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "data-encoding"
version = "2.9.0"
@ -2112,6 +2127,12 @@ version = "0.1.2+12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "647deb1583b14d160f85f3ff626f20b6edd366e3852c9843b06077388f794cb6"
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "hashbrown"
version = "0.15.5"

View file

@ -554,9 +554,6 @@ version = "0.12.0"
default-features = false
features = ["sync", "tls-rustls", "rustls-provider"]
[workspace.dependencies.resolv-conf]
version = "0.7.5"
#
# Patches
#

View file

@ -121,6 +121,7 @@ blurhash.workspace = true
blurhash.optional = true
recaptcha-verify = { version = "0.1.5", default-features = false }
ctor.workspace = true
dashmap = "6.1.0"
[target.'cfg(all(unix, target_os = "linux"))'.dependencies]
sd-notify.workspace = true

View file

@ -4,10 +4,11 @@ mod dest;
mod sender;
use std::{
collections::HashMap,
fmt::Debug,
hash::{DefaultHasher, Hash, Hasher},
iter::once,
sync::Arc,
sync::{Arc, Mutex},
};
use async_trait::async_trait;
@ -39,6 +40,7 @@ pub struct Service {
server: Arc<Server>,
services: Services,
channels: Vec<(loole::Sender<Msg>, loole::Receiver<Msg>)>,
statuses: Vec<sender::CurTransactionStatus>,
}
struct Services {
@ -101,6 +103,7 @@ impl crate::Service for Service {
federation: args.depend::<federation::Service>("federation"),
},
channels: (0..num_senders).map(|_| loole::unbounded()).collect(),
statuses: vec![sender::CurTransactionStatus::new(); num_senders],
}))
}

View file

@ -20,6 +20,7 @@ use conduwuit_core::{
},
warn,
};
use dashmap::DashMap;
use futures::{
FutureExt, StreamExt,
future::{BoxFuture, OptionFuture},
@ -55,7 +56,7 @@ use super::{
};
#[derive(Debug)]
enum TransactionStatus {
pub(crate) enum TransactionStatus {
Running,
Failed(u32, Instant), // number of times failed, time of last failure
Retrying(u32), // number of times failed
@ -65,7 +66,7 @@ type SendingError = (Destination, Error);
type SendingResult = Result<Destination, SendingError>;
type SendingFuture<'a> = BoxFuture<'a, SendingResult>;
type SendingFutures<'a> = FuturesUnordered<SendingFuture<'a>>;
type CurTransactionStatus = HashMap<Destination, TransactionStatus>;
pub(crate) type CurTransactionStatus = DashMap<Destination, TransactionStatus>;
const SELECT_PRESENCE_LIMIT: usize = 256;
const SELECT_RECEIPT_LIMIT: usize = 256;
@ -78,9 +79,13 @@ pub const EDU_LIMIT: usize = 100;
impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub(super) async fn sender(self: Arc<Self>, id: usize) -> Result {
let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
let mut futures: SendingFutures<'_> = FuturesUnordered::new();
let mut statuses = self
.statuses
.get_mut(id)
.expect("missing status for worker");
self.startup_netburst(id, &mut futures, &mut statuses)
.boxed()
.await;