Compare commits
1 commit
main
...
nex/fix/ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae595dd0d1 |
5 changed files with 34 additions and 7 deletions
21
Cargo.lock
generated
21
Cargo.lock
generated
|
|
@ -1169,6 +1169,7 @@ dependencies = [
|
||||||
"conduwuit_database",
|
"conduwuit_database",
|
||||||
"const-str",
|
"const-str",
|
||||||
"ctor",
|
"ctor",
|
||||||
|
"dashmap",
|
||||||
"either",
|
"either",
|
||||||
"futures",
|
"futures",
|
||||||
"hickory-resolver",
|
"hickory-resolver",
|
||||||
|
|
@ -1525,6 +1526,20 @@ version = "2.1.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "817fa642fb0ee7fe42e95783e00e0969927b96091bdd4b9b1af082acd943913b"
|
checksum = "817fa642fb0ee7fe42e95783e00e0969927b96091bdd4b9b1af082acd943913b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "dashmap"
|
||||||
|
version = "6.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"crossbeam-utils",
|
||||||
|
"hashbrown 0.14.5",
|
||||||
|
"lock_api",
|
||||||
|
"once_cell",
|
||||||
|
"parking_lot_core",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "data-encoding"
|
name = "data-encoding"
|
||||||
version = "2.9.0"
|
version = "2.9.0"
|
||||||
|
|
@ -2112,6 +2127,12 @@ version = "0.1.2+12"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "647deb1583b14d160f85f3ff626f20b6edd366e3852c9843b06077388f794cb6"
|
checksum = "647deb1583b14d160f85f3ff626f20b6edd366e3852c9843b06077388f794cb6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hashbrown"
|
||||||
|
version = "0.14.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hashbrown"
|
name = "hashbrown"
|
||||||
version = "0.15.5"
|
version = "0.15.5"
|
||||||
|
|
|
||||||
|
|
@ -554,9 +554,6 @@ version = "0.12.0"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = ["sync", "tls-rustls", "rustls-provider"]
|
features = ["sync", "tls-rustls", "rustls-provider"]
|
||||||
|
|
||||||
[workspace.dependencies.resolv-conf]
|
|
||||||
version = "0.7.5"
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# Patches
|
# Patches
|
||||||
#
|
#
|
||||||
|
|
|
||||||
|
|
@ -121,6 +121,7 @@ blurhash.workspace = true
|
||||||
blurhash.optional = true
|
blurhash.optional = true
|
||||||
recaptcha-verify = { version = "0.1.5", default-features = false }
|
recaptcha-verify = { version = "0.1.5", default-features = false }
|
||||||
ctor.workspace = true
|
ctor.workspace = true
|
||||||
|
dashmap = "6.1.0"
|
||||||
|
|
||||||
[target.'cfg(all(unix, target_os = "linux"))'.dependencies]
|
[target.'cfg(all(unix, target_os = "linux"))'.dependencies]
|
||||||
sd-notify.workspace = true
|
sd-notify.workspace = true
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,11 @@ mod dest;
|
||||||
mod sender;
|
mod sender;
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
hash::{DefaultHasher, Hash, Hasher},
|
hash::{DefaultHasher, Hash, Hasher},
|
||||||
iter::once,
|
iter::once,
|
||||||
sync::Arc,
|
sync::{Arc, Mutex},
|
||||||
};
|
};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
@ -39,6 +40,7 @@ pub struct Service {
|
||||||
server: Arc<Server>,
|
server: Arc<Server>,
|
||||||
services: Services,
|
services: Services,
|
||||||
channels: Vec<(loole::Sender<Msg>, loole::Receiver<Msg>)>,
|
channels: Vec<(loole::Sender<Msg>, loole::Receiver<Msg>)>,
|
||||||
|
statuses: Vec<sender::CurTransactionStatus>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Services {
|
struct Services {
|
||||||
|
|
@ -101,6 +103,7 @@ impl crate::Service for Service {
|
||||||
federation: args.depend::<federation::Service>("federation"),
|
federation: args.depend::<federation::Service>("federation"),
|
||||||
},
|
},
|
||||||
channels: (0..num_senders).map(|_| loole::unbounded()).collect(),
|
channels: (0..num_senders).map(|_| loole::unbounded()).collect(),
|
||||||
|
statuses: vec![sender::CurTransactionStatus::new(); num_senders],
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ use conduwuit_core::{
|
||||||
},
|
},
|
||||||
warn,
|
warn,
|
||||||
};
|
};
|
||||||
|
use dashmap::DashMap;
|
||||||
use futures::{
|
use futures::{
|
||||||
FutureExt, StreamExt,
|
FutureExt, StreamExt,
|
||||||
future::{BoxFuture, OptionFuture},
|
future::{BoxFuture, OptionFuture},
|
||||||
|
|
@ -55,7 +56,7 @@ use super::{
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum TransactionStatus {
|
pub(crate) enum TransactionStatus {
|
||||||
Running,
|
Running,
|
||||||
Failed(u32, Instant), // number of times failed, time of last failure
|
Failed(u32, Instant), // number of times failed, time of last failure
|
||||||
Retrying(u32), // number of times failed
|
Retrying(u32), // number of times failed
|
||||||
|
|
@ -65,7 +66,7 @@ type SendingError = (Destination, Error);
|
||||||
type SendingResult = Result<Destination, SendingError>;
|
type SendingResult = Result<Destination, SendingError>;
|
||||||
type SendingFuture<'a> = BoxFuture<'a, SendingResult>;
|
type SendingFuture<'a> = BoxFuture<'a, SendingResult>;
|
||||||
type SendingFutures<'a> = FuturesUnordered<SendingFuture<'a>>;
|
type SendingFutures<'a> = FuturesUnordered<SendingFuture<'a>>;
|
||||||
type CurTransactionStatus = HashMap<Destination, TransactionStatus>;
|
pub(crate) type CurTransactionStatus = DashMap<Destination, TransactionStatus>;
|
||||||
|
|
||||||
const SELECT_PRESENCE_LIMIT: usize = 256;
|
const SELECT_PRESENCE_LIMIT: usize = 256;
|
||||||
const SELECT_RECEIPT_LIMIT: usize = 256;
|
const SELECT_RECEIPT_LIMIT: usize = 256;
|
||||||
|
|
@ -78,9 +79,13 @@ pub const EDU_LIMIT: usize = 100;
|
||||||
impl Service {
|
impl Service {
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
pub(super) async fn sender(self: Arc<Self>, id: usize) -> Result {
|
pub(super) async fn sender(self: Arc<Self>, id: usize) -> Result {
|
||||||
let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
|
|
||||||
let mut futures: SendingFutures<'_> = FuturesUnordered::new();
|
let mut futures: SendingFutures<'_> = FuturesUnordered::new();
|
||||||
|
|
||||||
|
let mut statuses = self
|
||||||
|
.statuses
|
||||||
|
.get_mut(id)
|
||||||
|
.expect("missing status for worker");
|
||||||
|
|
||||||
self.startup_netburst(id, &mut futures, &mut statuses)
|
self.startup_netburst(id, &mut futures, &mut statuses)
|
||||||
.boxed()
|
.boxed()
|
||||||
.await;
|
.await;
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue