diff --git a/src/api/server/send.rs b/src/api/server/send.rs index ccdd074f..4420223d 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -1,16 +1,19 @@ -use std::{collections::BTreeMap, net::IpAddr, time::Instant}; +use std::{ + collections::BTreeMap, + net::IpAddr, + time::{Duration, Instant}, +}; use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Err, Error, Result, debug, debug_warn, err, error, + Err, Error, Result, debug, debug_warn, err, error, info, result::LogErr, trace, utils::{ IterStream, ReadyExt, millis_since_unix_epoch, stream::{BroadbandExt, TryBroadbandExt, automatic_width}, }, - warn, }; use conduwuit_service::{ Services, @@ -21,7 +24,7 @@ use itertools::Itertools; use ruma::{ CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId, api::{ - client::error::ErrorKind, + client::error::{ErrorKind, ErrorKind::LimitExceeded}, federation::transactions::{ edu::{ DeviceListUpdateContent, DirectDeviceContent, Edu, PresenceContent, @@ -35,6 +38,8 @@ use ruma::{ serde::Raw, to_device::DeviceIdOrAllDevices, }; +use service::transaction_ids::{TxnKey, WrappedTransactionResponse}; +use tokio::sync::watch::{Receiver, Sender}; use crate::Ruma; @@ -44,15 +49,6 @@ type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject); /// # `PUT /_matrix/federation/v1/send/{txnId}` /// /// Push EDUs and PDUs to this server. 
-#[tracing::instrument( - name = "txn", - level = "debug", - skip_all, - fields( - %client, - origin = body.origin().as_str() - ), -)] pub(crate) async fn send_transaction_message_route( State(services): State, InsecureClientIp(client): InsecureClientIp, @@ -64,6 +60,18 @@ pub(crate) async fn send_transaction_message_route( ))); } + let txn_key = (body.origin().to_owned(), body.transaction_id.clone()); + + // Did we already process this transaction + if let Some(response) = services.transaction_ids.get_cached_txn(&txn_key) { + return Ok(response); + } + // Or are currently processing it + if let Some(receiver) = services.transaction_ids.get_active_federation_txn(&txn_key) { + // Wait up to 50 seconds for a result + return wait_for_result(receiver).await; + } + if body.pdus.len() > PDU_LIMIT { return Err!(Request(Forbidden( "Not allowed to send more than {PDU_LIMIT} PDUs in one transaction" @@ -76,16 +84,48 @@ pub(crate) async fn send_transaction_message_route( ))); } - let txn_start_time = Instant::now(); - trace!( - pdus = body.pdus.len(), - edus = body.edus.len(), - elapsed = ?txn_start_time.elapsed(), - id = %body.transaction_id, - origin = %body.origin(), - "Starting txn", - ); + let sender = services + .transaction_ids + .start_federation_txn(txn_key.clone())?; + services.server.runtime().spawn(process_inbound_transaction( + services, + body, + client, + txn_key.clone(), + sender, + )); + let receiver = services + .transaction_ids + .get_active_federation_txn(&txn_key) + .expect("just-created transaction was missing"); + wait_for_result(receiver).await +} +async fn wait_for_result( + mut recv: Receiver, +) -> Result { + if tokio::time::timeout(Duration::from_secs(50), recv.changed()) + .await + .is_err() + { + // Took too long, return 429 to encourage the sender to try again + return Err(Error::BadRequest( + LimitExceeded { retry_after: None }, + "Transaction is still being processed. 
Please try again later.", + )); + } + let value = recv.borrow_and_update(); + Ok(value.clone().expect("channel returned with no value?")) +} + +async fn process_inbound_transaction( + services: crate::State, + body: Ruma, + client: IpAddr, + txn_key: TxnKey, + sender: Sender>, +) { + let txn_start_time = Instant::now(); let pdus = body .pdus .iter() @@ -102,30 +142,53 @@ pub(crate) async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); - let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; - - debug!( + info!( + id = ?txn_key.1, + origin = ?txn_key.0, pdus = body.pdus.len(), edus = body.edus.len(), - elapsed = ?txn_start_time.elapsed(), - id = %body.transaction_id, - origin = %body.origin(), - "Finished txn", + "Processing transaction", ); + let Ok(results) = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await + else { + // TODO: handle this properly. The channel doesn't like being closed with no + // value, returning an empty response may lie to the remote and make them + // think we processed it properly (and lose events), but we also can't return + // an actual error. 
+ panic!("failed to handle incoming transaction"); + }; + for (id, result) in &results { if let Err(e) = result { if matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) { - warn!("Incoming PDU failed {id}: {e:?}"); + debug_warn!("Incoming PDU failed {id}: {e:?}"); } } } - Ok(send_transaction_message::v1::Response { + info!( + id = ?txn_key.1, + origin = ?txn_key.0, + pdus = body.pdus.len(), + edus = body.edus.len(), + elapsed = ?txn_start_time.elapsed(), + "Finished processing transaction" + ); + + let response = send_transaction_message::v1::Response { pdus: results .into_iter() .map(|(e, r)| (e, r.map_err(error::sanitized_message))) .collect(), - }) + }; + services + .transaction_ids + .set_cached_txn(txn_key.clone(), response.clone()); + sender + .send(Some(response)) + .expect("couldn't send response to channel"); + services.transaction_ids.finish_federation_txn(&txn_key); + drop(sender); } async fn handle(