From e986cd45369c45132b28227ef29620decd5f9e35 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Fri, 20 Feb 2026 21:28:23 +0000 Subject: [PATCH] feat(federation): Restructure transaction_ids service Adds two new in-memory maps to the service in to prepare for better handlers --- src/service/transaction_ids/mod.rs | 68 ++++++++++++++++++------------ 1 file changed, 40 insertions(+), 28 deletions(-) diff --git a/src/service/transaction_ids/mod.rs b/src/service/transaction_ids/mod.rs index 9c284b70..40bf44c3 100644 --- a/src/service/transaction_ids/mod.rs +++ b/src/service/transaction_ids/mod.rs @@ -1,11 +1,22 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; -use conduwuit::{Result, implement}; +use conduwuit::{Result, SyncRwLock}; use database::{Handle, Map}; -use ruma::{DeviceId, TransactionId, UserId}; +use ruma::{ + DeviceId, OwnedServerName, OwnedTransactionId, TransactionId, UserId, + api::federation::transactions::send_transaction_message, +}; +use tokio::sync::watch::{Receiver, Sender}; + +pub type TxnKey = (OwnedServerName, OwnedTransactionId); +pub type TxnChanType = (TxnKey, send_transaction_message::v1::Response); +pub type ActiveTxnsMapType = HashMap, Receiver)>; pub struct Service { db: Data, + pub servername_txnid_response_cache: + Arc>>, + pub servername_txnid_active: Arc>, } struct Data { @@ -18,37 +29,38 @@ impl crate::Service for Service { db: Data { userdevicetxnid_response: args.db["userdevicetxnid_response"].clone(), }, + servername_txnid_response_cache: Arc::new(SyncRwLock::new(HashMap::new())), + servername_txnid_active: Arc::new(SyncRwLock::new(HashMap::new())), })) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } -#[implement(Service)] -pub fn add_txnid( - &self, - user_id: &UserId, - device_id: Option<&DeviceId>, - txn_id: &TransactionId, - data: &[u8], -) { - let mut key = user_id.as_bytes().to_vec(); - key.push(0xFF); - key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default()); - key.push(0xFF); - key.extend_from_slice(txn_id.as_bytes()); +impl Service { + pub fn add_txnid( + &self, + user_id: &UserId, + device_id: Option<&DeviceId>, + txn_id: &TransactionId, + data: &[u8], + ) { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xFF); + key.extend_from_slice(device_id.map(DeviceId::as_bytes).unwrap_or_default()); + key.push(0xFF); + key.extend_from_slice(txn_id.as_bytes()); - self.db.userdevicetxnid_response.insert(&key, data); -} + self.db.userdevicetxnid_response.insert(&key, data); + } -// If there's no entry, this is a new transaction -#[implement(Service)] -pub async fn existing_txnid( - &self, - user_id: &UserId, - device_id: Option<&DeviceId>, - txn_id: &TransactionId, -) -> Result> { - let key = (user_id, device_id, txn_id); - self.db.userdevicetxnid_response.qry(&key).await + pub async fn existing_txnid( + &self, + user_id: &UserId, + device_id: Option<&DeviceId>, + txn_id: &TransactionId, + ) -> Result> { + let key = (user_id, device_id, txn_id); + self.db.userdevicetxnid_response.qry(&key).await + } }