diff --git a/src/api/server/send.rs b/src/api/server/send.rs
index 7f5fb502..3b42c89e 100644
--- a/src/api/server/send.rs
+++ b/src/api/server/send.rs
@@ -21,6 +21,7 @@ use conduwuit_service::{
 	sending::{EDU_LIMIT, PDU_LIMIT},
 };
 use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
+use http::StatusCode;
 use itertools::Itertools;
 use ruma::{
 	CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
@@ -42,7 +43,9 @@ use ruma::{
 		to_device::DeviceIdOrAllDevices,
 	uint,
 };
-use service::transaction_ids::{FederationTxnState, TxnKey, WrappedTransactionResponse};
+use service::transaction_ids::{
+	FederationTxnState, TransactionError, TxnKey, WrappedTransactionResponse,
+};
 use tokio::sync::watch::{Receiver, Sender};
 use tracing::instrument;
 
@@ -118,7 +121,15 @@ async fn wait_for_result(
 		));
 	}
 	let value = recv.borrow_and_update();
-	Ok(value.clone().expect("channel returned with no value?"))
+	match value.clone() {
+		| Some(Ok(response)) => Ok(response),
+		| Some(Err(err)) => Err(transaction_error_to_response(&err)),
+		| None => Err(Error::Request(
+			ErrorKind::Unknown,
+			"Transaction processing failed unexpectedly".into(),
+			StatusCode::INTERNAL_SERVER_ERROR,
+		)),
+	}
 }
 
 #[instrument(
@@ -133,7 +144,7 @@ async fn process_inbound_transaction(
 	body: Ruma<send_transaction_message::v1::Request>,
 	client: IpAddr,
 	txn_key: TxnKey,
-	sender: Sender<Option<send_transaction_message::v1::Response>>,
+	sender: Sender<WrappedTransactionResponse>,
 ) {
 	let txn_start_time = Instant::now();
 	let pdus = body
@@ -153,14 +164,12 @@
 		.stream();
 
 	debug!(pdus = body.pdus.len(), edus = body.edus.len(), "Processing transaction",);
-	let Ok(results) = handle(&services, &client, body.origin(), pdus, edus).await else {
-		// TODO: handle this properly. The channel doesn't like being closed with no
-		// value, returning an empty response may lie to the remote and make them
-		// think we processed it properly (and lose events), but we also can't return
-		// an actual error.
-		drop(sender);
-		services.transaction_ids.remove_federation_txn(&txn_key);
-		panic!("failed to handle incoming transaction");
+	let results = match handle(&services, &client, body.origin(), pdus, edus).await {
+		| Ok(results) => results,
+		| Err(err) => {
+			fail_federation_txn(services, &txn_key, &sender, err);
+			return;
+		},
 	};
 
 	for (id, result) in &results {
@@ -190,13 +199,43 @@
 		.finish_federation_txn(txn_key, sender, response);
 }
 
+/// Handles a failed federation transaction by sending the error through
+/// the channel and cleaning up the transaction state. This allows waiters to
+/// receive an appropriate error response.
+fn fail_federation_txn(
+	services: crate::State,
+	txn_key: &TxnKey,
+	sender: &Sender<WrappedTransactionResponse>,
+	err: TransactionError,
+) {
+	debug!("Transaction failed: {err}");
+
+	// Remove from active state so the transaction can be retried
+	services.transaction_ids.remove_federation_txn(txn_key);
+
+	// Send the error to any waiters
+	sender
+		.send(Some(Err(err)))
+		.expect("couldn't send error to channel");
+}
+
+/// Converts a TransactionError into an appropriate HTTP error response.
+fn transaction_error_to_response(err: &TransactionError) -> Error {
+	match err {
+		| TransactionError::ShuttingDown => Error::Request(
+			ErrorKind::Unknown,
+			"Server is shutting down, please retry later".into(),
+			StatusCode::SERVICE_UNAVAILABLE,
+		),
+	}
+}
 async fn handle(
 	services: &Services,
 	client: &IpAddr,
 	origin: &ServerName,
 	pdus: impl Stream<Item = (OwnedRoomId, OwnedEventId, CanonicalJsonObject)> + Send,
 	edus: impl Stream<Item = Edu> + Send,
-) -> Result<ResolvedMap> {
+) -> std::result::Result<ResolvedMap, TransactionError> {
 	// group pdus by room
 	let pdus = pdus
 		.collect()
@@ -274,7 +313,7 @@ async fn handle_room(
 	services: &Services,
 	client: &IpAddr,
 	origin: &ServerName,
 	room_id: OwnedRoomId,
 	pdus: impl Iterator<Item = (OwnedEventId, CanonicalJsonObject)> + Send,
-) -> Result<Vec<(OwnedEventId, Result)>> {
+) -> std::result::Result<Vec<(OwnedEventId, Result)>, TransactionError> {
 	let _room_lock = services
 		.rooms
 		.event_handler
@@ -287,8 +326,14 @@
 		.into_iter()
 		.map(|(_, event_id, value)| (event_id, value))
 		.collect();
+	// Try to sort PDUs by their dependencies, but fall back to arbitrary order on
+	// failure (e.g., cycles). This is best-effort; proper ordering is the sender's
+	// responsibility.
 	let sorted_event_ids = if pdu_map.len() >= 2 {
-		build_local_dag(&pdu_map).await?
+		build_local_dag(&pdu_map).await.unwrap_or_else(|e| {
+			debug_warn!("Failed to build local DAG for room {room_id}: {e}");
+			pdu_map.keys().cloned().collect()
+		})
 	} else {
 		pdu_map.keys().cloned().collect()
 	};
@@ -298,7 +343,10 @@
 			.get(&event_id)
 			.expect("sorted event IDs must be from the original map")
 			.clone();
-		services.server.check_running()?;
+		services
+			.server
+			.check_running()
+			.map_err(|_| TransactionError::ShuttingDown)?;
 		let result = services
 			.rooms
 			.event_handler
diff --git a/src/service/transaction_ids/mod.rs b/src/service/transaction_ids/mod.rs
index 35b83121..bdef1a6b 100644
--- a/src/service/transaction_ids/mod.rs
+++ b/src/service/transaction_ids/mod.rs
@@ -1,5 +1,6 @@
 use std::{
 	collections::HashMap,
+	fmt,
 	sync::{
 		Arc,
 		atomic::{AtomicU64, Ordering},
@@ -22,7 +23,26 @@ use tokio::sync::watch::{Receiver, Sender};
 use crate::{Dep, config};
 
 pub type TxnKey = (OwnedServerName, OwnedTransactionId);
-pub type WrappedTransactionResponse = Option<send_transaction_message::v1::Response>;
+pub type WrappedTransactionResponse =
+	Option<Result<send_transaction_message::v1::Response, TransactionError>>;
+
+/// Errors that can occur during federation transaction processing.
+#[derive(Debug, Clone)]
+pub enum TransactionError {
+	/// Server is shutting down - the sender should retry the entire
+	/// transaction.
+	ShuttingDown,
+}
+
+impl fmt::Display for TransactionError {
+	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+		match self {
+			| Self::ShuttingDown => write!(f, "Server is shutting down"),
+		}
+	}
+}
+
+impl std::error::Error for TransactionError {}
 
 /// Minimum interval between cache cleanup runs.
 /// Exists to prevent thrashing when the cache is full of things that can't be
@@ -201,7 +221,7 @@ impl Service {
 	pub fn finish_federation_txn(
 		&self,
 		key: TxnKey,
-		sender: Sender<Option<send_transaction_message::v1::Response>>,
+		sender: Sender<WrappedTransactionResponse>,
 		response: send_transaction_message::v1::Response,
 	) {
 		// Check if cleanup might be needed before acquiring the lock
@@ -220,7 +240,7 @@
 		);
 
 		sender
-			.send(Some(response))
+			.send(Some(Ok(response)))
			.expect("couldn't send response to channel");

 		// explicitly close