Compare commits
1 commit
main
...
nex/fix/ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae595dd0d1 |
5 changed files with 34 additions and 7 deletions
21
Cargo.lock
generated
21
Cargo.lock
generated
|
|
@ -1169,6 +1169,7 @@ dependencies = [
|
|||
"conduwuit_database",
|
||||
"const-str",
|
||||
"ctor",
|
||||
"dashmap",
|
||||
"either",
|
||||
"futures",
|
||||
"hickory-resolver",
|
||||
|
|
@ -1525,6 +1526,20 @@ version = "2.1.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "817fa642fb0ee7fe42e95783e00e0969927b96091bdd4b9b1af082acd943913b"
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "6.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
"hashbrown 0.14.5",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "data-encoding"
|
||||
version = "2.9.0"
|
||||
|
|
@ -2112,6 +2127,12 @@ version = "0.1.2+12"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "647deb1583b14d160f85f3ff626f20b6edd366e3852c9843b06077388f794cb6"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.14.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.15.5"
|
||||
|
|
|
|||
|
|
@ -554,9 +554,6 @@ version = "0.12.0"
|
|||
default-features = false
|
||||
features = ["sync", "tls-rustls", "rustls-provider"]
|
||||
|
||||
[workspace.dependencies.resolv-conf]
|
||||
version = "0.7.5"
|
||||
|
||||
#
|
||||
# Patches
|
||||
#
|
||||
|
|
|
|||
|
|
@ -121,6 +121,7 @@ blurhash.workspace = true
|
|||
blurhash.optional = true
|
||||
recaptcha-verify = { version = "0.1.5", default-features = false }
|
||||
ctor.workspace = true
|
||||
dashmap = "6.1.0"
|
||||
|
||||
[target.'cfg(all(unix, target_os = "linux"))'.dependencies]
|
||||
sd-notify.workspace = true
|
||||
|
|
|
|||
|
|
@ -4,10 +4,11 @@ mod dest;
|
|||
mod sender;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
hash::{DefaultHasher, Hash, Hasher},
|
||||
iter::once,
|
||||
sync::Arc,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
|
@ -39,6 +40,7 @@ pub struct Service {
|
|||
server: Arc<Server>,
|
||||
services: Services,
|
||||
channels: Vec<(loole::Sender<Msg>, loole::Receiver<Msg>)>,
|
||||
statuses: Vec<sender::CurTransactionStatus>,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
|
|
@ -101,6 +103,7 @@ impl crate::Service for Service {
|
|||
federation: args.depend::<federation::Service>("federation"),
|
||||
},
|
||||
channels: (0..num_senders).map(|_| loole::unbounded()).collect(),
|
||||
statuses: vec![sender::CurTransactionStatus::new(); num_senders],
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ use conduwuit_core::{
|
|||
},
|
||||
warn,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use futures::{
|
||||
FutureExt, StreamExt,
|
||||
future::{BoxFuture, OptionFuture},
|
||||
|
|
@ -55,7 +56,7 @@ use super::{
|
|||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
enum TransactionStatus {
|
||||
pub(crate) enum TransactionStatus {
|
||||
Running,
|
||||
Failed(u32, Instant), // number of times failed, time of last failure
|
||||
Retrying(u32), // number of times failed
|
||||
|
|
@ -65,7 +66,7 @@ type SendingError = (Destination, Error);
|
|||
type SendingResult = Result<Destination, SendingError>;
|
||||
type SendingFuture<'a> = BoxFuture<'a, SendingResult>;
|
||||
type SendingFutures<'a> = FuturesUnordered<SendingFuture<'a>>;
|
||||
type CurTransactionStatus = HashMap<Destination, TransactionStatus>;
|
||||
pub(crate) type CurTransactionStatus = DashMap<Destination, TransactionStatus>;
|
||||
|
||||
const SELECT_PRESENCE_LIMIT: usize = 256;
|
||||
const SELECT_RECEIPT_LIMIT: usize = 256;
|
||||
|
|
@ -78,9 +79,13 @@ pub const EDU_LIMIT: usize = 100;
|
|||
impl Service {
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub(super) async fn sender(self: Arc<Self>, id: usize) -> Result {
|
||||
let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
|
||||
let mut futures: SendingFutures<'_> = FuturesUnordered::new();
|
||||
|
||||
let mut statuses = self
|
||||
.statuses
|
||||
.get_mut(id)
|
||||
.expect("missing status for worker");
|
||||
|
||||
self.startup_netburst(id, &mut futures, &mut statuses)
|
||||
.boxed()
|
||||
.await;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue