From a1ad9f014435b14356accd1df673b8b2d9248a37 Mon Sep 17 00:00:00 2001
From: Ginger
Date: Thu, 18 Dec 2025 12:46:42 -0500
Subject: [PATCH] feat(stitched): Initial algorithm implementation

---
Reviewer notes on this revision (the new-file blob hashes below predate it):
 - compare_by_dag_received: equal events now compare as Equal, and the
   batch-order fallback returns Less when `a` is found first, matching the
   documented contract.
 - stitch: the to-insert list is collected into a set so overlapping
   predecessor sets cannot insert an event twice.
 - Generic argument lists that had been lost from the patch text were
   restored; the item types of `StitcherBackend::find_matching_gaps` were
   inferred from the call site and should be confirmed.

 src/service/rooms/timeline/mod.rs             |   1 +
 .../rooms/timeline/stitcher/algorithm.rs      | 193 ++++++++++++++++++
 src/service/rooms/timeline/stitcher/mod.rs    | 130 ++++++++++++
 3 files changed, 324 insertions(+)
 create mode 100644 src/service/rooms/timeline/stitcher/algorithm.rs
 create mode 100644 src/service/rooms/timeline/stitcher/mod.rs

diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs
index a35b502c..393143fd 100644
--- a/src/service/rooms/timeline/mod.rs
+++ b/src/service/rooms/timeline/mod.rs
@@ -4,6 +4,7 @@ mod build;
 mod create;
 mod data;
 mod redact;
+mod stitcher;
 
 use std::{fmt::Write, sync::Arc};
 
diff --git a/src/service/rooms/timeline/stitcher/algorithm.rs b/src/service/rooms/timeline/stitcher/algorithm.rs
new file mode 100644
index 00000000..c9ef73bc
--- /dev/null
+++ b/src/service/rooms/timeline/stitcher/algorithm.rs
@@ -0,0 +1,193 @@
+use std::{
+	cmp::Ordering,
+	collections::{BTreeSet, HashSet},
+};
+
+use itertools::Itertools;
+use ruma::{EventId, OwnedEventId};
+
+use super::{Batch, Gap, OrderKey, StitchedItem, StitcherBackend};
+
+/// Updates to a gap in the stitched order.
+pub(super) struct GapUpdate<'id, K: OrderKey> {
+	/// The opaque key of the gap to update.
+	pub key: K,
+	/// The new contents of the gap. If this is empty, the gap should be
+	/// deleted.
+	pub gap: Gap,
+	/// New items to insert after the gap. These items _should not_ be
+	/// synchronized to clients.
+	pub inserted_items: Vec<StitchedItem<'id>>,
+}
+
+/// Updates to the stitched order.
+pub(super) struct OrderUpdates<'id, K: OrderKey> {
+	/// Updates to individual gaps. The items inserted by these updates _should
+	/// not_ be synchronized to clients.
+	pub gap_updates: Vec<GapUpdate<'id, K>>,
+	/// New items to append to the end of the order. These items _should_ be
+	/// synchronized to clients.
+	pub new_items: Vec<StitchedItem<'id>>,
+}
+
+/// Computes the [`OrderUpdates`] needed to stitch a [`Batch`] into the order
+/// exposed by a [`StitcherBackend`].
+pub(super) struct Stitcher<'backend, B: StitcherBackend> {
+	backend: &'backend B,
+}
+
+impl<B: StitcherBackend> Stitcher<'_, B> {
+	/// Create a stitcher which queries `backend` for existing gaps and events.
+	pub(super) fn new<'backend>(backend: &'backend B) -> Stitcher<'backend, B> {
+		Stitcher { backend }
+	}
+
+	/// Compute the updates needed to stitch `batch` into the order.
+	///
+	/// Events which fill existing gaps are returned in
+	/// [`OrderUpdates::gap_updates`]; every other event in the batch is
+	/// returned in [`OrderUpdates::new_items`]. This only computes the
+	/// updates; applying them is left to the caller.
+	pub(super) fn stitch<'id>(&self, batch: Batch<'id>) -> OrderUpdates<'id, B::Key> {
+		let mut gap_updates = Vec::new();
+
+		let mut remaining_events: BTreeSet<&EventId> = batch.events().collect();
+
+		// 1: Find existing gaps which include IDs of events in `batch`
+		let matching_gaps = self.backend.find_matching_gaps(batch.events());
+
+		// Repeat steps 2-9 for each matching gap
+		for (key, mut gap) in matching_gaps {
+			// 2. Find events in `batch` which are mentioned in `gap`
+			let matching_events = remaining_events.iter().filter(|id| gap.contains(**id));
+
+			// 3. Create the to-insert list from the predecessor sets of each matching
+			// event. Predecessor sets may overlap, so a set is used to keep each event
+			// from being inserted more than once.
+			let events_to_insert: BTreeSet<&'id EventId> = matching_events
+				.filter_map(|event| batch.predecessors(event))
+				.flat_map(|predecessors| predecessors.predecessor_set.iter())
+				.filter(|event| remaining_events.contains(*event))
+				.copied()
+				.collect();
+
+			// 4. Remove the events in the to-insert list from `remaining_events` so they
+			// aren't processed again
+			remaining_events.retain(|event| !events_to_insert.contains(event));
+
+			// 5 and 6
+			let inserted_items = self.sort_events_and_create_gaps(&batch, events_to_insert);
+
+			// 8. Update gap
+			gap.retain(|id| !batch.contains(id));
+
+			// 7 and 9. Append to-insert list and delete gap if empty
+			// (the actual work of doing this is left to the consumer of the updates)
+			gap_updates.push(GapUpdate { key: key.clone(), gap, inserted_items });
+		}
+
+		// 10. Append remaining events and gaps
+		let new_items = self.sort_events_and_create_gaps(&batch, remaining_events);
+
+		OrderUpdates { gap_updates, new_items }
+	}
+
+	/// Sort `events_to_insert` and add the gaps they need (steps 5 and 6).
+	///
+	/// The events are sorted with [`Self::compare_by_dag_received`]. Each event
+	/// with `prev_events` that are neither in `batch` nor known to the backend
+	/// is preceded by a [`StitchedItem::Gap`] holding those IDs.
+	///
+	/// # Panics
+	///
+	/// Panics if `events_to_insert` yields an event which is not in `batch`.
+	fn sort_events_and_create_gaps<'id>(
+		&self,
+		batch: &Batch<'id>,
+		events_to_insert: impl IntoIterator<Item = &'id EventId>,
+	) -> Vec<StitchedItem<'id>> {
+		// 5. Sort the to-insert list with DAG;received order
+		let events_to_insert = events_to_insert
+			.into_iter()
+			.sorted_by(Self::compare_by_dag_received(batch))
+			.collect_vec();
+
+		// Reserve room for every event plus a gap in front of half of them.
+		let mut items = Vec::with_capacity(
+			events_to_insert.len() + events_to_insert.len() / 2,
+		);
+
+		for event in events_to_insert {
+			// 6. Any `prev_events` which are neither in `batch` nor known to the backend
+			// become a gap in front of the event.
+			let missing_prev_events: HashSet<OwnedEventId> = batch
+				.predecessors(event)
+				.expect("events in to_insert should be in batch")
+				.prev_events
+				.iter()
+				.filter(|prev_event| {
+					!(batch.contains(prev_event) || self.backend.event_exists(prev_event))
+				})
+				.map(|id| OwnedEventId::from(*id))
+				.collect();
+
+			if !missing_prev_events.is_empty() {
+				items.push(StitchedItem::Gap(missing_prev_events));
+			}
+
+			items.push(StitchedItem::Event(event));
+		}
+
+		items
+	}
+
+	/// Compare two events by DAG;received order.
+	///
+	/// If either event is in the other's predecessor set it comes first,
+	/// otherwise they are sorted by which comes first in the batch.
+	///
+	/// NOTE(review): [`Batch::events`] iterates a `BTreeMap`, so "first in the
+	/// batch" currently means "smaller event ID". The combined relation is also
+	/// not guaranteed to be a total order, which `sort_by` expects; a
+	/// topological sort with a received-order tiebreak may be needed.
+	///
+	/// # Panics
+	///
+	/// The returned comparator panics if given two distinct events which are
+	/// both missing from `batch`.
+	fn compare_by_dag_received<'id>(
+		batch: &Batch<'id>,
+	) -> impl FnMut(&&'id EventId, &&'id EventId) -> Ordering {
+		|a, b| {
+			if a == b {
+				// A predecessor set contains the event it was built for, so without
+				// this check an event would compare as greater than itself.
+				Ordering::Equal
+			} else if batch
+				.predecessors(a)
+				.is_some_and(|it| it.predecessor_set.contains(b))
+			{
+				// `b` is a predecessor of `a`, so `b` comes first.
+				Ordering::Greater
+			} else if batch
+				.predecessors(b)
+				.is_some_and(|it| it.predecessor_set.contains(a))
+			{
+				// `a` is a predecessor of `b`, so `a` comes first.
+				Ordering::Less
+			} else {
+				// Neither is a predecessor of the other: whichever is reached first
+				// while walking the batch sorts first.
+				for event in batch.events() {
+					if event == *a {
+						return Ordering::Less;
+					} else if event == *b {
+						return Ordering::Greater;
+					}
+				}
+
+				panic!("neither {} nor {} in batch", a, b);
+			}
+		}
+	}
+}
diff --git a/src/service/rooms/timeline/stitcher/mod.rs b/src/service/rooms/timeline/stitcher/mod.rs
new file mode 100644
index 00000000..66b55989
--- /dev/null
+++ b/src/service/rooms/timeline/stitcher/mod.rs
@@ -0,0 +1,130 @@
+use std::collections::{BTreeMap, HashSet};
+
+use ruma::{EventId, OwnedEventId};
+
+pub(super) mod algorithm;
+
+/// A gap in the stitched order.
+pub(super) type Gap = HashSet<OwnedEventId>;
+
+/// An item in the stitched order: either an event or a gap.
+pub(super) enum StitchedItem<'id> {
+	Event(&'id EventId),
+	Gap(Gap),
+}
+
+/// An opaque key returned by a [`StitcherBackend`] to identify an item in its
+/// order.
+pub(super) trait OrderKey: Eq + Clone {}
+
+/// The queries [`algorithm::Stitcher`] makes against the existing stitched
+/// order.
+pub(super) trait StitcherBackend {
+	type Key: OrderKey;
+
+	/// Find the existing gaps which include any of `events`, each paired with
+	/// the key identifying it.
+	fn find_matching_gaps<'a>(
+		&'a self,
+		events: impl Iterator<Item = &'a EventId>,
+	) -> impl Iterator<Item = (Self::Key, Gap)>;
+
+	/// Whether `event` already exists as far as the backend is concerned.
+	/// `prev_events` for which this is `false`, and which are not in the batch
+	/// being stitched, are recorded as gaps.
+	fn event_exists<'a>(&'a self, event: &'a EventId) -> bool;
+}
+
+/// An ordered map from an event ID to its `prev_events`.
+pub(super) type EventEdges<'id> = BTreeMap<&'id EventId, HashSet<&'id EventId>>;
+
+/// Information about the `prev_events` of an event.
+/// This struct does not store the ID of the event itself.
+struct EventPredecessors<'id> {
+	/// The `prev_events` of the event.
+	pub prev_events: HashSet<&'id EventId>,
+	/// The predecessor set of the event: the event itself plus the events in
+	/// the batch reachable from it by following `prev_events` through other
+	/// batch events. See [`Batch::find_predecessor_set`] for details.
+	pub predecessor_set: HashSet<&'id EventId>,
+}
+
+/// A batch of events to stitch, keyed by event ID and annotated with each
+/// event's predecessors within the batch.
+pub(super) struct Batch<'id> {
+	events: BTreeMap<&'id EventId, EventPredecessors<'id>>,
+}
+
+impl<'id> Batch<'id> {
+	/// Build a batch from `edges`, computing the predecessor set of every event.
+	pub(super) fn from_edges(edges: EventEdges<'id>) -> Self {
+		let mut events = BTreeMap::new();
+
+		for (event, prev_events) in &edges {
+			let predecessor_set = Self::find_predecessor_set(event, &edges);
+
+			events.insert(*event, EventPredecessors {
+				prev_events: prev_events.clone(),
+				predecessor_set,
+			});
+		}
+
+		Batch { events }
+	}
+
+	/// Build the predecessor set of `event` using `edges`. The predecessor set
+	/// is a subgraph of the room's DAG which may be thought of as a tree
+	/// rooted at `event` containing _only_ events which are included in
+	/// `edges`. It is represented as a set and not a proper tree structure for
+	/// efficiency.
+	///
+	/// The traversal does not continue past events which are missing from
+	/// `edges`, and `event` itself is part of the set if it is in `edges`.
+	fn find_predecessor_set<'a>(
+		event: &'a EventId,
+		edges: &EventEdges<'a>,
+	) -> HashSet<&'a EventId> {
+		// The predecessor set which we are building.
+		let mut predecessor_set = HashSet::new();
+
+		// The stack of events to check for membership in `edges`.
+		let mut events_to_check = vec![event];
+		// Events which we have already checked and do not need to revisit.
+		let mut events_already_checked = HashSet::new();
+
+		while let Some(event) = events_to_check.pop() {
+			// An event may have been pushed by several of its successors before it
+			// was first visited; only process it once.
+			if !events_already_checked.insert(event) {
+				continue;
+			}
+
+			// If this event is in `edges`, add it to the predecessor set.
+			if let Some(children) = edges.get(event) {
+				predecessor_set.insert(event);
+
+				// Also add all its `prev_events` to the stack. It's fine if some of them
+				// don't exist in `edges` because they'll just be discarded when they're
+				// popped off the stack.
+				events_to_check.extend(
+					children
+						.iter()
+						.filter(|event| !events_already_checked.contains(*event)),
+				);
+			}
+		}
+
+		predecessor_set
+	}
+
+	/// Iterate over the IDs of the events in the batch, in map key order.
+	fn events(&self) -> impl Iterator<Item = &'id EventId> { self.events.keys().copied() }
+
+	/// Whether `event` is part of the batch.
+	fn contains(&self, event: &EventId) -> bool { self.events.contains_key(event) }
+
+	/// The predecessors of `event`, or `None` if it is not part of the batch.
+	fn predecessors(&self, event: &EventId) -> Option<&EventPredecessors<'id>> {
+		self.events.get(event)
+	}
+}