Compare commits
5 commits
main
...
nex/fix/in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
18e4f787a2 | ||
|
|
d0c7dfe63a | ||
|
|
acc5aa3f25 | ||
|
|
b4a330f260 | ||
|
|
afa9678564 |
2 changed files with 96 additions and 26 deletions
|
|
@ -1,8 +1,9 @@
|
|||
use std::{cmp, collections::HashMap, future::ready};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Event, Pdu, Result, debug, debug_info, debug_warn, error, info,
|
||||
Err, Event, Pdu, Result, debug, debug_info, debug_warn, err, error, info,
|
||||
result::NotFound,
|
||||
trace,
|
||||
utils::{
|
||||
IterStream, ReadyExt,
|
||||
stream::{TryExpect, TryIgnore},
|
||||
|
|
@ -57,6 +58,7 @@ pub(crate) async fn migrations(services: &Services) -> Result<()> {
|
|||
}
|
||||
|
||||
async fn fresh(services: &Services) -> Result<()> {
|
||||
info!("Creating new fresh database");
|
||||
let db = &services.db;
|
||||
|
||||
services.globals.db.bump_database_version(DATABASE_VERSION);
|
||||
|
|
@ -66,11 +68,18 @@ async fn fresh(services: &Services) -> Result<()> {
|
|||
db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []);
|
||||
db["global"].insert(b"fix_referencedevents_missing_sep", []);
|
||||
db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []);
|
||||
db["global"].insert(b"fix_corrupt_msc4133_fields", []);
|
||||
db["global"].insert(b"populate_userroomid_leftstate_table", []);
|
||||
db["global"].insert(b"fix_local_invite_state", []);
|
||||
|
||||
// Create the admin room and server user on first run
|
||||
crate::admin::create_admin_room(services).boxed().await?;
|
||||
info!("Creating admin room and server user");
|
||||
crate::admin::create_admin_room(services)
|
||||
.boxed()
|
||||
.await
|
||||
.inspect_err(|e| error!("Failed to create admin room during db init: {e}"))?;
|
||||
|
||||
warn!("Created new RocksDB database with version {DATABASE_VERSION}");
|
||||
info!("Created new database with version {DATABASE_VERSION}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -88,19 +97,33 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||
}
|
||||
|
||||
if services.globals.db.database_version().await < 12 {
|
||||
db_lt_12(services).await?;
|
||||
db_lt_12(services)
|
||||
.await
|
||||
.map_err(|e| err!("Failed to run v12 migrations: {e}"))?;
|
||||
}
|
||||
|
||||
// This migration can be reused as-is anytime the server-default rules are
|
||||
// updated.
|
||||
if services.globals.db.database_version().await < 13 {
|
||||
db_lt_13(services).await?;
|
||||
db_lt_13(services)
|
||||
.await
|
||||
.map_err(|e| err!("Failed to run v13 migrations: {e}"))?;
|
||||
}
|
||||
|
||||
if db["global"].get(b"feat_sha256_media").await.is_not_found() {
|
||||
media::migrations::migrate_sha256_media(services).await?;
|
||||
media::migrations::migrate_sha256_media(services)
|
||||
.await
|
||||
.map_err(|e| err!("Failed to run SHA256 media migration: {e}"))?;
|
||||
} else if config.media_startup_check {
|
||||
media::migrations::checkup_sha256_media(services).await?;
|
||||
info!("Starting media startup integrity check.");
|
||||
let now = std::time::Instant::now();
|
||||
media::migrations::checkup_sha256_media(services)
|
||||
.await
|
||||
.map_err(|e| err!("Failed to verify media integrity: {e}"))?;
|
||||
info!(
|
||||
"Finished media startup integrity check in {} seconds.",
|
||||
now.elapsed().as_secs_f32()
|
||||
);
|
||||
}
|
||||
|
||||
if db["global"]
|
||||
|
|
@ -108,7 +131,12 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||
.await
|
||||
.is_not_found()
|
||||
{
|
||||
fix_bad_double_separator_in_state_cache(services).await?;
|
||||
info!("Running migration 'fix_bad_double_separator_in_state_cache'");
|
||||
fix_bad_double_separator_in_state_cache(services)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
err!("Failed to run 'fix_bad_double_separator_in_state_cache' migration: {e}")
|
||||
})?;
|
||||
}
|
||||
|
||||
if db["global"]
|
||||
|
|
@ -116,7 +144,15 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||
.await
|
||||
.is_not_found()
|
||||
{
|
||||
retroactively_fix_bad_data_from_roomuserid_joined(services).await?;
|
||||
info!("Running migration 'retroactively_fix_bad_data_from_roomuserid_joined'");
|
||||
retroactively_fix_bad_data_from_roomuserid_joined(services)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
err!(
|
||||
"Failed to run 'retroactively_fix_bad_data_from_roomuserid_joined' \
|
||||
migration: {e}"
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
if db["global"]
|
||||
|
|
@ -125,7 +161,12 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||
.is_not_found()
|
||||
|| services.globals.db.database_version().await < 17
|
||||
{
|
||||
fix_referencedevents_missing_sep(services).await?;
|
||||
info!("Running migration 'fix_referencedevents_missing_sep'");
|
||||
fix_referencedevents_missing_sep(services)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
err!("Failed to run 'fix_referencedevents_missing_sep' migration: {e}")
|
||||
})?;
|
||||
}
|
||||
|
||||
if db["global"]
|
||||
|
|
@ -134,7 +175,12 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||
.is_not_found()
|
||||
|| services.globals.db.database_version().await < 17
|
||||
{
|
||||
fix_readreceiptid_readreceipt_duplicates(services).await?;
|
||||
info!("Running migration 'fix_readreceiptid_readreceipt_duplicates'");
|
||||
fix_readreceiptid_readreceipt_duplicates(services)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
err!("Failed to run 'fix_readreceiptid_readreceipt_duplicates' migration: {e}")
|
||||
})?;
|
||||
}
|
||||
|
||||
if services.globals.db.database_version().await < 17 {
|
||||
|
|
@ -147,7 +193,10 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||
.await
|
||||
.is_not_found()
|
||||
{
|
||||
fix_corrupt_msc4133_fields(services).await?;
|
||||
info!("Running migration 'fix_corrupt_msc4133_fields'");
|
||||
fix_corrupt_msc4133_fields(services)
|
||||
.await
|
||||
.map_err(|e| err!("Failed to run 'fix_corrupt_msc4133_fields' migration: {e}"))?;
|
||||
}
|
||||
|
||||
if services.globals.db.database_version().await < 18 {
|
||||
|
|
@ -160,7 +209,12 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||
.await
|
||||
.is_not_found()
|
||||
{
|
||||
populate_userroomid_leftstate_table(services).await?;
|
||||
info!("Running migration 'populate_userroomid_leftstate_table'");
|
||||
populate_userroomid_leftstate_table(services)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
err!("Failed to run 'populate_userroomid_leftstate_table' migration: {e}")
|
||||
})?;
|
||||
}
|
||||
|
||||
if db["global"]
|
||||
|
|
@ -168,14 +222,17 @@ async fn migrate(services: &Services) -> Result<()> {
|
|||
.await
|
||||
.is_not_found()
|
||||
{
|
||||
fix_local_invite_state(services).await?;
|
||||
info!("Running migration 'fix_local_invite_state'");
|
||||
fix_local_invite_state(services)
|
||||
.await
|
||||
.map_err(|e| err!("Failed to run 'fix_local_invite_state' migration: {e}"))?;
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
services.globals.db.database_version().await,
|
||||
DATABASE_VERSION,
|
||||
"Failed asserting local database version {} is equal to known latest conduwuit database \
|
||||
version {}",
|
||||
"Failed asserting local database version {} is equal to known latest continuwuity \
|
||||
database version {}",
|
||||
services.globals.db.database_version().await,
|
||||
DATABASE_VERSION,
|
||||
);
|
||||
|
|
@ -370,7 +427,7 @@ async fn db_lt_13(services: &Services) -> Result<()> {
|
|||
}
|
||||
|
||||
async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result<()> {
|
||||
warn!("Fixing bad double separator in state_cache roomuserid_joined");
|
||||
info!("Fixing bad double separator in state_cache roomuserid_joined");
|
||||
|
||||
let db = &services.db;
|
||||
let roomuserid_joined = &db["roomuserid_joined"];
|
||||
|
|
@ -414,7 +471,7 @@ async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result<
|
|||
}
|
||||
|
||||
async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) -> Result<()> {
|
||||
warn!("Retroactively fixing bad data from broken roomuserid_joined");
|
||||
info!("Retroactively fixing bad data from broken roomuserid_joined");
|
||||
|
||||
let db = &services.db;
|
||||
let _cork = db.cork_and_sync();
|
||||
|
|
@ -504,7 +561,7 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
|
|||
}
|
||||
|
||||
async fn fix_referencedevents_missing_sep(services: &Services) -> Result {
|
||||
warn!("Fixing missing record separator between room_id and event_id in referencedevents");
|
||||
info!("Fixing missing record separator between room_id and event_id in referencedevents");
|
||||
|
||||
let db = &services.db;
|
||||
let cork = db.cork_and_sync();
|
||||
|
|
@ -552,7 +609,7 @@ async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result
|
|||
type ArrayId = ArrayString<MAX_BYTES>;
|
||||
type Key<'a> = (&'a RoomId, u64, &'a UserId);
|
||||
|
||||
warn!("Fixing undeleted entries in readreceiptid_readreceipt...");
|
||||
info!("Fixing undeleted entries in readreceiptid_readreceipt...");
|
||||
|
||||
let db = &services.db;
|
||||
let cork = db.cork_and_sync();
|
||||
|
|
@ -606,7 +663,7 @@ async fn fix_corrupt_msc4133_fields(services: &Services) -> Result {
|
|||
use serde_json::{Value, from_slice};
|
||||
type KeyVal<'a> = ((OwnedUserId, String), &'a [u8]);
|
||||
|
||||
warn!("Fixing corrupted `us.cloke.msc4175.tz` fields...");
|
||||
info!("Fixing corrupted `us.cloke.msc4175.tz` fields...");
|
||||
|
||||
let db = &services.db;
|
||||
let cork = db.cork_and_sync();
|
||||
|
|
@ -746,7 +803,18 @@ async fn fix_local_invite_state(services: &Services) -> Result {
|
|||
let fixed = userroomid_invitestate.stream()
|
||||
// if they're a local user on this homeserver
|
||||
.try_filter(|((user_id, _), _): &KeyVal<'_>| ready(services.globals.user_is_local(user_id)))
|
||||
.and_then(async |((user_id, room_id), stripped_state): KeyVal<'_>| Ok::<_, conduwuit::Error>((user_id.to_owned(), room_id.to_owned(), stripped_state.deserialize()?)))
|
||||
.and_then(async |((user_id, room_id), stripped_state): KeyVal<'_>| Ok::<_,
|
||||
conduwuit::Error>((user_id.to_owned(), room_id.to_owned(), stripped_state.deserialize
|
||||
().unwrap_or_else(|e| {
|
||||
trace!("Failed to deserialize: {:?}", stripped_state.json());
|
||||
warn!(
|
||||
%user_id,
|
||||
%room_id,
|
||||
"Failed to deserialize stripped state for invite, removing from db: {e}"
|
||||
);
|
||||
userroomid_invitestate.del((user_id, room_id));
|
||||
vec![]
|
||||
}))))
|
||||
.try_fold(0_usize, async |mut fixed, (user_id, room_id, stripped_state)| {
|
||||
// and their invite state is None
|
||||
if stripped_state.is_empty()
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
use std::{any::Any, collections::BTreeMap, sync::Arc};
|
||||
|
||||
use conduwuit::{
|
||||
Result, Server, SyncRwLock, debug, debug_info, info, trace, utils::stream::IterStream,
|
||||
Result, Server, SyncRwLock, debug, debug_info, error, info, trace, utils::stream::IterStream,
|
||||
};
|
||||
use database::Database;
|
||||
use futures::{Stream, StreamExt, TryStreamExt};
|
||||
|
|
@ -125,10 +125,12 @@ impl Services {
|
|||
}
|
||||
|
||||
pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
|
||||
debug_info!("Starting services...");
|
||||
info!("Starting services...");
|
||||
|
||||
self.admin.set_services(Some(Arc::clone(self)).as_ref());
|
||||
super::migrations::migrations(self).await?;
|
||||
super::migrations::migrations(self)
|
||||
.await
|
||||
.inspect_err(|e| error!("Migrations failed: {e}"))?;
|
||||
self.manager
|
||||
.lock()
|
||||
.await
|
||||
|
|
@ -147,7 +149,7 @@ impl Services {
|
|||
.await;
|
||||
}
|
||||
|
||||
debug_info!("Services startup complete.");
|
||||
info!("Services startup complete.");
|
||||
|
||||
Ok(Arc::clone(self))
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue