feat(federation): Restructure transaction_ids service

Adds two new in-memory maps to the service in to prepare for better handlers
This commit is contained in:
nexy7574 2026-02-20 21:28:23 +00:00 committed by timedout
parent 526d862296
commit e986cd4536
No known key found for this signature in database
GPG key ID: 0FA334385D0B689F

View file

@ -1,11 +1,22 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use conduwuit::{Result, implement};
use conduwuit::{Result, SyncRwLock};
use database::{Handle, Map};
use ruma::{DeviceId, TransactionId, UserId};
use ruma::{
DeviceId, OwnedServerName, OwnedTransactionId, TransactionId, UserId,
api::federation::transactions::send_transaction_message,
};
use tokio::sync::watch::{Receiver, Sender};
pub type TxnKey = (OwnedServerName, OwnedTransactionId);
pub type TxnChanType = (TxnKey, send_transaction_message::v1::Response);
pub type ActiveTxnsMapType = HashMap<TxnKey, (Sender<TxnChanType>, Receiver<TxnChanType>)>;
pub struct Service {
db: Data,
pub servername_txnid_response_cache:
Arc<SyncRwLock<HashMap<TxnKey, send_transaction_message::v1::Response>>>,
pub servername_txnid_active: Arc<SyncRwLock<ActiveTxnsMapType>>,
}
struct Data {
@ -18,37 +29,38 @@ impl crate::Service for Service {
db: Data {
userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(),
},
servername_txnid_response_cache: Arc::new(SyncRwLock::new(HashMap::new())),
servername_txnid_active: Arc::new(SyncRwLock::new(HashMap::new())),
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
#[implement(Service)]
pub fn add_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
data: &[u8],
) {
let mut key = user_id.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
key.push(0xFF);
key.extend_from_slice(txn_id.as_bytes());
impl Service {
pub fn add_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
data: &[u8],
) {
let mut key = user_id.as_bytes().to_vec();
key.push(0xFF);
key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default());
key.push(0xFF);
key.extend_from_slice(txn_id.as_bytes());
self.db.userdevicetxnid_response.insert(&key, data);
}
self.db.userdevicetxnid_response.insert(&key, data);
}
// If there's no entry, this is a new transaction
#[implement(Service)]
pub async fn existing_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
) -> Result<Handle<'_>> {
let key = (user_id, device_id, txn_id);
self.db.userdevicetxnid_response.qry(&key).await
pub async fn existing_txnid(
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &TransactionId,
) -> Result<Handle<'_>> {
let key = (user_id, device_id, txn_id);
self.db.userdevicetxnid_response.qry(&key).await
}
}