use std::{cmp, collections::HashMap, future::ready}; use conduwuit::{ Err, Event, Pdu, Result, debug, debug_info, debug_warn, error, info, result::NotFound, utils::{ IterStream, ReadyExt, stream::{TryExpect, TryIgnore}, }, warn, }; use database::Json; use futures::{FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; use ruma::{ OwnedRoomId, OwnedUserId, RoomId, UserId, events::{ AnyStrippedStateEvent, GlobalAccountDataEventType, StateEventType, push_rules::PushRulesEvent, room::member::{MembershipState, RoomMemberEventContent}, }, push::Ruleset, serde::Raw, }; use crate::{Services, media, rooms::short::ShortStateHash}; /// The current schema version. /// - If database is opened at greater version we reject with error. The /// software must be updated for backward-incompatible changes. /// - If database is opened at lesser version we apply migrations up to this. /// Note that named-feature migrations may also be performed when opening at /// equal or lesser version. These are expected to be backward-compatible. pub(crate) const DATABASE_VERSION: u64 = 18; pub(crate) async fn migrations(services: &Services) -> Result<()> { let users_count = services.users.count().await; // Matrix resource ownership is based on the server name; changing it // requires recreating the database from scratch. if users_count > 0 { let server_user = &services.globals.server_user; if !services.users.exists(server_user).await { error!("The {server_user} server user does not exist, and the database is not new."); return Err!(Database( "Cannot reuse an existing database after changing the server name, please \ delete the old one first.", )); } } if users_count > 0 { migrate(services).await } else { fresh(services).await } } async fn fresh(services: &Services) -> Result<()> { let db = &services.db; services.globals.db.bump_database_version(DATABASE_VERSION); db["global"].insert(b"feat_sha256_media", []); db["global"].insert(b"fix_bad_double_separator_in_state_cache", []); db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []); db["global"].insert(b"fix_referencedevents_missing_sep", []); db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []); // Create the admin room and server user on first run crate::admin::create_admin_room(services).boxed().await?; warn!("Created new RocksDB database with version {DATABASE_VERSION}"); Ok(()) } /// Apply any migrations async fn migrate(services: &Services) -> Result<()> { let db = &services.db; let config = &services.server.config; if services.globals.db.database_version().await < 11 { return Err!(Database( "Database schema version {} is no longer supported", services.globals.db.database_version().await )); } if services.globals.db.database_version().await < 12 { db_lt_12(services).await?; } // This migration can be reused as-is anytime the server-default rules are // updated. if services.globals.db.database_version().await < 13 { db_lt_13(services).await?; } if db["global"].get(b"feat_sha256_media").await.is_not_found() { media::migrations::migrate_sha256_media(services).await?; } else if config.media_startup_check { media::migrations::checkup_sha256_media(services).await?; } if db["global"] .get(b"fix_bad_double_separator_in_state_cache") .await .is_not_found() { fix_bad_double_separator_in_state_cache(services).await?; } if db["global"] .get(b"retroactively_fix_bad_data_from_roomuserid_joined") .await .is_not_found() { retroactively_fix_bad_data_from_roomuserid_joined(services).await?; } if db["global"] .get(b"fix_referencedevents_missing_sep") .await .is_not_found() || services.globals.db.database_version().await < 17 { fix_referencedevents_missing_sep(services).await?; } if db["global"] .get(b"fix_readreceiptid_readreceipt_duplicates") .await .is_not_found() || services.globals.db.database_version().await < 17 { fix_readreceiptid_readreceipt_duplicates(services).await?; } if services.globals.db.database_version().await < 17 { services.globals.db.bump_database_version(17); info!("Migration: Bumped database version to 17"); } if db["global"] .get(FIXED_CORRUPT_MSC4133_FIELDS_MARKER) .await .is_not_found() { fix_corrupt_msc4133_fields(services).await?; } if services.globals.db.database_version().await < 18 { services.globals.db.bump_database_version(18); info!("Migration: Bumped database version to 18"); } if db["global"] .get(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER) .await .is_not_found() { populate_userroomid_leftstate_table(services).await?; } if db["global"] .get(FIXED_LOCAL_INVITE_STATE_MARKER) .await .is_not_found() { fix_local_invite_state(services).await?; } assert_eq!( services.globals.db.database_version().await, DATABASE_VERSION, "Failed asserting local database version {} is equal to known latest conduwuit database \ version {}", services.globals.db.database_version().await, DATABASE_VERSION, ); { let patterns = services.globals.forbidden_usernames(); if !patterns.is_empty() { services .users .stream() .filter(|user_id| services.users.is_active_local(user_id)) .ready_for_each(|user_id| { let matches = patterns.matches(user_id.localpart()); if matches.matched_any() { warn!( "User {} matches the following forbidden username patterns: {}", user_id.to_string(), matches .into_iter() .map(|x| &patterns.patterns()[x]) .join(", ") ); } }) .await; } } { let patterns = services.globals.forbidden_alias_names(); if !patterns.is_empty() { for room_id in services .rooms .metadata .iter_ids() .map(ToOwned::to_owned) .collect::>() .await { services .rooms .alias .local_aliases_for_room(&room_id) .ready_for_each(|room_alias| { let matches = patterns.matches(room_alias.alias()); if matches.matched_any() { warn!( "Room with alias {} ({}) matches the following forbidden room \ name patterns: {}", room_alias, &room_id, matches .into_iter() .map(|x| &patterns.patterns()[x]) .join(", ") ); } }) .await; } } } info!("Loaded RocksDB database with schema version {DATABASE_VERSION}"); Ok(()) } async fn db_lt_12(services: &Services) -> Result<()> { for username in &services .users .list_local_users() .map(ToOwned::to_owned) .collect::>() .await { let user = match UserId::parse_with_server_name(username.as_str(), &services.server.name) { | Ok(u) => u, | Err(e) => { warn!("Invalid username {username}: {e}"); continue; }, }; let mut account_data: PushRulesEvent = services .account_data .get_global(&user, GlobalAccountDataEventType::PushRules) .await .expect("Username is invalid"); let rules_list = &mut account_data.content.global; //content rule { let content_rule_transformation = [".m.rules.contains_user_name", ".m.rule.contains_user_name"]; let rule = rules_list.content.get(content_rule_transformation[0]); if let Some(rule) = rule { let mut rule = rule.clone(); content_rule_transformation[1].clone_into(&mut rule.rule_id); rules_list .content .shift_remove(content_rule_transformation[0]); rules_list.content.insert(rule); } } //underride rules { let underride_rule_transformation = [ [".m.rules.call", ".m.rule.call"], [".m.rules.room_one_to_one", ".m.rule.room_one_to_one"], [".m.rules.encrypted_room_one_to_one", ".m.rule.encrypted_room_one_to_one"], [".m.rules.message", ".m.rule.message"], [".m.rules.encrypted", ".m.rule.encrypted"], ]; for transformation in underride_rule_transformation { let rule = rules_list.underride.get(transformation[0]); if let Some(rule) = rule { let mut rule = rule.clone(); transformation[1].clone_into(&mut rule.rule_id); rules_list.underride.shift_remove(transformation[0]); rules_list.underride.insert(rule); } } } services .account_data .update( None, &user, GlobalAccountDataEventType::PushRules.to_string().into(), &serde_json::to_value(account_data).expect("to json value always works"), ) .await?; } services.globals.db.bump_database_version(12); info!("Migration: 11 -> 12 finished"); Ok(()) } async fn db_lt_13(services: &Services) -> Result<()> { for username in &services .users .list_local_users() .map(ToOwned::to_owned) .collect::>() .await { let user = match UserId::parse_with_server_name(username.as_str(), &services.server.name) { | Ok(u) => u, | Err(e) => { warn!("Invalid username {username}: {e}"); continue; }, }; let mut account_data: PushRulesEvent = services .account_data .get_global(&user, GlobalAccountDataEventType::PushRules) .await .expect("Username is invalid"); let user_default_rules = Ruleset::server_default(&user); account_data .content .global .update_with_server_default(user_default_rules); services .account_data .update( None, &user, GlobalAccountDataEventType::PushRules.to_string().into(), &serde_json::to_value(account_data).expect("to json value always works"), ) .await?; } services.globals.db.bump_database_version(13); info!("Migration: 12 -> 13 finished"); Ok(()) } async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result<()> { warn!("Fixing bad double separator in state_cache roomuserid_joined"); let db = &services.db; let roomuserid_joined = &db["roomuserid_joined"]; let _cork = db.cork_and_sync(); let mut iter_count: usize = 0; roomuserid_joined .raw_stream() .ignore_err() .ready_for_each(|(key, value)| { let mut key = key.to_vec(); iter_count = iter_count.saturating_add(1); debug_info!(%iter_count); let first_sep_index = key .iter() .position(|&i| i == 0xFF) .expect("found 0xFF delim"); if key .iter() .get(first_sep_index..=first_sep_index.saturating_add(1)) .copied() .collect_vec() == vec![0xFF, 0xFF] { debug_warn!("Found bad key: {key:?}"); roomuserid_joined.remove(&key); key.remove(first_sep_index); debug_warn!("Fixed key: {key:?}"); roomuserid_joined.insert(&key, value); } }) .await; db.db.sort()?; db["global"].insert(b"fix_bad_double_separator_in_state_cache", []); info!("Finished fixing"); Ok(()) } async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) -> Result<()> { warn!("Retroactively fixing bad data from broken roomuserid_joined"); let db = &services.db; let _cork = db.cork_and_sync(); let room_ids = services .rooms .metadata .iter_ids() .map(ToOwned::to_owned) .collect::>() .await; for room_id in &room_ids { debug_info!("Fixing room {room_id}"); let users_in_room: Vec = services .rooms .state_cache .room_members(room_id) .map(ToOwned::to_owned) .collect() .await; let joined_members = users_in_room .iter() .stream() .filter(|user_id| { services .rooms .state_accessor .get_member(room_id, user_id) .map(|member| { member.is_ok_and(|member| member.membership == MembershipState::Join) }) }) .collect::>() .await; let non_joined_members = users_in_room .iter() .stream() .filter(|user_id| { services .rooms .state_accessor .get_member(room_id, user_id) .map(|member| { member.is_ok_and(|member| member.membership == MembershipState::Join) }) }) .collect::>() .await; for user_id in &joined_members { debug_info!("User is joined, marking as joined"); services.rooms.state_cache.mark_as_joined(user_id, room_id); } for user_id in &non_joined_members { debug_info!("User is left or banned, marking as left"); services .rooms .state_cache .mark_as_left(user_id, room_id, None) .await; } } for room_id in &room_ids { debug_info!( "Updating joined count for room {room_id} to fix servers in room after correcting \ membership states" ); services .rooms .state_cache .update_joined_count(room_id) .await; } db.db.sort()?; db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []); info!("Finished fixing"); Ok(()) } async fn fix_referencedevents_missing_sep(services: &Services) -> Result { warn!("Fixing missing record separator between room_id and event_id in referencedevents"); let db = &services.db; let cork = db.cork_and_sync(); let referencedevents = db["referencedevents"].clone(); let totals: (usize, usize) = (0, 0); let (total, fixed) = referencedevents .raw_stream() .expect_ok() .enumerate() .ready_fold(totals, |mut a, (i, (key, val))| { debug_assert!(val.is_empty(), "expected no value"); let has_sep = key.contains(&database::SEP); if !has_sep { let key_str = std::str::from_utf8(key).expect("key not utf-8"); let room_id_len = key_str.find('$').expect("missing '$' in key"); let (room_id, event_id) = key_str.split_at(room_id_len); debug!(?a, "fixing {room_id}, {event_id}"); let new_key = (room_id, event_id); referencedevents.put_raw(new_key, val); referencedevents.remove(key); } a.0 = cmp::max(i, a.0); a.1 = a.1.saturating_add((!has_sep).into()); a }) .await; drop(cork); info!(?total, ?fixed, "Fixed missing record separators in 'referencedevents'."); db["global"].insert(b"fix_referencedevents_missing_sep", []); db.db.sort() } async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result { use conduwuit::arrayvec::ArrayString; use ruma::identifiers_validation::MAX_BYTES; type ArrayId = ArrayString; type Key<'a> = (&'a RoomId, u64, &'a UserId); warn!("Fixing undeleted entries in readreceiptid_readreceipt..."); let db = &services.db; let cork = db.cork_and_sync(); let readreceiptid_readreceipt = db["readreceiptid_readreceipt"].clone(); let mut cur_room: Option = None; let mut cur_user: Option = None; let (mut total, mut fixed): (usize, usize) = (0, 0); readreceiptid_readreceipt .keys() .expect_ok() .ready_for_each(|key: Key<'_>| { let (room_id, _, user_id) = key; let last_room = cur_room.replace( room_id .as_str() .try_into() .expect("invalid room_id in database"), ); let last_user = cur_user.replace( user_id .as_str() .try_into() .expect("invalid user_id in database"), ); let is_dup = cur_room == last_room && cur_user == last_user; if is_dup { readreceiptid_readreceipt.del(key); } fixed = fixed.saturating_add(is_dup.into()); total = total.saturating_add(1); }) .await; drop(cork); info!(?total, ?fixed, "Fixed undeleted entries in readreceiptid_readreceipt."); db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []); db.db.sort() } const FIXED_CORRUPT_MSC4133_FIELDS_MARKER: &[u8] = b"fix_corrupt_msc4133_fields"; async fn fix_corrupt_msc4133_fields(services: &Services) -> Result { // Due to an old bug, some conduwuit databases have `us.cloke.msc4175.tz` user // profile fields with raw strings instead of quoted JSON ones. // This migration fixes that. use serde_json::{Value, from_slice}; type KeyVal<'a> = ((OwnedUserId, String), &'a [u8]); warn!("Fixing corrupted `us.cloke.msc4175.tz` fields..."); let db = &services.db; let cork = db.cork_and_sync(); let useridprofilekey_value = db["useridprofilekey_value"].clone(); let (total, fixed) = useridprofilekey_value .stream() .try_fold( (0_usize, 0_usize), async |(mut total, mut fixed), ((user, key), value): KeyVal<'_>| -> Result<(usize, usize)> { match from_slice::(value) { // corrupted timezone field | Err(_) if key == "us.cloke.msc4175.tz" => { let new_value = Value::String(String::from_utf8(value.to_vec())?); useridprofilekey_value.put((user, key), Json(new_value)); fixed = fixed.saturating_add(1); }, // corrupted value for some other key | Err(error) => { warn!( "deleting MSC4133 key {} for user {} due to deserialization \ failure: {}", key, user, error ); useridprofilekey_value.del((user, key)); }, // other key with no issues | Ok(_) => { // do nothing }, } total = total.saturating_add(1); Ok((total, fixed)) }, ) .await?; drop(cork); info!(?total, ?fixed, "Fixed corrupted `us.cloke.msc4175.tz` fields."); db["global"].insert(FIXED_CORRUPT_MSC4133_FIELDS_MARKER, []); db.db.sort()?; Ok(()) } const POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER: &str = "populate_userroomid_leftstate_table"; async fn populate_userroomid_leftstate_table(services: &Services) -> Result { type KeyVal<'a> = (Key<'a>, Raw>); type Key<'a> = (&'a UserId, &'a RoomId); let db = &services.db; let cork = db.cork_and_sync(); let userroomid_leftstate = db["userroomid_leftstate"].clone(); let (total, fixed, _) = userroomid_leftstate .stream() .try_fold( (0_usize, 0_usize, HashMap::::new()), async |(mut total, mut fixed, mut shortstatehash_cache): ( usize, usize, HashMap<_, _>, ), ((user_id, room_id), state): KeyVal<'_>| -> Result<(usize, usize, HashMap<_, _>)> { if state.deserialize().is_err() { let latest_shortstatehash = if let Some(shortstatehash) = shortstatehash_cache.get(room_id) { *shortstatehash } else if let Ok(shortstatehash) = services.rooms.state.get_room_shortstatehash(room_id).await { shortstatehash_cache.insert(room_id.to_owned(), shortstatehash); shortstatehash } else { warn!(%room_id, %user_id, "room has no shortstatehash"); return Ok((total, fixed, shortstatehash_cache)); }; let leave_state_event = services .rooms .state_accessor .state_get( latest_shortstatehash, &StateEventType::RoomMember, user_id.as_str(), ) .await; match leave_state_event { | Ok(leave_state_event) => { userroomid_leftstate.put((user_id, room_id), Json(leave_state_event)); fixed = fixed.saturating_add(1); }, | Err(_) => { warn!( %room_id, %user_id, "room cached as left has no leave event for user, removing \ cache entry" ); userroomid_leftstate.del((user_id, room_id)); }, } } total = total.saturating_add(1); Ok((total, fixed, shortstatehash_cache)) }, ) .await?; drop(cork); info!(?total, ?fixed, "Fixed entries in `userroomid_leftstate`."); db["global"].insert(POPULATED_USERROOMID_LEFTSTATE_TABLE_MARKER, []); db.db.sort()?; Ok(()) } const FIXED_LOCAL_INVITE_STATE_MARKER: &str = "fix_local_invite_state"; async fn fix_local_invite_state(services: &Services) -> Result { // Clean up the effects of !1249 by caching stripped state for invites type KeyVal<'a> = (Key<'a>, Raw>); type Key<'a> = (&'a UserId, &'a RoomId); let db = &services.db; let cork = db.cork_and_sync(); let userroomid_invitestate = services.db["userroomid_invitestate"].clone(); // for each user invited to a room let fixed = userroomid_invitestate.stream() // if they're a local user on this homeserver .try_filter(|((user_id, _), _): &KeyVal<'_>| ready(services.globals.user_is_local(user_id))) .and_then(async |((user_id, room_id), stripped_state): KeyVal<'_>| Ok::<_, conduwuit::Error>((user_id.to_owned(), room_id.to_owned(), stripped_state.deserialize()?))) .try_fold(0_usize, async |mut fixed, (user_id, room_id, stripped_state)| { // and their invite state is None if stripped_state.is_empty() // and they are actually invited to the room && let Ok(membership_event) = services.rooms.state_accessor.room_state_get(&room_id, &StateEventType::RoomMember, user_id.as_str()).await && membership_event.get_content::().is_ok_and(|content| content.membership == MembershipState::Invite) // and the invite was sent by a local user && services.globals.user_is_local(&membership_event.sender) { // build and save stripped state for their invite in the database let stripped_state = services.rooms.state.summary_stripped(&membership_event, &room_id).await; userroomid_invitestate.put((&user_id, &room_id), Json(stripped_state)); fixed = fixed.saturating_add(1); } Ok(fixed) }) .await?; drop(cork); info!(?fixed, "Fixed local invite state cache entries."); db["global"].insert(FIXED_LOCAL_INVITE_STATE_MARKER, []); db.db.sort()?; Ok(()) }