Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Ginger
77ae79396f
feat: Use OptimisticTransactionDB for the database engine 2025-12-16 09:49:13 -05:00
10 changed files with 18 additions and 54 deletions

View file

@ -1224,12 +1224,6 @@ pub struct Config {
#[serde(default)] #[serde(default)]
pub rocksdb_repair: bool, pub rocksdb_repair: bool,
#[serde(default)]
pub rocksdb_read_only: bool,
#[serde(default)]
pub rocksdb_secondary: bool,
/// Enables idle CPU priority for compaction thread. This is not enabled by /// Enables idle CPU priority for compaction thread. This is not enabled by
/// default to prevent compaction from falling too far behind on busy /// default to prevent compaction from falling too far behind on busy
/// systems. /// systems.

View file

@ -19,7 +19,7 @@ use std::{
use conduwuit::{Err, Result, debug, info, warn}; use conduwuit::{Err, Result, debug, info, warn};
use rocksdb::{ use rocksdb::{
AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded, AsColumnFamilyRef, BoundColumnFamily, DBCommon, MultiThreaded, OptimisticTransactionDB,
WaitForCompactOptions, WaitForCompactOptions,
}; };
@ -33,13 +33,11 @@ pub struct Engine {
pub(crate) db: Db, pub(crate) db: Db,
pub(crate) pool: Arc<Pool>, pub(crate) pool: Arc<Pool>,
pub(crate) ctx: Arc<Context>, pub(crate) ctx: Arc<Context>,
pub(super) read_only: bool,
pub(super) secondary: bool,
pub(crate) checksums: bool, pub(crate) checksums: bool,
corks: AtomicU32, corks: AtomicU32,
} }
pub(crate) type Db = DBWithThreadMode<MultiThreaded>; pub(crate) type Db = OptimisticTransactionDB<MultiThreaded>;
impl Engine { impl Engine {
#[tracing::instrument( #[tracing::instrument(
@ -129,14 +127,6 @@ impl Engine {
sequence sequence
} }
#[inline]
#[must_use]
pub fn is_read_only(&self) -> bool { self.secondary || self.read_only }
#[inline]
#[must_use]
pub fn is_secondary(&self) -> bool { self.secondary }
} }
impl Drop for Engine { impl Drop for Engine {

View file

@ -12,9 +12,8 @@ pub fn backup(&self) -> Result {
let mut engine = self.backup_engine()?; let mut engine = self.backup_engine()?;
let config = &self.ctx.server.config; let config = &self.ctx.server.config;
if config.database_backups_to_keep > 0 { if config.database_backups_to_keep > 0 {
let flush = !self.is_read_only();
engine engine
.create_new_backup_flush(&self.db, flush) .create_new_backup_flush(&self.db, true)
.map_err(map_err)?; .map_err(map_err)?;
let engine_info = engine.get_backup_info(); let engine_info = engine.get_backup_info();

View file

@ -1,7 +1,7 @@
use std::fmt::Write; use std::fmt::Write;
use conduwuit::{Result, implement}; use conduwuit::{Result, implement};
use rocksdb::perf::get_memory_usage_stats; use rocksdb::perf::MemoryUsageBuilder;
use super::Engine; use super::Engine;
use crate::or_else; use crate::or_else;
@ -9,16 +9,21 @@ use crate::or_else;
#[implement(Engine)] #[implement(Engine)]
pub fn memory_usage(&self) -> Result<String> { pub fn memory_usage(&self) -> Result<String> {
let mut res = String::new(); let mut res = String::new();
let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()]))
.or_else(or_else)?; let mut builder = MemoryUsageBuilder::new().or_else(or_else)?;
builder.add_db(&self.db);
builder.add_cache(&self.ctx.row_cache.lock());
let usage = builder.build().or_else(or_else)?;
let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0; let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0;
writeln!( writeln!(
res, res,
"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow \ "Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow \
cache: {:.2} MiB", cache: {:.2} MiB",
mibs(stats.mem_table_total), mibs(usage.approximate_mem_table_total()),
mibs(stats.mem_table_unflushed), mibs(usage.approximate_mem_table_unflushed()),
mibs(stats.mem_table_readers_total), mibs(usage.approximate_mem_table_readers_total()),
mibs(u64::try_from(self.ctx.row_cache.lock().get_usage())?), mibs(u64::try_from(self.ctx.row_cache.lock().get_usage())?),
)?; )?;

View file

@ -35,14 +35,7 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
} }
debug!("Opening database..."); debug!("Opening database...");
let db = if config.rocksdb_read_only { let db = Db::open_cf_descriptors(&db_opts, path, cfds).or_else(or_else)?;
Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
} else if config.rocksdb_secondary {
Db::open_cf_descriptors_as_secondary(&db_opts, path, path, cfds)
} else {
Db::open_cf_descriptors(&db_opts, path, cfds)
}
.or_else(or_else)?;
info!( info!(
columns = num_cfds, columns = num_cfds,
@ -55,8 +48,6 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
db, db,
pool: ctx.pool.clone(), pool: ctx.pool.clone(),
ctx: ctx.clone(), ctx: ctx.clone(),
read_only: config.rocksdb_read_only,
secondary: config.rocksdb_secondary,
checksums: config.rocksdb_checksums, checksums: config.rocksdb_checksums,
corks: AtomicU32::new(0), corks: AtomicU32::new(0),
})) }))

View file

@ -219,7 +219,7 @@ where
K: AsRef<[u8]> + Sized + Debug + 'a, K: AsRef<[u8]> + Sized + Debug + 'a,
V: AsRef<[u8]> + Sized + 'a, V: AsRef<[u8]> + Sized + 'a,
{ {
let mut batch = WriteBatchWithTransaction::<false>::default(); let mut batch = WriteBatchWithTransaction::<true>::default();
for (key, val) in iter { for (key, val) in iter {
batch.put_cf(&self.cf(), key.as_ref(), val.as_ref()); batch.put_cf(&self.cf(), key.as_ref(), val.as_ref());
} }

View file

@ -77,14 +77,6 @@ impl Database {
#[inline] #[inline]
pub fn keys(&self) -> impl Iterator<Item = &MapsKey> + Send + '_ { self.maps.keys() } pub fn keys(&self) -> impl Iterator<Item = &MapsKey> + Send + '_ { self.maps.keys() }
#[inline]
#[must_use]
pub fn is_read_only(&self) -> bool { self.db.is_read_only() }
#[inline]
#[must_use]
pub fn is_secondary(&self) -> bool { self.db.is_secondary() }
} }
impl Index<&str> for Database { impl Index<&str> for Database {

View file

@ -37,10 +37,6 @@ impl crate::Service for Service {
} }
async fn worker(self: Arc<Self>) -> Result { async fn worker(self: Arc<Self>) -> Result {
if self.services.globals.is_read_only() {
return Ok(());
}
if self.services.config.ldap.enable { if self.services.config.ldap.enable {
warn!("emergency password feature not available with LDAP enabled."); warn!("emergency password feature not available with LDAP enabled.");
return Ok(()); return Ok(());

View file

@ -171,7 +171,4 @@ impl Service {
pub fn server_is_ours(&self, server_name: &ServerName) -> bool { pub fn server_is_ours(&self, server_name: &ServerName) -> bool {
server_name == self.server_name() server_name == self.server_name()
} }
#[inline]
pub fn is_read_only(&self) -> bool { self.db.db.is_read_only() }
} }

View file

@ -130,7 +130,7 @@ impl Services {
// reset dormant online/away statuses to offline, and set the server user as // reset dormant online/away statuses to offline, and set the server user as
// online // online
if self.server.config.allow_local_presence && !self.db.is_read_only() { if self.server.config.allow_local_presence {
self.presence.unset_all_presence().await; self.presence.unset_all_presence().await;
_ = self _ = self
.presence .presence
@ -146,7 +146,7 @@ impl Services {
info!("Shutting down services..."); info!("Shutting down services...");
// set the server user as offline // set the server user as offline
if self.server.config.allow_local_presence && !self.db.is_read_only() { if self.server.config.allow_local_presence {
_ = self _ = self
.presence .presence
.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Offline) .ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Offline)