diff --git a/src/service/migrations.rs b/src/service/migrations.rs index 2f462d31..d114ebfc 100644 --- a/src/service/migrations.rs +++ b/src/service/migrations.rs @@ -1,7 +1,7 @@ use std::cmp; use conduwuit::{ - Err, Result, debug, debug_info, debug_warn, error, info, + Err, Pdu, Result, debug, debug_info, debug_warn, error, info, pair_of, result::NotFound, utils::{ IterStream, ReadyExt, @@ -15,9 +15,11 @@ use itertools::Itertools; use ruma::{ OwnedUserId, RoomId, UserId, events::{ - GlobalAccountDataEventType, push_rules::PushRulesEvent, room::member::MembershipState, + GlobalAccountDataEventType, StateEventType, push_rules::PushRulesEvent, + room::member::MembershipState, }, push::Ruleset, + serde::Raw, }; use crate::{Services, media}; @@ -152,6 +154,14 @@ async fn migrate(services: &Services) -> Result<()> { info!("Migration: Bumped database version to 18"); } + if db["global"] + .get(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER) + .await + .is_not_found() + { + populate_userroomid_leftstate_table(services).await?; + } + assert_eq!( services.globals.db.database_version().await, DATABASE_VERSION, @@ -628,3 +638,69 @@ async fn fix_corrupt_msc4133_fields(services: &Services) -> Result { db.db.sort()?; Ok(()) } + +const POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER: &'static str = + "populate_userroomid_leftstate_table"; +async fn populate_userroomid_leftstate_table(services: &Services) -> Result { + type KeyVal<'a> = (Key<'a>, Raw>); + type Key<'a> = (&'a UserId, &'a RoomId); + + let db = &services.db; + let cork = db.cork_and_sync(); + let userroomid_leftstate = db["userroomid_leftstate"].clone(); + + let (total, fixed) = userroomid_leftstate + .stream() + .try_fold( + (0_usize, 0_usize), + async |(mut total, mut fixed): pair_of!(usize), + ((user_id, room_id), state): KeyVal<'_>| + -> Result { + if matches!(state.deserialize(), Err(_)) { + let Ok(latest_shortstatehash) = + services.rooms.state.get_room_shortstatehash(room_id).await + else { + warn!(?room_id, ?user_id, "room has no shortstatehash"); + return Ok((total, fixed)); + }; + + let leave_state_event = services + .rooms + .state_accessor + .state_get( + latest_shortstatehash, + &StateEventType::RoomMember, + user_id.as_str(), + ) + .await; + + match leave_state_event { + | Ok(leave_state_event) => { + userroomid_leftstate.put((user_id, room_id), Json(leave_state_event)); + fixed = fixed.saturating_add(1); + }, + | Err(_) => { + warn!( + ?room_id, + ?user_id, + "room cached as left has no leave event for user, removing \ + cache entry" + ); + userroomid_leftstate.del((user_id, room_id)); + }, + } + } + + total = total.saturating_add(1); + Ok((total, fixed)) + }, + ) + .await?; + + drop(cork); + info!(?total, ?fixed, "Fixed entries in `userroomid_leftstate`."); + + db["global"].insert(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER, []); + db.db.sort()?; + Ok(()) +}