fix(sync/v3): Implement a migration for the userroomid_leftstate table
This commit is contained in:
parent
800ac8d1f1
commit
16f37d21ff
1 changed files with 78 additions and 2 deletions
|
|
@ -1,7 +1,7 @@
|
|||
use std::cmp;
|
||||
|
||||
use conduwuit::{
|
||||
Err, Result, debug, debug_info, debug_warn, error, info,
|
||||
Err, Pdu, Result, debug, debug_info, debug_warn, error, info, pair_of,
|
||||
result::NotFound,
|
||||
utils::{
|
||||
IterStream, ReadyExt,
|
||||
|
|
@ -15,9 +15,11 @@ use itertools::Itertools;
|
|||
use ruma::{
|
||||
OwnedUserId, RoomId, UserId,
|
||||
events::{
|
||||
GlobalAccountDataEventType, push_rules::PushRulesEvent, room::member::MembershipState,
|
||||
GlobalAccountDataEventType, StateEventType, push_rules::PushRulesEvent,
|
||||
room::member::MembershipState,
|
||||
},
|
||||
push::Ruleset,
|
||||
serde::Raw,
|
||||
};
|
||||
|
||||
use crate::{Services, media};
|
||||
|
|
@ -152,6 +154,14 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||
info!("Migration: Bumped database version to 18");
|
||||
}
|
||||
|
||||
if db["global"]
|
||||
.get(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER)
|
||||
.await
|
||||
.is_not_found()
|
||||
{
|
||||
populate_userroomid_leftstate_table(services).await?;
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
services.globals.db.database_version().await,
|
||||
DATABASE_VERSION,
|
||||
|
|
@ -628,3 +638,69 @@ async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
|
|||
db.db.sort()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
const POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER: &'static str =
|
||||
"populate_userroomid_leftstate_table";
|
||||
async fn populate_userroomid_leftstate_table(services: &Services) -> Result {
|
||||
type KeyVal<'a> = (Key<'a>, Raw<Option<Pdu>>);
|
||||
type Key<'a> = (&'a UserId, &'a RoomId);
|
||||
|
||||
let db = &services.db;
|
||||
let cork = db.cork_and_sync();
|
||||
let userroomid_leftstate = db["userroomid_leftstate"].clone();
|
||||
|
||||
let (total, fixed) = userroomid_leftstate
|
||||
.stream()
|
||||
.try_fold(
|
||||
(0_usize, 0_usize),
|
||||
async |(mut total, mut fixed): pair_of!(usize),
|
||||
((user_id, room_id), state): KeyVal<'_>|
|
||||
-> Result<pair_of!(usize)> {
|
||||
if matches!(state.deserialize(), Err(_)) {
|
||||
let Ok(latest_shortstatehash) =
|
||||
services.rooms.state.get_room_shortstatehash(room_id).await
|
||||
else {
|
||||
warn!(?room_id, ?user_id, "room has no shortstatehash");
|
||||
return Ok((total, fixed));
|
||||
};
|
||||
|
||||
let leave_state_event = services
|
||||
.rooms
|
||||
.state_accessor
|
||||
.state_get(
|
||||
latest_shortstatehash,
|
||||
&StateEventType::RoomMember,
|
||||
user_id.as_str(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match leave_state_event {
|
||||
| Ok(leave_state_event) => {
|
||||
userroomid_leftstate.put((user_id, room_id), Json(leave_state_event));
|
||||
fixed = fixed.saturating_add(1);
|
||||
},
|
||||
| Err(_) => {
|
||||
warn!(
|
||||
?room_id,
|
||||
?user_id,
|
||||
"room cached as left has no leave event for user, removing \
|
||||
cache entry"
|
||||
);
|
||||
userroomid_leftstate.del((user_id, room_id));
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
total = total.saturating_add(1);
|
||||
Ok((total, fixed))
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(cork);
|
||||
info!(?total, ?fixed, "Fixed entries in `userroomid_leftstate`.");
|
||||
|
||||
db["global"].insert(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER, []);
|
||||
db.db.sort()?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue