feat(stitched): Initial algorithm implementation
This commit is contained in:
parent
c85e710760
commit
a1ad9f0144
3 changed files with 263 additions and 0 deletions
|
|
@ -4,6 +4,7 @@ mod build;
|
|||
mod create;
|
||||
mod data;
|
||||
mod redact;
|
||||
mod stitcher;
|
||||
|
||||
use std::{fmt::Write, sync::Arc};
|
||||
|
||||
|
|
|
|||
152
src/service/rooms/timeline/stitcher/algorithm.rs
Normal file
152
src/service/rooms/timeline/stitcher/algorithm.rs
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{BTreeSet, HashSet},
|
||||
};
|
||||
|
||||
use itertools::Itertools;
|
||||
use ruma::{EventId, OwnedEventId};
|
||||
|
||||
use super::{Batch, Gap, OrderKey, StitchedItem, StitcherBackend};
|
||||
|
||||
/// Updates to a gap in the stitched order.
pub(super) struct GapUpdate<'id, K: OrderKey> {
    /// The opaque key of the gap to update.
    pub key: K,
    /// The new contents of the gap. If this is empty, the gap should be
    /// deleted.
    pub gap: Gap,
    /// New items to insert after the gap. These items _should not_ be
    /// synchronized to clients.
    pub inserted_items: Vec<StitchedItem<'id>>,
}
|
||||
|
||||
/// Updates to the stitched order, as produced by [`Stitcher::stitch`].
pub(super) struct OrderUpdates<'id, K: OrderKey> {
    /// Updates to individual gaps. The items inserted by these updates _should
    /// not_ be synchronized to clients.
    pub gap_updates: Vec<GapUpdate<'id, K>>,
    /// New items to append to the end of the order. These items _should_ be
    /// synchronized to clients.
    pub new_items: Vec<StitchedItem<'id>>,
}
|
||||
|
||||
/// Runs the stitching algorithm against a [`StitcherBackend`], which supplies
/// the existing gaps and event-existence checks.
pub(super) struct Stitcher<'backend, B: StitcherBackend> {
    /// Backend queried for matching gaps and already-known events.
    backend: &'backend B,
}
|
||||
|
||||
impl<B: StitcherBackend> Stitcher<'_, B> {
|
||||
pub(super) fn new<'backend>(backend: &'backend B) -> Stitcher<'backend, B> {
|
||||
Stitcher { backend }
|
||||
}
|
||||
|
||||
pub(super) fn stitch<'id>(&self, batch: Batch<'id>) -> OrderUpdates<'id, B::Key> {
|
||||
let mut gap_updates = Vec::new();
|
||||
|
||||
let mut remaining_events: BTreeSet<&EventId> = batch.events().collect();
|
||||
|
||||
// 1: Find existing gaps which include IDs of events in `batch`
|
||||
let matching_gaps = self.backend.find_matching_gaps(batch.events());
|
||||
|
||||
// Repeat steps 2-9 for each matching gap
|
||||
for (key, mut gap) in matching_gaps {
|
||||
// 2. Find events in `batch` which are mentioned in `gap`
|
||||
let matching_events = remaining_events.iter().filter(|id| gap.contains(**id));
|
||||
|
||||
// 3. Create the to-insert list from the predecessor sets of each matching event
|
||||
let events_to_insert: Vec<&'id EventId> = matching_events
|
||||
.filter_map(|event| batch.predecessors(event))
|
||||
.flat_map(|predecessors| predecessors.predecessor_set.iter())
|
||||
.filter(|event| remaining_events.contains(*event))
|
||||
.copied()
|
||||
.collect();
|
||||
|
||||
// 4. Remove the events in the to-insert list from `remaining_events` so they
|
||||
// aren't processed again
|
||||
remaining_events.retain(|event| !events_to_insert.contains(event));
|
||||
|
||||
// 5 and 6
|
||||
let inserted_items = self.sort_events_and_create_gaps(&batch, events_to_insert);
|
||||
|
||||
// 8. Update gap
|
||||
gap.retain(|id| !batch.contains(id));
|
||||
|
||||
// 7 and 9. Append to-insert list and delete gap if empty
|
||||
// (the actual work of doing this is handled by the callee)
|
||||
gap_updates.push(GapUpdate { key: key.clone(), gap, inserted_items });
|
||||
}
|
||||
|
||||
// 10. Append remaining events and gaps
|
||||
let new_items = self.sort_events_and_create_gaps(&batch, remaining_events);
|
||||
|
||||
OrderUpdates { gap_updates, new_items }
|
||||
}
|
||||
|
||||
fn sort_events_and_create_gaps<'id>(
|
||||
&self,
|
||||
batch: &Batch<'id>,
|
||||
events_to_insert: impl IntoIterator<Item = &'id EventId>,
|
||||
) -> Vec<StitchedItem<'id>> {
|
||||
// 5. Sort the to-insert list with DAG;received order
|
||||
let events_to_insert = events_to_insert
|
||||
.into_iter()
|
||||
.sorted_by(Self::compare_by_dag_received(batch))
|
||||
.collect_vec();
|
||||
|
||||
let mut items = Vec::with_capacity(
|
||||
events_to_insert.capacity() + events_to_insert.capacity().div_euclid(2),
|
||||
);
|
||||
|
||||
for event in events_to_insert.into_iter() {
|
||||
let missing_prev_events: HashSet<OwnedEventId> = batch
|
||||
.predecessors(event)
|
||||
.expect("events in to_insert should be in batch")
|
||||
.prev_events
|
||||
.iter()
|
||||
.filter(|prev_event| {
|
||||
!(batch.contains(prev_event) || self.backend.event_exists(prev_event))
|
||||
})
|
||||
.map(|id| OwnedEventId::from(*id))
|
||||
.collect();
|
||||
|
||||
if !missing_prev_events.is_empty() {
|
||||
items.push(StitchedItem::Gap(missing_prev_events));
|
||||
}
|
||||
|
||||
items.push(StitchedItem::Event(event))
|
||||
}
|
||||
|
||||
items
|
||||
}
|
||||
|
||||
/// Compare two events by DAG;received order.
|
||||
///
|
||||
/// If either event is in the other's predecessor set it comes first,
|
||||
/// otherwise they are sorted by which comes first in the batch.
|
||||
fn compare_by_dag_received<'id>(
|
||||
batch: &Batch<'id>,
|
||||
) -> impl FnMut(&&'id EventId, &&'id EventId) -> Ordering {
|
||||
|a, b| {
|
||||
if batch
|
||||
.predecessors(a)
|
||||
.is_some_and(|it| it.predecessor_set.contains(b))
|
||||
{
|
||||
Ordering::Greater
|
||||
} else if batch
|
||||
.predecessors(b)
|
||||
.is_some_and(|it| it.predecessor_set.contains(a))
|
||||
{
|
||||
Ordering::Less
|
||||
} else {
|
||||
for event in batch.events() {
|
||||
if event == *a {
|
||||
return Ordering::Greater;
|
||||
} else if event == *b {
|
||||
return Ordering::Less;
|
||||
}
|
||||
}
|
||||
|
||||
panic!("neither {} nor {} in batch", a, b);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
110
src/service/rooms/timeline/stitcher/mod.rs
Normal file
110
src/service/rooms/timeline/stitcher/mod.rs
Normal file
|
|
@ -0,0 +1,110 @@
|
|||
use std::collections::{BTreeMap, HashSet};
|
||||
|
||||
use ruma::{EventId, OwnedEventId};
|
||||
|
||||
pub(super) mod algorithm;
|
||||
|
||||
/// A gap in the stitched order: the set of event IDs known to be missing at
/// that position.
pub(super) type Gap = HashSet<OwnedEventId>;
|
||||
|
||||
/// A single item in the stitched order: either a concrete event or a gap of
/// missing events.
pub(super) enum StitchedItem<'id> {
    /// A known event, identified by its ID borrowed from the batch.
    Event(&'id EventId),
    /// A run of missing events.
    Gap(Gap),
}
|
||||
|
||||
/// An opaque key returned by a [`StitcherBackend`] to identify an item in its
/// order.
pub(super) trait OrderKey: Eq + Clone {}
|
||||
|
||||
/// Storage abstraction queried by the stitching algorithm.
pub(super) trait StitcherBackend {
    /// Opaque key type identifying items in the backend's order.
    type Key: OrderKey;

    /// Finds existing gaps whose contents mention any of `events`, yielding
    /// each matching gap together with its opaque key.
    fn find_matching_gaps<'a>(
        &'a self,
        events: impl Iterator<Item = &'a EventId>,
    ) -> impl Iterator<Item = (Self::Key, Gap)>;

    /// Whether `event` already exists in the backend.
    fn event_exists<'a>(&'a self, event: &'a EventId) -> bool;
}
|
||||
|
||||
/// An ordered map from an event ID to its `prev_events`.
pub(super) type EventEdges<'id> = BTreeMap<&'id EventId, HashSet<&'id EventId>>;
|
||||
|
||||
/// Information about the `prev_events` of an event.
/// This struct does not store the ID of the event itself.
struct EventPredecessors<'id> {
    /// The `prev_events` of the event.
    pub prev_events: HashSet<&'id EventId>,
    /// The predecessor set of the event. This is a superset of
    /// [`EventPredecessors::prev_events`]. See [`Batch::find_predecessor_set`]
    /// for details.
    pub predecessor_set: HashSet<&'id EventId>,
}
|
||||
|
||||
/// A batch of events to stitch, with precomputed predecessor information for
/// each event. Construct via [`Batch::from_edges`].
pub(super) struct Batch<'id> {
    /// Each event of the batch mapped to its `prev_events` and predecessor set.
    events: BTreeMap<&'id EventId, EventPredecessors<'id>>,
}
|
||||
|
||||
impl<'id> Batch<'id> {
|
||||
pub(super) fn from_edges<'a>(edges: EventEdges<'a>) -> Batch<'a> {
|
||||
let mut events = BTreeMap::new();
|
||||
|
||||
for (event, prev_events) in edges.iter() {
|
||||
let predecessor_set = Self::find_predecessor_set(event, &edges);
|
||||
|
||||
events.insert(*event, EventPredecessors {
|
||||
prev_events: prev_events.clone(),
|
||||
predecessor_set,
|
||||
});
|
||||
}
|
||||
|
||||
Batch { events }
|
||||
}
|
||||
|
||||
/// Build the predecessor set of `event` using `edges`. The predecessor set
|
||||
/// is a subgraph of the room's DAG which may be thought of as a tree
|
||||
/// rooted at `event` containing _only_ events which are included in
|
||||
/// `edges`. It is represented as a set and not a proper tree structure for
|
||||
/// efficiency.
|
||||
fn find_predecessor_set<'a>(
|
||||
event: &'a EventId,
|
||||
edges: &EventEdges<'a>,
|
||||
) -> HashSet<&'a EventId> {
|
||||
// The predecessor set which we are building.
|
||||
let mut predecessor_set = HashSet::new();
|
||||
|
||||
// The queue of events to check for membership in `remaining_events`.
|
||||
let mut events_to_check = vec![event];
|
||||
// Events which we have already checked and do not need to revisit.
|
||||
let mut events_already_checked = HashSet::new();
|
||||
|
||||
while let Some(event) = events_to_check.pop() {
|
||||
// Don't add this event to the queue again.
|
||||
events_already_checked.insert(event);
|
||||
|
||||
// If this event is in `edges`, add it to the predecessor set.
|
||||
if let Some(children) = edges.get(event) {
|
||||
predecessor_set.insert(event);
|
||||
|
||||
// Also add all its `prev_events` to the queue. It's fine if some of them don't
|
||||
// exist in `edges` because they'll just be discarded when they're popped
|
||||
// off the queue.
|
||||
events_to_check.extend(
|
||||
children
|
||||
.iter()
|
||||
.filter(|event| !events_already_checked.contains(*event)),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
predecessor_set
|
||||
}
|
||||
|
||||
fn events(&self) -> impl Iterator<Item = &'id EventId> { self.events.keys().copied() }
|
||||
|
||||
fn contains(&self, event: &'id EventId) -> bool { self.events.contains_key(event) }
|
||||
|
||||
fn predecessors(&self, event: &EventId) -> Option<&EventPredecessors<'id>> {
|
||||
self.events.get(event)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue