From b2ec13d34251f1768d98670c4f954b45eb12ce99 Mon Sep 17 00:00:00 2001 From: timedout Date: Thu, 12 Feb 2026 16:48:12 +0000 Subject: [PATCH] fix: Redo the get_missing_events federation route --- src/api/server/get_missing_events.rs | 102 ++++++++++++++++----------- 1 file changed, 59 insertions(+), 43 deletions(-) diff --git a/src/api/server/get_missing_events.rs b/src/api/server/get_missing_events.rs index 79940374..135e6c75 100644 --- a/src/api/server/get_missing_events.rs +++ b/src/api/server/get_missing_events.rs @@ -1,6 +1,9 @@ +use std::collections::{HashSet, VecDeque}; + use axum::extract::State; -use conduwuit::{Err, Result, debug, debug_error, info, utils::to_canonical_object}; -use ruma::api::federation::event::get_missing_events; +use conduwuit::{Err, Event, Result, debug, info, trace, utils::to_canonical_object, warn}; +use ruma::{OwnedEventId, api::federation::event::get_missing_events}; +use serde_json::{json, value::RawValue}; use super::AccessCheck; use crate::Ruma; @@ -45,59 +48,72 @@ pub(crate) async fn get_missing_events_route( .unwrap_or(LIMIT_DEFAULT) .min(LIMIT_MAX); - let mut queued_events = body.latest_events.clone(); - // the vec will never have more entries the limit - let mut events = Vec::with_capacity(limit); + let room_version = services.rooms.state.get_room_version(&body.room_id).await?; - let mut i: usize = 0; - while i < queued_events.len() && events.len() < limit { - let Ok(pdu) = services.rooms.timeline.get_pdu(&queued_events[i]).await else { - debug!( - body.origin = body.origin.as_ref().map(tracing::field::display), - "Event {} does not exist locally, skipping", &queued_events[i] - ); - i = i.saturating_add(1); + let mut queue: VecDeque = VecDeque::from(body.latest_events.clone()); + let mut results: Vec> = Vec::with_capacity(limit); + let mut seen: HashSet = HashSet::from_iter(body.earliest_events.clone()); + + while let Some(next_event_id) = queue.pop_front() { + if seen.contains(&next_event_id) { + trace!(%next_event_id, "already seen event, skipping"); continue; + } + + if results.len() >= limit { + debug!(%next_event_id, "reached limit of events to return, breaking"); + break; + } + + let mut pdu = match services.rooms.timeline.get_pdu(&next_event_id).await { + | Ok(pdu) => pdu, + | Err(e) => { + warn!("could not find event {next_event_id} while walking missing events: {e}"); + continue; + }, }; - - if body.earliest_events.contains(&queued_events[i]) { - i = i.saturating_add(1); - continue; + if pdu.room_id_or_hash() != body.room_id { + return Err!(Request(Unknown( + "Event {next_event_id} is not in room {}", + body.room_id + ))); } if !services .rooms .state_accessor - .server_can_see_event(body.origin(), &body.room_id, &queued_events[i]) + .server_can_see_event(body.origin(), &body.room_id, pdu.event_id()) .await { - debug!( - body.origin = body.origin.as_ref().map(tracing::field::display), - "Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id - ); - i = i.saturating_add(1); - continue; + debug!(%next_event_id, origin = %body.origin(), "redacting event origin cannot see"); + pdu.redact(&room_version, json!({}))? } - i = i.saturating_add(1); - let Ok(event) = to_canonical_object(&pdu) else { - debug_error!( - body.origin = body.origin.as_ref().map(tracing::field::display), - "Failed to convert PDU in database to canonical JSON: {pdu:?}" - ); - continue; - }; - - let prev_events = pdu.prev_events.iter().map(ToOwned::to_owned); - - let event = services - .sending - .convert_to_outgoing_federation_event(event) - .await; - - queued_events.extend(prev_events); - events.push(event); + trace!( + %next_event_id, + prev_events = ?pdu.prev_events().collect::>(), + "adding event to results and queueing prev events" + ); + queue.extend(pdu.prev_events.clone()); + seen.insert(next_event_id.clone()); + results.push( + services + .sending + .convert_to_outgoing_federation_event(to_canonical_object(pdu)?) + .await, + ); + trace!( + %next_event_id, + queue_len = queue.len(), + seen_len = seen.len(), + results_len = results.len(), + "event added to results" + ) } - Ok(get_missing_events::v1::Response { events }) + if !queue.is_empty() { + debug!("limit reached before queue was empty"); + } + results.reverse(); // return oldest first + Ok(get_missing_events::v1::Response { events: results }) }