diff --git a/Cargo.lock b/Cargo.lock index e256c5ba..504848b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1209,6 +1209,7 @@ dependencies = [ "hickory-resolver", "http", "image", + "indexmap", "ipaddress", "itertools 0.14.0", "ldap3", diff --git a/Cargo.toml b/Cargo.toml index 38def82f..997a51e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -548,6 +548,10 @@ features = ["sync", "tls-rustls", "rustls-provider"] [workspace.dependencies.resolv-conf] version = "0.7.5" +# Used by stitched ordering +[workspace.dependencies.indexmap] +version = "2.13.0" + # # Patches # diff --git a/src/service/Cargo.toml b/src/service/Cargo.toml index d24b8a22..26f45f58 100644 --- a/src/service/Cargo.toml +++ b/src/service/Cargo.toml @@ -118,6 +118,7 @@ webpage.optional = true blurhash.workspace = true blurhash.optional = true recaptcha-verify = { version = "0.1.5", default-features = false } +indexmap.workspace = true [target.'cfg(all(unix, target_os = "linux"))'.dependencies] sd-notify.workspace = true diff --git a/src/service/rooms/timeline/stitcher/algorithm.rs b/src/service/rooms/timeline/stitcher/algorithm.rs index e4db7e3b..13f0a9be 100644 --- a/src/service/rooms/timeline/stitcher/algorithm.rs +++ b/src/service/rooms/timeline/stitcher/algorithm.rs @@ -1,8 +1,9 @@ use std::{ cmp::Ordering, - collections::{BTreeSet, HashSet}, + collections::{BTreeSet, HashMap, HashSet}, }; +use indexmap::IndexSet; use itertools::Itertools; use super::{Batch, Gap, OrderKey, StitchedItem, StitcherBackend}; @@ -40,8 +41,9 @@ impl Stitcher<'_, B> { pub(super) fn stitch<'id>(&self, batch: Batch<'id>) -> OrderUpdates<'id, B::Key> { let mut gap_updates = Vec::new(); + let mut all_new_events: HashSet<&'id str> = HashSet::new(); - let mut remaining_events: BTreeSet<_> = batch.events().collect(); + let mut remaining_events: IndexSet<_> = batch.events().collect(); // 1: Find existing gaps which include IDs of events in `batch` let matching_gaps = self.backend.find_matching_gaps(batch.events()); @@ -59,12 +61,15 @@ impl Stitcher<'_, B> { .copied() .collect(); + all_new_events.extend(events_to_insert.iter()); + // 4. Remove the events in the to-insert list from `remaining_events` so they // aren't processed again remaining_events.retain(|event| !events_to_insert.contains(event)); // 5 and 6 - let inserted_items = self.sort_events_and_create_gaps(&batch, events_to_insert); + let inserted_items = + self.sort_events_and_create_gaps(&batch, &all_new_events, events_to_insert); // 8. Update gap gap.retain(|id| !batch.contains(id)); @@ -75,7 +80,10 @@ impl Stitcher<'_, B> { } // 10. Append remaining events and gaps - let new_items = self.sort_events_and_create_gaps(&batch, remaining_events); + + all_new_events.extend(remaining_events.iter()); + let new_items = + self.sort_events_and_create_gaps(&batch, &all_new_events, remaining_events); OrderUpdates { gap_updates, new_items } } @@ -83,12 +91,13 @@ impl Stitcher<'_, B> { fn sort_events_and_create_gaps<'id>( &self, batch: &Batch<'id>, + all_new_events: &HashSet<&'id str>, events_to_insert: impl IntoIterator, ) -> Vec> { // 5. Sort the to-insert list with DAG;received order let events_to_insert = events_to_insert .into_iter() - .sorted_by(Self::compare_by_dag_received(batch)) + .sorted_by(batch.compare_by_dag_received()) .collect_vec(); let mut items = Vec::with_capacity( @@ -102,7 +111,9 @@ impl Stitcher<'_, B> { .prev_events .iter() .filter(|prev_event| { - !(batch.contains(prev_event) || self.backend.event_exists(prev_event)) + !(batch.contains(prev_event) + || all_new_events.contains(*prev_event) + || self.backend.event_exists(prev_event)) }) .map(|id| String::from(*id)) .collect(); @@ -116,36 +127,4 @@ impl Stitcher<'_, B> { items } - - /// Compare two events by DAG;received order. - /// - /// If either event is in the other's predecessor set it comes first, - /// otherwise they are sorted by which comes first in the batch. - fn compare_by_dag_received<'id>( - batch: &Batch<'id>, - ) -> impl FnMut(&&'id str, &&'id str) -> Ordering { - |a, b| { - if batch - .predecessors(a) - .is_some_and(|it| it.predecessor_set.contains(b)) - { - Ordering::Greater - } else if batch - .predecessors(b) - .is_some_and(|it| it.predecessor_set.contains(a)) - { - Ordering::Less - } else { - for event in batch.events() { - if event == *a { - return Ordering::Greater; - } else if event == *b { - return Ordering::Less; - } - } - - panic!("neither {a} nor {b} in batch"); - } - } - } } diff --git a/src/service/rooms/timeline/stitcher/mod.rs b/src/service/rooms/timeline/stitcher/mod.rs index 9b9e43cf..471d1357 100644 --- a/src/service/rooms/timeline/stitcher/mod.rs +++ b/src/service/rooms/timeline/stitcher/mod.rs @@ -1,4 +1,9 @@ -use std::collections::{BTreeMap, HashSet}; +use std::{ + cmp::Ordering, + collections::{BTreeMap, HashSet}, +}; + +use indexmap::IndexMap; pub(super) mod algorithm; #[cfg(test)] @@ -33,10 +38,11 @@ pub(super) trait StitcherBackend { } /// An ordered map from an event ID to its `prev_events`. -pub(super) type EventEdges<'id> = BTreeMap<&'id str, HashSet<&'id str>>; +pub(super) type EventEdges<'id> = IndexMap<&'id str, HashSet<&'id str>>; /// Information about the `prev_events` of an event. /// This struct does not store the ID of the event itself. +#[derive(Debug)] struct EventPredecessors<'id> { /// The `prev_events` of the event. pub prev_events: HashSet<&'id str>, @@ -46,13 +52,14 @@ struct EventPredecessors<'id> { pub predecessor_set: HashSet<&'id str>, } +#[derive(Debug)] pub(super) struct Batch<'id> { - events: BTreeMap<&'id str, EventPredecessors<'id>>, + events: IndexMap<&'id str, EventPredecessors<'id>>, } impl<'id> Batch<'id> { pub(super) fn from_edges(edges: EventEdges<'_>) -> Batch<'_> { - let mut events = BTreeMap::new(); + let mut events = IndexMap::new(); for (event, prev_events) in &edges { let predecessor_set = Self::find_predecessor_set(event, &edges); @@ -109,4 +116,29 @@ impl<'id> Batch<'id> { fn predecessors(&self, event: &str) -> Option<&EventPredecessors<'id>> { self.events.get(event) } + + /// Compare two events by DAG;received order. + /// + /// If either event is in the other's predecessor set it comes first, + /// otherwise they are sorted by which comes first in the batch. + fn compare_by_dag_received(&self) -> impl FnMut(&&'id str, &&'id str) -> Ordering { + |a, b| { + if self + .predecessors(a) + .is_some_and(|it| it.predecessor_set.contains(b)) + { + Ordering::Greater + } else if self + .predecessors(b) + .is_some_and(|it| it.predecessor_set.contains(a)) + { + Ordering::Less + } else { + let a_index = self.events.get_index_of(a).expect("a should be in events"); + let b_index = self.events.get_index_of(b).expect("b should be in events"); + + a_index.cmp(&b_index) + } + } + } } diff --git a/src/service/rooms/timeline/stitcher/test/mod.rs b/src/service/rooms/timeline/stitcher/test/mod.rs index 47c5de32..e40b37ed 100644 --- a/src/service/rooms/timeline/stitcher/test/mod.rs +++ b/src/service/rooms/timeline/stitcher/test/mod.rs @@ -23,14 +23,21 @@ impl<'id> TestStitcherBackend<'id> { panic!("bad update key {}", update.key); }; - let insertion_index; - if update.gap.is_empty() { + let insertion_index = if update.gap.is_empty() { self.items.remove(gap_index); - insertion_index = gap_index; + gap_index } else { - self.items[gap_index] = (self.next_id(), StitchedItem::Gap(update.gap)); - insertion_index = gap_index + 1; - } + match self.items.get_mut(gap_index) { + | Some((_, StitchedItem::Gap(gap))) => { + *gap = update.gap; + }, + | Some((key, other)) => { + panic!("expected item with key {key} to be a gap, it was {other:?}"); + }, + | None => unreachable!("we just checked that this index is valid"), + } + gap_index + 1 + }; let to_insert: Vec<_> = update .inserted_items @@ -88,12 +95,20 @@ impl StitcherBackend for TestStitcherBackend<'_> { fn run_testcase(testcase: parser::TestCase<'_>) { let mut backend = TestStitcherBackend::default(); - for phase in testcase { + for (index, phase) in testcase.into_iter().enumerate() { let stitcher = Stitcher::new(&backend); let batch = Batch::from_edges(phase.batch); let updates = stitcher.stitch(batch); - println!("updates to make: {:?}", updates); + println!(); + println!("===== phase {index}"); + println!("expected new items: {:?}", &phase.order.new_items); + println!(" actual new items: {:?}", &updates.new_items); + for update in &updates.gap_updates { + println!("update to gap {}:", update.key); + println!(" new gap contents: {:?}", update.gap); + println!(" new items: {:?}", update.inserted_items); + } for (expected, actual) in phase .order @@ -103,20 +118,17 @@ fn run_testcase(testcase: parser::TestCase<'_>) { { assert_eq!( expected, actual, - "bad new item, expected {:?} but got {:?}", - expected, actual + "bad new item, expected {expected:?} but got {actual:?}" ); } - backend.extend(updates); - println!("ordering: {:?}", backend.items); + backend.extend(updates); for (expected, actual) in phase.order.iter().zip_eq(backend.iter()) { assert_eq!( expected, actual, - "bad item in order, expected {:?} but got {:?}", - expected, actual + "bad item in order, expected {expected:?} but got {actual:?}", ); } diff --git a/src/service/rooms/timeline/stitcher/test/parser.rs b/src/service/rooms/timeline/stitcher/test/parser.rs index bd914c6f..961e0273 100644 --- a/src/service/rooms/timeline/stitcher/test/parser.rs +++ b/src/service/rooms/timeline/stitcher/test/parser.rs @@ -1,5 +1,7 @@ use std::collections::{BTreeMap, HashSet}; +use indexmap::IndexMap; + use crate::rooms::timeline::stitcher::{StitchedItem, test}; pub(super) type TestEventId<'id> = &'id str; @@ -31,7 +33,7 @@ pub(super) struct Phase<'id> { pub updated_gaps: Option>>, } -pub(super) type Batch<'id> = BTreeMap, HashSet>>; +pub(super) type Batch<'id> = IndexMap, HashSet>>; pub(super) struct Order<'id> { pub inserted_items: Vec>, @@ -83,7 +85,7 @@ peg::parser! { represents a _single_ event `A` with two prev events, `B` and `C`. */ events.into_iter() - .fold(BTreeMap::new(), |mut batch: Batch<'_>, (id, prev_event)| { + .fold(IndexMap::new(), |mut batch: Batch<'_>, (id, prev_event)| { // Find the prev events set of this event in the batch. // If it doesn't exist, make a new empty one. let mut prev_events = batch.entry(id).or_default(); diff --git a/src/service/rooms/timeline/stitcher/test/testcases/zzz-being_before_a_gap_item_beats_being_after_an_existing_item_multiple.stitched b/src/service/rooms/timeline/stitcher/test/testcases/zzz-being_before_a_gap_item_beats_being_after_an_existing_item_multiple.stitched index 37dfbd59..aa6a775c 100644 --- a/src/service/rooms/timeline/stitcher/test/testcases/zzz-being_before_a_gap_item_beats_being_after_an_existing_item_multiple.stitched +++ b/src/service/rooms/timeline/stitcher/test/testcases/zzz-being_before_a_gap_item_beats_being_after_an_existing_item_multiple.stitched @@ -6,10 +6,10 @@ B --> A C --> G2 === then we arrange into this order === -G1* -A -B +A* +B* -G2* -C +C* === when we receive these events === # When we receive F, which is a predecessor of both G1 and G2 F diff --git a/src/service/rooms/timeline/stitcher/test/testcases/zzz-partially_filling_a_gap_with_two_events.stitched b/src/service/rooms/timeline/stitcher/test/testcases/zzz-partially_filling_a_gap_with_two_events.stitched index 81e2fba8..a1f38267 100644 --- a/src/service/rooms/timeline/stitcher/test/testcases/zzz-partially_filling_a_gap_with_two_events.stitched +++ b/src/service/rooms/timeline/stitcher/test/testcases/zzz-partially_filling_a_gap_with_two_events.stitched @@ -8,7 +8,7 @@ F --> E === then we arrange into this order === A* -B,C,D,E* -F +F* === when we receive these events === # When we provide some of the missing events