fix: Redo the get_missing_events federation route

This commit is contained in:
timedout 2026-02-12 16:48:12 +00:00
parent 4e55e1ea90
commit b2ec13d342
No known key found for this signature in database
GPG key ID: 0FA334385D0B689F

View file

@ -1,6 +1,9 @@
use std::collections::{HashSet, VecDeque};
use axum::extract::State; use axum::extract::State;
use conduwuit::{Err, Result, debug, debug_error, info, utils::to_canonical_object}; use conduwuit::{Err, Event, Result, debug, info, trace, utils::to_canonical_object, warn};
use ruma::api::federation::event::get_missing_events; use ruma::{OwnedEventId, api::federation::event::get_missing_events};
use serde_json::{json, value::RawValue};
use super::AccessCheck; use super::AccessCheck;
use crate::Ruma; use crate::Ruma;
@ -45,59 +48,72 @@ pub(crate) async fn get_missing_events_route(
.unwrap_or(LIMIT_DEFAULT) .unwrap_or(LIMIT_DEFAULT)
.min(LIMIT_MAX); .min(LIMIT_MAX);
let mut queued_events = body.latest_events.clone(); let room_version = services.rooms.state.get_room_version(&body.room_id).await?;
// the vec will never have more entries the limit
let mut events = Vec::with_capacity(limit);
let mut i: usize = 0; let mut queue: VecDeque<OwnedEventId> = VecDeque::from(body.latest_events.clone());
while i < queued_events.len() && events.len() < limit { let mut results: Vec<Box<RawValue>> = Vec::with_capacity(limit);
let Ok(pdu) = services.rooms.timeline.get_pdu(&queued_events[i]).await else { let mut seen: HashSet<OwnedEventId> = HashSet::from_iter(body.earliest_events.clone());
debug!(
body.origin = body.origin.as_ref().map(tracing::field::display), while let Some(next_event_id) = queue.pop_front() {
"Event {} does not exist locally, skipping", &queued_events[i] if seen.contains(&next_event_id) {
); trace!(%next_event_id, "already seen event, skipping");
i = i.saturating_add(1);
continue; continue;
}
if results.len() >= limit {
debug!(%next_event_id, "reached limit of events to return, breaking");
break;
}
let mut pdu = match services.rooms.timeline.get_pdu(&next_event_id).await {
| Ok(pdu) => pdu,
| Err(e) => {
warn!("could not find event {next_event_id} while walking missing events: {e}");
continue;
},
}; };
if pdu.room_id_or_hash() != body.room_id {
if body.earliest_events.contains(&queued_events[i]) { return Err!(Request(Unknown(
i = i.saturating_add(1); "Event {next_event_id} is not in room {}",
continue; body.room_id
)));
} }
if !services if !services
.rooms .rooms
.state_accessor .state_accessor
.server_can_see_event(body.origin(), &body.room_id, &queued_events[i]) .server_can_see_event(body.origin(), &body.room_id, pdu.event_id())
.await .await
{ {
debug!( debug!(%next_event_id, origin = %body.origin(), "redacting event origin cannot see");
body.origin = body.origin.as_ref().map(tracing::field::display), pdu.redact(&room_version, json!({}))?
"Server cannot see {:?} in {:?}, skipping", pdu.event_id, pdu.room_id
);
i = i.saturating_add(1);
continue;
} }
i = i.saturating_add(1); trace!(
let Ok(event) = to_canonical_object(&pdu) else { %next_event_id,
debug_error!( prev_events = ?pdu.prev_events().collect::<Vec<_>>(),
body.origin = body.origin.as_ref().map(tracing::field::display), "adding event to results and queueing prev events"
"Failed to convert PDU in database to canonical JSON: {pdu:?}" );
); queue.extend(pdu.prev_events.clone());
continue; seen.insert(next_event_id.clone());
}; results.push(
services
let prev_events = pdu.prev_events.iter().map(ToOwned::to_owned); .sending
.convert_to_outgoing_federation_event(to_canonical_object(pdu)?)
let event = services .await,
.sending );
.convert_to_outgoing_federation_event(event) trace!(
.await; %next_event_id,
queue_len = queue.len(),
queued_events.extend(prev_events); seen_len = seen.len(),
events.push(event); results_len = results.len(),
"event added to results"
)
} }
Ok(get_missing_events::v1::Response { events }) if !queue.is_empty() {
debug!("limit reached before queue was empty");
}
results.reverse(); // return oldest first
Ok(get_missing_events::v1::Response { events: results })
} }