Compare commits

...
Sign in to create a new pull request.

16 commits

Author SHA1 Message Date
Jacob Taylor
eb29c9eeb0 fix: Clear All Clippy Lints 2026-03-18 16:08:37 -07:00
Jacob Taylor
955db1e634 fix: Pre-Commit Lint Compliance Maneuver 2026-03-18 16:08:37 -07:00
timedout
245cb0e064 fix: Write-lock individual rooms when building sync for them 2026-03-18 16:08:37 -07:00
Jacob Taylor
7480cf842c fix warn 2026-03-18 16:08:37 -07:00
Joe Citrine
f47b562905 fix(resolver): port parser does not handle leading ':'
conditionally trim the leading colon
2026-03-18 16:08:37 -07:00
Jacob Taylor
07d7e15485 upgrade some logs to info 2026-03-18 16:08:37 -07:00
Jade Ellis
214d07214d fix: Use to_str methods on room IDs 2026-03-18 16:08:37 -07:00
Jade Ellis
e286bcf733 chore: cleanup 2026-03-18 16:08:37 -07:00
Jade Ellis
71ed8eadad chore: Fix more complicated clippy warnings 2026-03-18 16:08:37 -07:00
Jade Ellis
c948e58905 feat: Add command to purge sync tokens for empty rooms 2026-03-18 16:08:37 -07:00
Jade Ellis
3f178d7d85 feat: Add admin command to delete sync tokens from a room 2026-03-18 16:08:37 -07:00
Jacob Taylor
f0ffe860d6 exponential backoff is now just bees. did you want bees? no? well you have them now. congrats 2026-03-18 16:08:37 -07:00
Jacob Taylor
b6e88150b2 sender_workers scaling. this time, with feeling! 2026-03-18 16:08:37 -07:00
Jacob Taylor
2d9ad1bab0 log when there are zero extremities 2026-03-18 16:08:37 -07:00
Jacob Taylor
23584050e8 enable converged 6g at the edge in continuwuity 2026-03-18 16:08:37 -07:00
Jacob Taylor
5129803262 bump the number of allowed immutable memtables by 1, to allow for greater flood protection
this should probably not be applied if you have rocksdb_atomic_flush = false (the default)
2026-03-18 16:08:37 -07:00
14 changed files with 338 additions and 44 deletions

View file

@ -1783,11 +1783,9 @@
#stream_amplification = 1024 #stream_amplification = 1024
# Number of sender task workers; determines sender parallelism. Default is # Number of sender task workers; determines sender parallelism. Default is
# '0' which means the value is determined internally, likely matching the # core count. Override by setting a different value.
# number of tokio worker-threads or number of cores, etc. Override by
# setting a non-zero value.
# #
#sender_workers = 0 #sender_workers = core count
# Enables listener sockets; can be set to false to disable listening. This # Enables listener sockets; can be set to false to disable listening. This
# option is intended for developer/diagnostic purposes only. # option is intended for developer/diagnostic purposes only.

View file

@ -1,6 +1,6 @@
use conduwuit::{Err, Result}; use conduwuit::{Err, Result};
use futures::StreamExt; use futures::StreamExt;
use ruma::OwnedRoomId; use ruma::{OwnedRoomId, OwnedRoomOrAliasId};
use crate::{PAGE_SIZE, admin_command, get_room_info}; use crate::{PAGE_SIZE, admin_command, get_room_info};
@ -82,3 +82,185 @@ pub(super) async fn exists(&self, room_id: OwnedRoomId) -> Result {
self.write_str(&format!("{result}")).await self.write_str(&format!("{result}")).await
} }
#[admin_command]
pub(super) async fn purge_sync_tokens(&self, room: OwnedRoomOrAliasId) -> Result {
	// Resolve the room ID from the room or alias ID
	let room_id = self.services.rooms.alias.resolve(&room).await?;

	// Delete all tokens for this room using the service method.
	// Surface the underlying error instead of discarding it: the original
	// `let Ok(..) = .. else` pattern dropped the cause, leaving the admin
	// with no hint of why deletion failed.
	let deleted_count = match self.services.rooms.user.delete_room_tokens(&room_id).await {
		| Ok(count) => count,
		| Err(e) => {
			return Err!("Failed to delete sync tokens for room {}: {e}", room_id.as_str());
		},
	};

	self.write_str(&format!(
		"Successfully deleted {deleted_count} sync tokens for room {}",
		room_id.as_str()
	))
	.await
}
/// Selects which category of rooms a purge operation applies to.
#[derive(Clone, Debug, Default, clap::ValueEnum)]
pub enum RoomTargetOption {
	/// Target all rooms
	#[default]
	All,
	/// Target only disabled rooms
	DisabledOnly,
	/// Target only banned rooms
	BannedOnly,
}
#[admin_command]
pub(super) async fn purge_all_sync_tokens(
	&self,
	target_option: Option<RoomTargetOption>,
	execute: bool,
) -> Result {
	use conduwuit::{debug, info};

	// `execute == false` is a dry run: tokens are counted, never deleted.
	let mode = if !execute { "Simulating" } else { "Starting" };

	// Saturating counters; strictly, we should check whether these hit the
	// max value after the loop and warn the user that the count is too large.
	let mut total_rooms_checked: usize = 0;
	let mut total_tokens_deleted: usize = 0;
	let mut error_count: u32 = 0;
	let mut skipped_rooms: usize = 0;

	info!("{} purge of sync tokens", mode);

	// Get all rooms in the server
	let all_rooms = self
		.services
		.rooms
		.metadata
		.iter_ids()
		.collect::<Vec<_>>()
		.await;

	info!("Found {} rooms total on the server", all_rooms.len());

	// Filter rooms based on the target option (no option means all rooms)
	let mut rooms = Vec::new();
	for room_id in all_rooms {
		if let Some(target) = &target_option {
			match target {
				| RoomTargetOption::DisabledOnly => {
					if !self.services.rooms.metadata.is_disabled(room_id).await {
						debug!("Skipping room {} as it's not disabled", room_id.as_str());
						skipped_rooms = skipped_rooms.saturating_add(1);
						continue;
					}
				},
				| RoomTargetOption::BannedOnly => {
					if !self.services.rooms.metadata.is_banned(room_id).await {
						debug!("Skipping room {} as it's not banned", room_id.as_str());
						skipped_rooms = skipped_rooms.saturating_add(1);
						continue;
					}
				},
				| RoomTargetOption::All => {},
			}
		}

		rooms.push(room_id);
	}

	// Total number of rooms we'll be checking
	let total_rooms = rooms.len();
	info!(
		"Processing {} rooms after filtering (skipped {} rooms)",
		total_rooms, skipped_rooms
	);

	// Process each room; dry run counts tokens, execute deletes them.
	// Both service calls return the affected-token count, so the result
	// handling below is shared instead of duplicated per mode.
	for room_id in rooms {
		total_rooms_checked = total_rooms_checked.saturating_add(1);

		// Log progress periodically
		if total_rooms_checked.is_multiple_of(100) || total_rooms_checked == total_rooms {
			info!(
				"Progress: {}/{} rooms checked, {} tokens {}",
				total_rooms_checked,
				total_rooms,
				total_tokens_deleted,
				if !execute { "would be deleted" } else { "deleted" }
			);
		}

		debug!(
			"Room {}: {}",
			room_id.as_str(),
			if !execute {
				"would purge sync tokens"
			} else {
				"purging sync tokens"
			}
		);

		let result = if execute {
			self.services.rooms.user.delete_room_tokens(room_id).await
		} else {
			self.services.rooms.user.count_room_tokens(room_id).await
		};

		match result {
			| Ok(count) if count > 0 => {
				if execute {
					debug!("Deleted {} sync tokens for room {}", count, room_id.as_str());
				} else {
					debug!(
						"Would delete {} sync tokens for room {}",
						count,
						room_id.as_str()
					);
				}
				total_tokens_deleted = total_tokens_deleted.saturating_add(count);
			},
			| Ok(_) => debug!("No sync tokens found for room {}", room_id.as_str()),
			| Err(e) => {
				debug!(
					"Error {} sync tokens for room {}: {:?}",
					if execute { "purging" } else { "counting" },
					room_id.as_str(),
					e
				);
				error_count = error_count.saturating_add(1);
			},
		}
	}

	let action = if !execute { "would be deleted" } else { "deleted" };
	info!(
		"Finished {}: checked {} rooms out of {} total, {} tokens {}, errors: {}",
		if !execute {
			"purge simulation"
		} else {
			"purging sync tokens"
		},
		total_rooms_checked,
		total_rooms,
		total_tokens_deleted,
		action,
		error_count
	);

	self.write_str(&format!(
		"Finished {}: checked {} rooms out of {} total, {} tokens {}, errors: {}",
		if !execute { "simulation" } else { "purging sync tokens" },
		total_rooms_checked,
		total_rooms,
		total_tokens_deleted,
		action,
		error_count
	))
	.await
}

View file

@ -5,8 +5,9 @@ mod info;
mod moderation; mod moderation;
use clap::Subcommand; use clap::Subcommand;
use commands::RoomTargetOption;
use conduwuit::Result; use conduwuit::Result;
use ruma::OwnedRoomId; use ruma::{OwnedRoomId, OwnedRoomOrAliasId};
use self::{ use self::{
alias::RoomAliasCommand, directory::RoomDirectoryCommand, info::RoomInfoCommand, alias::RoomAliasCommand, directory::RoomDirectoryCommand, info::RoomInfoCommand,
@ -60,4 +61,25 @@ pub enum RoomCommand {
Exists { Exists {
room_id: OwnedRoomId, room_id: OwnedRoomId,
}, },
/// - Delete all sync tokens for a room
PurgeSyncTokens {
/// Room ID or alias to purge sync tokens for
#[arg(value_parser)]
room: OwnedRoomOrAliasId,
},
/// - Delete sync tokens for all rooms that have no local users
///
/// By default, processes all empty rooms.
PurgeAllSyncTokens {
/// Target specific room types
#[arg(long, value_enum)]
target_option: Option<RoomTargetOption>,
/// Execute token deletions. Otherwise,
/// performs a dry run without actually deleting any tokens
#[arg(long)]
execute: bool,
},
} }

View file

@ -65,6 +65,7 @@ pub(super) async fn load_joined_room(
and `join*` functions are used to perform steps in parallel which do not depend on each other. and `join*` functions are used to perform steps in parallel which do not depend on each other.
*/ */
let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await;
let ( let (
account_data, account_data,
ephemeral, ephemeral,
@ -82,6 +83,7 @@ pub(super) async fn load_joined_room(
) )
.boxed() .boxed()
.await?; .await?;
drop(insert_lock);
if !timeline.is_empty() || !state_events.is_empty() { if !timeline.is_empty() || !state_events.is_empty() {
trace!( trace!(

View file

@ -2051,12 +2051,10 @@ pub struct Config {
pub stream_amplification: usize, pub stream_amplification: usize,
/// Number of sender task workers; determines sender parallelism. Default is /// Number of sender task workers; determines sender parallelism. Default is
/// '0' which means the value is determined internally, likely matching the /// core count. Override by setting a different value.
/// number of tokio worker-threads or number of cores, etc. Override by
/// setting a non-zero value.
/// ///
/// default: 0 /// default: core count
#[serde(default)] #[serde(default = "default_sender_workers")]
pub sender_workers: usize, pub sender_workers: usize,
/// Enables listener sockets; can be set to false to disable listening. This /// Enables listener sockets; can be set to false to disable listening. This
@ -2538,45 +2536,47 @@ fn default_database_backups_to_keep() -> i16 { 1 }
fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) } fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) }
fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) } fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) }
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) } fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) }
fn default_cache_capacity_modifier() -> f64 { 1.0 } fn default_cache_capacity_modifier() -> f64 { 1.0 }
fn default_auth_chain_cache_capacity() -> u32 { fn default_auth_chain_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000)
}
fn default_shorteventid_cache_capacity() -> u32 {
parallelism_scaled_u32(50_000).saturating_add(100_000) parallelism_scaled_u32(50_000).saturating_add(100_000)
} }
fn default_shorteventid_cache_capacity() -> u32 {
parallelism_scaled_u32(100_000).saturating_add(100_000)
}
fn default_eventidshort_cache_capacity() -> u32 { fn default_eventidshort_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000) parallelism_scaled_u32(50_000).saturating_add(100_000)
} }
fn default_eventid_pdu_cache_capacity() -> u32 { fn default_eventid_pdu_cache_capacity() -> u32 {
parallelism_scaled_u32(25_000).saturating_add(100_000) parallelism_scaled_u32(50_000).saturating_add(100_000)
} }
fn default_shortstatekey_cache_capacity() -> u32 { fn default_shortstatekey_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000) parallelism_scaled_u32(50_000).saturating_add(100_000)
} }
fn default_statekeyshort_cache_capacity() -> u32 { fn default_statekeyshort_cache_capacity() -> u32 {
parallelism_scaled_u32(10_000).saturating_add(100_000) parallelism_scaled_u32(50_000).saturating_add(100_000)
} }
fn default_servernameevent_data_cache_capacity() -> u32 { fn default_servernameevent_data_cache_capacity() -> u32 {
parallelism_scaled_u32(100_000).saturating_add(500_000) parallelism_scaled_u32(100_000).saturating_add(100_000)
} }
fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) } fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(500).clamp(100, 12000) }
fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) } fn default_roomid_spacehierarchy_cache_capacity() -> u32 {
parallelism_scaled_u32(500).clamp(100, 12000)
}
fn default_dns_cache_entries() -> u32 { 32768 } fn default_dns_cache_entries() -> u32 { 327_680 }
fn default_dns_min_ttl() -> u64 { 60 * 180 } fn default_dns_min_ttl() -> u64 { 60 * 180 }
@ -2784,15 +2784,26 @@ fn default_admin_log_capture() -> String {
fn default_admin_room_tag() -> String { "m.server_notice".to_owned() } fn default_admin_room_tag() -> String { "m.server_notice".to_owned() }
#[must_use]
#[allow(clippy::as_conversions, clippy::cast_precision_loss)] #[allow(clippy::as_conversions, clippy::cast_precision_loss)]
fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) } pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
fn parallelism_scaled_u32(val: u32) -> u32 { #[must_use]
let val = val.try_into().expect("failed to cast u32 to usize"); #[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
parallelism_scaled(val).try_into().unwrap_or(u32::MAX) pub fn parallelism_scaled_u32(val: u32) -> u32 {
val.saturating_mul(sys::available_parallelism() as u32)
} }
fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } #[must_use]
#[allow(clippy::as_conversions, clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
pub fn parallelism_scaled_i32(val: i32) -> i32 {
val.saturating_mul(sys::available_parallelism() as i32)
}
#[must_use]
pub fn parallelism_scaled(val: usize) -> usize {
val.saturating_mul(sys::available_parallelism())
}
fn default_trusted_server_batch_size() -> usize { 256 } fn default_trusted_server_batch_size() -> usize { 256 }
@ -2812,6 +2823,8 @@ fn default_stream_width_scale() -> f32 { 1.0 }
fn default_stream_amplification() -> usize { 1024 } fn default_stream_amplification() -> usize { 1024 }
fn default_sender_workers() -> usize { parallelism_scaled(1) }
fn default_client_receive_timeout() -> u64 { 75 } fn default_client_receive_timeout() -> u64 { 75 }
fn default_client_request_timeout() -> u64 { 180 } fn default_client_request_timeout() -> u64 { 180 }

View file

@ -29,7 +29,7 @@ fn descriptor_cf_options(
set_table_options(&mut opts, &desc, cache)?; set_table_options(&mut opts, &desc, cache)?;
opts.set_min_write_buffer_number(1); opts.set_min_write_buffer_number(1);
opts.set_max_write_buffer_number(2); opts.set_max_write_buffer_number(3);
opts.set_write_buffer_size(desc.write_size); opts.set_write_buffer_size(desc.write_size);
opts.set_target_file_size_base(desc.file_size); opts.set_target_file_size_base(desc.file_size);

View file

@ -8,7 +8,7 @@ use axum::{
extract::State, extract::State,
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use conduwuit::{Result, debug, debug_error, debug_warn, err, error, trace}; use conduwuit::{Result, debug_warn, err, error, info, trace};
use conduwuit_service::Services; use conduwuit_service::Services;
use futures::FutureExt; use futures::FutureExt;
use http::{Method, StatusCode, Uri}; use http::{Method, StatusCode, Uri};
@ -102,11 +102,11 @@ fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Respons
let reason = status.canonical_reason().unwrap_or("Unknown Reason"); let reason = status.canonical_reason().unwrap_or("Unknown Reason");
if status.is_server_error() { if status.is_server_error() {
error!(%method, %uri, "{code} {reason}"); info!(%method, %uri, "{code} {reason}");
} else if status.is_client_error() { } else if status.is_client_error() {
debug_error!(%method, %uri, "{code} {reason}"); info!(%method, %uri, "{code} {reason}");
} else if status.is_redirection() { } else if status.is_redirection() {
debug!(%method, %uri, "{code} {reason}"); trace!(%method, %uri, "{code} {reason}");
} else { } else {
trace!(%method, %uri, "{code} {reason}"); trace!(%method, %uri, "{code} {reason}");
} }

View file

@ -100,7 +100,7 @@ impl Service {
/// Pings the presence of the given user in the given room, setting the /// Pings the presence of the given user in the given room, setting the
/// specified state. /// specified state.
pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> { pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> {
const REFRESH_TIMEOUT: u64 = 60 * 1000; const REFRESH_TIMEOUT: u64 = 60 * 1000 * 4;
let last_presence = self.db.get_presence(user_id).await; let last_presence = self.db.get_presence(user_id).await;
let state_changed = match last_presence { let state_changed = match last_presence {

View file

@ -119,8 +119,12 @@ impl super::Service {
async fn actual_dest_2(&self, dest: &ServerName, cache: bool, pos: usize) -> Result<FedDest> { async fn actual_dest_2(&self, dest: &ServerName, cache: bool, pos: usize) -> Result<FedDest> {
debug!("2: Hostname with included port"); debug!("2: Hostname with included port");
let (host, port) = dest.as_str().split_at(pos); let (host, port) = dest.as_str().split_at(pos);
self.conditional_query_and_cache(host, port.parse::<u16>().unwrap_or(8448), cache) self.conditional_query_and_cache(
.await?; host,
port.trim_start_matches(':').parse::<u16>().unwrap_or(8448),
cache,
)
.await?;
Ok(FedDest::Named( Ok(FedDest::Named(
host.to_owned(), host.to_owned(),
@ -165,8 +169,12 @@ impl super::Service {
) -> Result<FedDest> { ) -> Result<FedDest> {
debug!("3.2: Hostname with port in .well-known file"); debug!("3.2: Hostname with port in .well-known file");
let (host, port) = delegated.split_at(pos); let (host, port) = delegated.split_at(pos);
self.conditional_query_and_cache(host, port.parse::<u16>().unwrap_or(8448), cache) self.conditional_query_and_cache(
.await?; host,
port.trim_start_matches(':').parse::<u16>().unwrap_or(8448),
cache,
)
.await?;
Ok(FedDest::Named( Ok(FedDest::Named(
host.to_owned(), host.to_owned(),

View file

@ -53,9 +53,9 @@ impl Resolver {
opts.cache_size = config.dns_cache_entries as usize; opts.cache_size = config.dns_cache_entries as usize;
opts.preserve_intermediates = true; opts.preserve_intermediates = true;
opts.negative_min_ttl = Some(Duration::from_secs(config.dns_min_ttl_nxdomain)); opts.negative_min_ttl = Some(Duration::from_secs(config.dns_min_ttl_nxdomain));
opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 30)); opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24));
opts.positive_min_ttl = Some(Duration::from_secs(config.dns_min_ttl)); opts.positive_min_ttl = Some(Duration::from_secs(config.dns_min_ttl));
opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 7)); opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24));
opts.timeout = Duration::from_secs(config.dns_timeout); opts.timeout = Duration::from_secs(config.dns_timeout);
opts.attempts = config.dns_attempts as usize; opts.attempts = config.dns_attempts as usize;
opts.try_tcp_on_error = config.dns_tcp_fallback; opts.try_tcp_on_error = config.dns_tcp_fallback;

View file

@ -80,7 +80,7 @@ where
{ {
// Exponential backoff // Exponential backoff
const MIN_DURATION: u64 = 60 * 2; const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8; const MAX_DURATION: u64 = 60 * 60;
if continue_exponential_backoff_secs( if continue_exponential_backoff_secs(
MIN_DURATION, MIN_DURATION,
MAX_DURATION, MAX_DURATION,

View file

@ -46,7 +46,7 @@ where
{ {
// Exponential backoff // Exponential backoff
const MIN_DURATION: u64 = 5 * 60; const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24; const MAX_DURATION: u64 = 60 * 60;
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) { if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
debug!( debug!(
?tries, ?tries,

View file

@ -197,6 +197,15 @@ where
.await; .await;
extremities.push(incoming_pdu.event_id().to_owned()); extremities.push(incoming_pdu.event_id().to_owned());
if extremities.is_empty() {
info!(
"Retained zero extremities when upgrading outlier PDU to timeline PDU with {} \
previous events, event id: {}",
incoming_pdu.prev_events.len(),
incoming_pdu.event_id
);
}
debug!( debug!(
"Retained {} extremities checked against {} prev_events", "Retained {} extremities checked against {} prev_events",
extremities.len(), extremities.len(),

View file

@ -127,3 +127,63 @@ pub async fn get_token_shortstatehash(
.await .await
.deserialized() .deserialized()
} }
/// Count how many sync tokens exist for a room without deleting them
///
/// This is useful for dry runs to see how many tokens would be deleted
#[implement(Service)]
pub async fn count_room_tokens(&self, room_id: &RoomId) -> Result<usize> {
	use futures::TryStreamExt;

	let shortroomid = self.services.short.get_shortroomid(room_id).await?;

	// Create a prefix to search by - all entries for this room will start with its
	// short ID
	let prefix = &[shortroomid];

	// Fold over the key stream to count entries directly, instead of
	// collecting every key into a Vec just to take its length.
	self.db
		.roomsynctoken_shortstatehash
		.keys_prefix_raw(prefix)
		.try_fold(0_usize, |count, _key| async move { Ok(count.saturating_add(1)) })
		.await
}
/// Delete all sync tokens associated with a room
///
/// This helps clean up the database as these tokens are never otherwise removed
#[implement(Service)]
pub async fn delete_room_tokens(&self, room_id: &RoomId) -> Result<usize> {
	use futures::TryStreamExt;

	let shortroomid = self.services.short.get_shortroomid(room_id).await?;

	// Every entry for this room is keyed under its short room ID, so that
	// single element forms the search prefix.
	let prefix = &[shortroomid];

	// Materialize the matching keys first (owned copies, since the stream
	// yields borrowed keys), then delete outside of the iteration.
	let keys: Vec<_> = self
		.db
		.roomsynctoken_shortstatehash
		.keys_prefix_raw(prefix)
		.map_ok(Vec::from)
		.try_collect()
		.await?;

	for key in &keys {
		self.db.roomsynctoken_shortstatehash.del(key);
	}

	Ok(keys.len())
}