From 35e441452f6b2d69a4e7cd931cbc3f495b527010 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sat, 21 Feb 2026 19:33:43 +0000 Subject: [PATCH] feat: Attempt to build localised DAG before processing PDUs --- src/api/server/send.rs | 99 ++++++++++++++++++++++++++++++------------ 1 file changed, 71 insertions(+), 28 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 756b1803..336df99d 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -1,5 +1,5 @@ use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap, HashSet}, net::IpAddr, time::{Duration, Instant}, }; @@ -9,6 +9,7 @@ use axum_client_ip::InsecureClientIp; use conduwuit::{ Err, Error, Result, debug, debug_warn, err, error, result::LogErr, + state_res::lexicographical_topological_sort, trace, utils::{ IterStream, ReadyExt, millis_since_unix_epoch, @@ -22,7 +23,8 @@ use conduwuit_service::{ use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use ruma::{ - CanonicalJsonObject, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, ServerName, UserId, + CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, + RoomId, ServerName, UserId, api::{ client::error::{ErrorKind, ErrorKind::LimitExceeded}, federation::transactions::{ @@ -35,8 +37,10 @@ use ruma::{ }, }, events::receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType}, + int, serde::Raw, to_device::DeviceIdOrAllDevices, + uint, }; use service::transaction_ids::{TxnKey, WrappedTransactionResponse}; use tokio::sync::watch::{Receiver, Sender}; @@ -151,8 +155,7 @@ async fn process_inbound_transaction( .stream(); debug!(pdus = body.pdus.len(), edus = body.edus.len(), "Processing transaction",); - let Ok(results) = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await - else { + let Ok(results) = handle(&services, &client, body.origin(), pdus, edus).await else { // TODO: handle this properly. The channel doesn't like being closed with no // value, returning an empty response may lie to the remote and make them // think we processed it properly (and lose events), but we also can't return @@ -197,7 +200,6 @@ async fn handle( services: &Services, client: &IpAddr, origin: &ServerName, - started: Instant, pdus: impl Stream + Send, edus: impl Stream + Send, ) -> Result { @@ -217,7 +219,7 @@ async fn handle( .into_iter() .try_stream() .broad_and_then(|(room_id, pdus): (_, Vec<_>)| { - handle_room(services, client, origin, started, room_id, pdus.into_iter()) + handle_room(services, client, origin, room_id, pdus.into_iter()) .map_ok(Vec::into_iter) .map_ok(IterStream::try_stream) }) @@ -234,11 +236,48 @@ async fn handle( Ok(results) } +/// Attempts to build a localised directed acyclic graph out of the given PDUs, +/// returning them in a topologically sorted order. +/// +/// This is used to attempt to process PDUs in an order that respects their +/// dependencies, however it is ultimately the sender's responsibility to send +/// them in a processable order, so this is just a best effort attempt. It does +/// not account for power levels or other tie breaks. +async fn build_local_dag( + pdu_map: &HashMap, +) -> Result> { + debug_assert!(pdu_map.len() >= 2, "needless call to build_local_dag with less than 2 PDUs"); + let mut dag: HashMap> = HashMap::new(); + + for (event_id, value) in pdu_map { + let prev_events = value + .get("prev_events") + .expect("pdu must have prev_events") + .as_array() + .expect("prev_events must be an array") + .iter() + .map(|v| { + OwnedEventId::parse(v.as_str().expect("prev_events values must be strings")) + .expect("prev_events must be valid event IDs") + }) + .collect::>(); + + dag.insert(event_id.clone(), prev_events); + } + lexicographical_topological_sort(&dag, &|_| async { + // Note: we don't bother fetching power levels because that would massively slow + // this function down. This is a best-effort attempt to order events correctly + // for processing, however ultimately that should be the sender's job. + Ok((int!(0), MilliSecondsSinceUnixEpoch(uint!(0)))) + }) + .await + .map_err(|e| err!("failed to resolve local graph: {e}")) +} + async fn handle_room( services: &Services, _client: &IpAddr, origin: &ServerName, - txn_start_time: Instant, room_id: OwnedRoomId, pdus: impl Iterator + Send, ) -> Result> { @@ -250,27 +289,31 @@ async fn handle_room( .await; let room_id = &room_id; - pdus.try_stream() - .and_then(|(_, event_id, value)| async move { - services.server.check_running()?; - let pdu_start_time = Instant::now(); - let result = services - .rooms - .event_handler - .handle_incoming_pdu(origin, room_id, &event_id, value, true) - .await - .map(|_| ()); - - debug!( - pdu_elapsed = ?pdu_start_time.elapsed(), - txn_elapsed = ?txn_start_time.elapsed(), - "Finished PDU {event_id}", - ); - - Ok((event_id, result)) - }) - .try_collect() - .await + let pdu_map: HashMap = pdus + .into_iter() + .map(|(_, event_id, value)| (event_id, value)) + .collect(); + let sorted_event_ids = if pdu_map.len() >= 2 { + build_local_dag(&pdu_map).await? + } else { + pdu_map.keys().cloned().collect() + }; + let mut results = Vec::with_capacity(sorted_event_ids.len()); + for event_id in sorted_event_ids { + let value = pdu_map + .get(&event_id) + .expect("sorted event IDs must be from the original map") + .clone(); + services.server.check_running()?; + let result = services + .rooms + .event_handler + .handle_incoming_pdu(origin, room_id, &event_id, value, true) + .await + .map(|_| ()); + results.push((event_id, result)); + } + Ok(results) } async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) {