fix(sync/v3): Cache shortstatehashes to speed up migration
This commit is contained in:
parent
9a27bccc8e
commit
f71cfd18a5
1 changed files with 25 additions and 15 deletions
|
|
@ -1,7 +1,7 @@
|
|||
use std::cmp;
|
||||
use std::{cmp, collections::HashMap};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Pdu, Result, debug, debug_info, debug_warn, error, info, pair_of,
|
||||
Err, Pdu, Result, debug, debug_info, debug_warn, error, info,
|
||||
result::NotFound,
|
||||
utils::{
|
||||
IterStream, ReadyExt,
|
||||
|
|
@ -13,7 +13,7 @@ use database::Json;
|
|||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||
use itertools::Itertools;
|
||||
use ruma::{
|
||||
OwnedUserId, RoomId, UserId,
|
||||
OwnedRoomId, OwnedUserId, RoomId, UserId,
|
||||
events::{
|
||||
GlobalAccountDataEventType, StateEventType, push_rules::PushRulesEvent,
|
||||
room::member::MembershipState,
|
||||
|
|
@ -22,7 +22,7 @@ use ruma::{
|
|||
serde::Raw,
|
||||
};
|
||||
|
||||
use crate::{Services, media};
|
||||
use crate::{Services, media, rooms::short::ShortStateHash};
|
||||
|
||||
/// The current schema version.
|
||||
/// - If database is opened at greater version we reject with error. The
|
||||
|
|
@ -649,20 +649,30 @@ async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
|
|||
let cork = db.cork_and_sync();
|
||||
let userroomid_leftstate = db["userroomid_leftstate"].clone();
|
||||
|
||||
let (total, fixed) = userroomid_leftstate
|
||||
let (total, fixed, _) = userroomid_leftstate
|
||||
.stream()
|
||||
.try_fold(
|
||||
(0_usize, 0_usize),
|
||||
async |(mut total, mut fixed): pair_of!(usize),
|
||||
(0_usize, 0_usize, HashMap::<OwnedRoomId, ShortStateHash>::new()),
|
||||
async |(mut total, mut fixed, mut shortstatehash_cache): (
|
||||
usize,
|
||||
usize,
|
||||
HashMap<_, _>,
|
||||
),
|
||||
((user_id, room_id), state): KeyVal<'_>|
|
||||
-> Result<pair_of!(usize)> {
|
||||
-> Result<(usize, usize, HashMap<_, _>)> {
|
||||
if matches!(state.deserialize(), Err(_)) {
|
||||
let Ok(latest_shortstatehash) =
|
||||
services.rooms.state.get_room_shortstatehash(room_id).await
|
||||
else {
|
||||
warn!(?room_id, ?user_id, "room has no shortstatehash");
|
||||
return Ok((total, fixed));
|
||||
};
|
||||
let latest_shortstatehash =
|
||||
if let Some(shortstatehash) = shortstatehash_cache.get(room_id) {
|
||||
*shortstatehash
|
||||
} else if let Ok(shortstatehash) =
|
||||
services.rooms.state.get_room_shortstatehash(room_id).await
|
||||
{
|
||||
shortstatehash_cache.insert(room_id.to_owned(), shortstatehash);
|
||||
shortstatehash
|
||||
} else {
|
||||
warn!(?room_id, ?user_id, "room has no shortstatehash");
|
||||
return Ok((total, fixed, shortstatehash_cache));
|
||||
};
|
||||
|
||||
let leave_state_event = services
|
||||
.rooms
|
||||
|
|
@ -692,7 +702,7 @@ async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
|
|||
}
|
||||
|
||||
total = total.saturating_add(1);
|
||||
Ok((total, fixed))
|
||||
Ok((total, fixed, shortstatehash_cache))
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue