Compare commits
16 commits
main
...
aranje/ill
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb29c9eeb0 | ||
|
|
955db1e634 | ||
|
|
245cb0e064 | ||
|
|
7480cf842c | ||
|
|
f47b562905 | ||
|
|
07d7e15485 | ||
|
|
214d07214d | ||
|
|
e286bcf733 | ||
|
|
71ed8eadad | ||
|
|
c948e58905 | ||
|
|
3f178d7d85 | ||
|
|
f0ffe860d6 | ||
|
|
b6e88150b2 | ||
|
|
2d9ad1bab0 | ||
|
|
23584050e8 | ||
|
|
5129803262 |
14 changed files with 338 additions and 44 deletions
|
|
@ -1783,11 +1783,9 @@
|
|||
#stream_amplification = 1024
|
||||
|
||||
# Number of sender task workers; determines sender parallelism. Default is
|
||||
# '0' which means the value is determined internally, likely matching the
|
||||
# number of tokio worker-threads or number of cores, etc. Override by
|
||||
# setting a non-zero value.
|
||||
# core count. Override by setting a different value.
|
||||
#
|
||||
#sender_workers = 0
|
||||
#sender_workers = core count
|
||||
|
||||
# Enables listener sockets; can be set to false to disable listening. This
|
||||
# option is intended for developer/diagnostic purposes only.
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use conduwuit::{Err, Result};
|
||||
use futures::StreamExt;
|
||||
use ruma::OwnedRoomId;
|
||||
use ruma::{OwnedRoomId, OwnedRoomOrAliasId};
|
||||
|
||||
use crate::{PAGE_SIZE, admin_command, get_room_info};
|
||||
|
||||
|
|
@ -82,3 +82,185 @@ pub(super) async fn exists(&self, room_id: OwnedRoomId) -> Result {
|
|||
|
||||
self.write_str(&format!("{result}")).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn purge_sync_tokens(&self, room: OwnedRoomOrAliasId) -> Result {
|
||||
// Resolve the room ID from the room or alias ID
|
||||
let room_id = self.services.rooms.alias.resolve(&room).await?;
|
||||
|
||||
// Delete all tokens for this room using the service method
|
||||
let Ok(deleted_count) = self.services.rooms.user.delete_room_tokens(&room_id).await else {
|
||||
return Err!("Failed to delete sync tokens for room {}", room_id.as_str());
|
||||
};
|
||||
|
||||
self.write_str(&format!(
|
||||
"Successfully deleted {deleted_count} sync tokens for room {}",
|
||||
room_id.as_str()
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Target options for room purging
|
||||
#[derive(Default, Debug, clap::ValueEnum, Clone)]
|
||||
pub enum RoomTargetOption {
|
||||
#[default]
|
||||
/// Target all rooms
|
||||
All,
|
||||
/// Target only disabled rooms
|
||||
DisabledOnly,
|
||||
/// Target only banned rooms
|
||||
BannedOnly,
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn purge_all_sync_tokens(
|
||||
&self,
|
||||
target_option: Option<RoomTargetOption>,
|
||||
execute: bool,
|
||||
) -> Result {
|
||||
use conduwuit::{debug, info};
|
||||
|
||||
let mode = if !execute { "Simulating" } else { "Starting" };
|
||||
|
||||
// strictly, we should check if these reach the max value after the loop and
|
||||
// warn the user that the count is too large
|
||||
let mut total_rooms_checked: usize = 0;
|
||||
let mut total_tokens_deleted: usize = 0;
|
||||
let mut error_count: u32 = 0;
|
||||
let mut skipped_rooms: usize = 0;
|
||||
|
||||
info!("{} purge of sync tokens", mode);
|
||||
|
||||
// Get all rooms in the server
|
||||
let all_rooms = self
|
||||
.services
|
||||
.rooms
|
||||
.metadata
|
||||
.iter_ids()
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
info!("Found {} rooms total on the server", all_rooms.len());
|
||||
|
||||
// Filter rooms based on options
|
||||
let mut rooms = Vec::new();
|
||||
for room_id in all_rooms {
|
||||
if let Some(target) = &target_option {
|
||||
match target {
|
||||
| RoomTargetOption::DisabledOnly => {
|
||||
if !self.services.rooms.metadata.is_disabled(room_id).await {
|
||||
debug!("Skipping room {} as it's not disabled", room_id.as_str());
|
||||
skipped_rooms = skipped_rooms.saturating_add(1);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
| RoomTargetOption::BannedOnly => {
|
||||
if !self.services.rooms.metadata.is_banned(room_id).await {
|
||||
debug!("Skipping room {} as it's not banned", room_id.as_str());
|
||||
skipped_rooms = skipped_rooms.saturating_add(1);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
| RoomTargetOption::All => {},
|
||||
}
|
||||
}
|
||||
|
||||
rooms.push(room_id);
|
||||
}
|
||||
|
||||
// Total number of rooms we'll be checking
|
||||
let total_rooms = rooms.len();
|
||||
info!(
|
||||
"Processing {} rooms after filtering (skipped {} rooms)",
|
||||
total_rooms, skipped_rooms
|
||||
);
|
||||
|
||||
// Process each room
|
||||
for room_id in rooms {
|
||||
total_rooms_checked = total_rooms_checked.saturating_add(1);
|
||||
|
||||
// Log progress periodically
|
||||
if total_rooms_checked.is_multiple_of(100) || total_rooms_checked == total_rooms {
|
||||
info!(
|
||||
"Progress: {}/{} rooms checked, {} tokens {}",
|
||||
total_rooms_checked,
|
||||
total_rooms,
|
||||
total_tokens_deleted,
|
||||
if !execute { "would be deleted" } else { "deleted" }
|
||||
);
|
||||
}
|
||||
|
||||
// In dry run mode, just count what would be deleted, don't actually delete
|
||||
debug!(
|
||||
"Room {}: {}",
|
||||
room_id.as_str(),
|
||||
if !execute {
|
||||
"would purge sync tokens"
|
||||
} else {
|
||||
"purging sync tokens"
|
||||
}
|
||||
);
|
||||
|
||||
if !execute {
|
||||
// For dry run mode, count tokens without deleting
|
||||
match self.services.rooms.user.count_room_tokens(room_id).await {
|
||||
| Ok(count) =>
|
||||
if count > 0 {
|
||||
debug!(
|
||||
"Would delete {} sync tokens for room {}",
|
||||
count,
|
||||
room_id.as_str()
|
||||
);
|
||||
total_tokens_deleted = total_tokens_deleted.saturating_add(count);
|
||||
} else {
|
||||
debug!("No sync tokens found for room {}", room_id.as_str());
|
||||
},
|
||||
| Err(e) => {
|
||||
debug!("Error counting sync tokens for room {}: {:?}", room_id.as_str(), e);
|
||||
error_count = error_count.saturating_add(1);
|
||||
},
|
||||
}
|
||||
} else {
|
||||
// Real deletion mode
|
||||
match self.services.rooms.user.delete_room_tokens(room_id).await {
|
||||
| Ok(count) =>
|
||||
if count > 0 {
|
||||
debug!("Deleted {} sync tokens for room {}", count, room_id.as_str());
|
||||
total_tokens_deleted = total_tokens_deleted.saturating_add(count);
|
||||
} else {
|
||||
debug!("No sync tokens found for room {}", room_id.as_str());
|
||||
},
|
||||
| Err(e) => {
|
||||
debug!("Error purging sync tokens for room {}: {:?}", room_id.as_str(), e);
|
||||
error_count = error_count.saturating_add(1);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let action = if !execute { "would be deleted" } else { "deleted" };
|
||||
info!(
|
||||
"Finished {}: checked {} rooms out of {} total, {} tokens {}, errors: {}",
|
||||
if !execute {
|
||||
"purge simulation"
|
||||
} else {
|
||||
"purging sync tokens"
|
||||
},
|
||||
total_rooms_checked,
|
||||
total_rooms,
|
||||
total_tokens_deleted,
|
||||
action,
|
||||
error_count
|
||||
);
|
||||
|
||||
self.write_str(&format!(
|
||||
"Finished {}: checked {} rooms out of {} total, {} tokens {}, errors: {}",
|
||||
if !execute { "simulation" } else { "purging sync tokens" },
|
||||
total_rooms_checked,
|
||||
total_rooms,
|
||||
total_tokens_deleted,
|
||||
action,
|
||||
error_count
|
||||
))
|
||||
.await
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,8 +5,9 @@ mod info;
|
|||
mod moderation;
|
||||
|
||||
use clap::Subcommand;
|
||||
use commands::RoomTargetOption;
|
||||
use conduwuit::Result;
|
||||
use ruma::OwnedRoomId;
|
||||
use ruma::{OwnedRoomId, OwnedRoomOrAliasId};
|
||||
|
||||
use self::{
|
||||
alias::RoomAliasCommand, directory::RoomDirectoryCommand, info::RoomInfoCommand,
|
||||
|
|
@ -60,4 +61,25 @@ pub enum RoomCommand {
|
|||
Exists {
|
||||
room_id: OwnedRoomId,
|
||||
},
|
||||
|
||||
/// - Delete all sync tokens for a room
|
||||
PurgeSyncTokens {
|
||||
/// Room ID or alias to purge sync tokens for
|
||||
#[arg(value_parser)]
|
||||
room: OwnedRoomOrAliasId,
|
||||
},
|
||||
|
||||
/// - Delete sync tokens for all rooms that have no local users
|
||||
///
|
||||
/// By default, processes all empty rooms.
|
||||
PurgeAllSyncTokens {
|
||||
/// Target specific room types
|
||||
#[arg(long, value_enum)]
|
||||
target_option: Option<RoomTargetOption>,
|
||||
|
||||
/// Execute token deletions. Otherwise,
|
||||
/// Performs a dry run without actually deleting any tokens
|
||||
#[arg(long)]
|
||||
execute: bool,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,6 +65,7 @@ pub(super) async fn load_joined_room(
|
|||
and `join*` functions are used to perform steps in parallel which do not depend on each other.
|
||||
*/
|
||||
|
||||
let insert_lock = services.rooms.timeline.mutex_insert.lock(room_id).await;
|
||||
let (
|
||||
account_data,
|
||||
ephemeral,
|
||||
|
|
@ -82,6 +83,7 @@ pub(super) async fn load_joined_room(
|
|||
)
|
||||
.boxed()
|
||||
.await?;
|
||||
drop(insert_lock);
|
||||
|
||||
if !timeline.is_empty() || !state_events.is_empty() {
|
||||
trace!(
|
||||
|
|
|
|||
|
|
@ -2051,12 +2051,10 @@ pub struct Config {
|
|||
pub stream_amplification: usize,
|
||||
|
||||
/// Number of sender task workers; determines sender parallelism. Default is
|
||||
/// '0' which means the value is determined internally, likely matching the
|
||||
/// number of tokio worker-threads or number of cores, etc. Override by
|
||||
/// setting a non-zero value.
|
||||
/// core count. Override by setting a different value.
|
||||
///
|
||||
/// default: 0
|
||||
#[serde(default)]
|
||||
/// default: core count
|
||||
#[serde(default = "default_sender_workers")]
|
||||
pub sender_workers: usize,
|
||||
|
||||
/// Enables listener sockets; can be set to false to disable listening. This
|
||||
|
|
@ -2538,45 +2536,47 @@ fn default_database_backups_to_keep() -> i16 { 1 }
|
|||
|
||||
fn default_db_write_buffer_capacity_mb() -> f64 { 48.0 + parallelism_scaled_f64(4.0) }
|
||||
|
||||
fn default_db_cache_capacity_mb() -> f64 { 128.0 + parallelism_scaled_f64(64.0) }
|
||||
fn default_db_cache_capacity_mb() -> f64 { 512.0 + parallelism_scaled_f64(512.0) }
|
||||
|
||||
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(10_000).saturating_add(100_000) }
|
||||
fn default_pdu_cache_capacity() -> u32 { parallelism_scaled_u32(50_000).saturating_add(100_000) }
|
||||
|
||||
fn default_cache_capacity_modifier() -> f64 { 1.0 }
|
||||
|
||||
fn default_auth_chain_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(10_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_shorteventid_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_shorteventid_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(100_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_eventidshort_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(25_000).saturating_add(100_000)
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_eventid_pdu_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(25_000).saturating_add(100_000)
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_shortstatekey_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(10_000).saturating_add(100_000)
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_statekeyshort_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(10_000).saturating_add(100_000)
|
||||
parallelism_scaled_u32(50_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_servernameevent_data_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(100_000).saturating_add(500_000)
|
||||
parallelism_scaled_u32(100_000).saturating_add(100_000)
|
||||
}
|
||||
|
||||
fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(100) }
|
||||
fn default_stateinfo_cache_capacity() -> u32 { parallelism_scaled_u32(500).clamp(100, 12000) }
|
||||
|
||||
fn default_roomid_spacehierarchy_cache_capacity() -> u32 { parallelism_scaled_u32(1000) }
|
||||
fn default_roomid_spacehierarchy_cache_capacity() -> u32 {
|
||||
parallelism_scaled_u32(500).clamp(100, 12000)
|
||||
}
|
||||
|
||||
fn default_dns_cache_entries() -> u32 { 32768 }
|
||||
fn default_dns_cache_entries() -> u32 { 327_680 }
|
||||
|
||||
fn default_dns_min_ttl() -> u64 { 60 * 180 }
|
||||
|
||||
|
|
@ -2784,15 +2784,26 @@ fn default_admin_log_capture() -> String {
|
|||
|
||||
fn default_admin_room_tag() -> String { "m.server_notice".to_owned() }
|
||||
|
||||
#[must_use]
|
||||
#[allow(clippy::as_conversions, clippy::cast_precision_loss)]
|
||||
fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
|
||||
pub fn parallelism_scaled_f64(val: f64) -> f64 { val * (sys::available_parallelism() as f64) }
|
||||
|
||||
fn parallelism_scaled_u32(val: u32) -> u32 {
|
||||
let val = val.try_into().expect("failed to cast u32 to usize");
|
||||
parallelism_scaled(val).try_into().unwrap_or(u32::MAX)
|
||||
#[must_use]
|
||||
#[allow(clippy::as_conversions, clippy::cast_possible_truncation)]
|
||||
pub fn parallelism_scaled_u32(val: u32) -> u32 {
|
||||
val.saturating_mul(sys::available_parallelism() as u32)
|
||||
}
|
||||
|
||||
fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
|
||||
#[must_use]
|
||||
#[allow(clippy::as_conversions, clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
|
||||
pub fn parallelism_scaled_i32(val: i32) -> i32 {
|
||||
val.saturating_mul(sys::available_parallelism() as i32)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn parallelism_scaled(val: usize) -> usize {
|
||||
val.saturating_mul(sys::available_parallelism())
|
||||
}
|
||||
|
||||
fn default_trusted_server_batch_size() -> usize { 256 }
|
||||
|
||||
|
|
@ -2812,6 +2823,8 @@ fn default_stream_width_scale() -> f32 { 1.0 }
|
|||
|
||||
fn default_stream_amplification() -> usize { 1024 }
|
||||
|
||||
fn default_sender_workers() -> usize { parallelism_scaled(1) }
|
||||
|
||||
fn default_client_receive_timeout() -> u64 { 75 }
|
||||
|
||||
fn default_client_request_timeout() -> u64 { 180 }
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ fn descriptor_cf_options(
|
|||
set_table_options(&mut opts, &desc, cache)?;
|
||||
|
||||
opts.set_min_write_buffer_number(1);
|
||||
opts.set_max_write_buffer_number(2);
|
||||
opts.set_max_write_buffer_number(3);
|
||||
opts.set_write_buffer_size(desc.write_size);
|
||||
|
||||
opts.set_target_file_size_base(desc.file_size);
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ use axum::{
|
|||
extract::State,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use conduwuit::{Result, debug, debug_error, debug_warn, err, error, trace};
|
||||
use conduwuit::{Result, debug_warn, err, error, info, trace};
|
||||
use conduwuit_service::Services;
|
||||
use futures::FutureExt;
|
||||
use http::{Method, StatusCode, Uri};
|
||||
|
|
@ -102,11 +102,11 @@ fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Respons
|
|||
let reason = status.canonical_reason().unwrap_or("Unknown Reason");
|
||||
|
||||
if status.is_server_error() {
|
||||
error!(%method, %uri, "{code} {reason}");
|
||||
info!(%method, %uri, "{code} {reason}");
|
||||
} else if status.is_client_error() {
|
||||
debug_error!(%method, %uri, "{code} {reason}");
|
||||
info!(%method, %uri, "{code} {reason}");
|
||||
} else if status.is_redirection() {
|
||||
debug!(%method, %uri, "{code} {reason}");
|
||||
trace!(%method, %uri, "{code} {reason}");
|
||||
} else {
|
||||
trace!(%method, %uri, "{code} {reason}");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ impl Service {
|
|||
/// Pings the presence of the given user in the given room, setting the
|
||||
/// specified state.
|
||||
pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> {
|
||||
const REFRESH_TIMEOUT: u64 = 60 * 1000;
|
||||
const REFRESH_TIMEOUT: u64 = 60 * 1000 * 4;
|
||||
|
||||
let last_presence = self.db.get_presence(user_id).await;
|
||||
let state_changed = match last_presence {
|
||||
|
|
|
|||
|
|
@ -119,7 +119,11 @@ impl super::Service {
|
|||
async fn actual_dest_2(&self, dest: &ServerName, cache: bool, pos: usize) -> Result<FedDest> {
|
||||
debug!("2: Hostname with included port");
|
||||
let (host, port) = dest.as_str().split_at(pos);
|
||||
self.conditional_query_and_cache(host, port.parse::<u16>().unwrap_or(8448), cache)
|
||||
self.conditional_query_and_cache(
|
||||
host,
|
||||
port.trim_start_matches(':').parse::<u16>().unwrap_or(8448),
|
||||
cache,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(FedDest::Named(
|
||||
|
|
@ -165,7 +169,11 @@ impl super::Service {
|
|||
) -> Result<FedDest> {
|
||||
debug!("3.2: Hostname with port in .well-known file");
|
||||
let (host, port) = delegated.split_at(pos);
|
||||
self.conditional_query_and_cache(host, port.parse::<u16>().unwrap_or(8448), cache)
|
||||
self.conditional_query_and_cache(
|
||||
host,
|
||||
port.trim_start_matches(':').parse::<u16>().unwrap_or(8448),
|
||||
cache,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(FedDest::Named(
|
||||
|
|
|
|||
|
|
@ -53,9 +53,9 @@ impl Resolver {
|
|||
opts.cache_size = config.dns_cache_entries as usize;
|
||||
opts.preserve_intermediates = true;
|
||||
opts.negative_min_ttl = Some(Duration::from_secs(config.dns_min_ttl_nxdomain));
|
||||
opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 30));
|
||||
opts.negative_max_ttl = Some(Duration::from_secs(60 * 60 * 24));
|
||||
opts.positive_min_ttl = Some(Duration::from_secs(config.dns_min_ttl));
|
||||
opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24 * 7));
|
||||
opts.positive_max_ttl = Some(Duration::from_secs(60 * 60 * 24));
|
||||
opts.timeout = Duration::from_secs(config.dns_timeout);
|
||||
opts.attempts = config.dns_attempts as usize;
|
||||
opts.try_tcp_on_error = config.dns_tcp_fallback;
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ where
|
|||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 60 * 2;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 8;
|
||||
const MAX_DURATION: u64 = 60 * 60;
|
||||
if continue_exponential_backoff_secs(
|
||||
MIN_DURATION,
|
||||
MAX_DURATION,
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ where
|
|||
{
|
||||
// Exponential backoff
|
||||
const MIN_DURATION: u64 = 5 * 60;
|
||||
const MAX_DURATION: u64 = 60 * 60 * 24;
|
||||
const MAX_DURATION: u64 = 60 * 60;
|
||||
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
|
||||
debug!(
|
||||
?tries,
|
||||
|
|
|
|||
|
|
@ -197,6 +197,15 @@ where
|
|||
.await;
|
||||
extremities.push(incoming_pdu.event_id().to_owned());
|
||||
|
||||
if extremities.is_empty() {
|
||||
info!(
|
||||
"Retained zero extremities when upgrading outlier PDU to timeline PDU with {} \
|
||||
previous events, event id: {}",
|
||||
incoming_pdu.prev_events.len(),
|
||||
incoming_pdu.event_id
|
||||
);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"Retained {} extremities checked against {} prev_events",
|
||||
extremities.len(),
|
||||
|
|
|
|||
|
|
@ -127,3 +127,63 @@ pub async fn get_token_shortstatehash(
|
|||
.await
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
/// Count how many sync tokens exist for a room without deleting them
|
||||
///
|
||||
/// This is useful for dry runs to see how many tokens would be deleted
|
||||
#[implement(Service)]
|
||||
pub async fn count_room_tokens(&self, room_id: &RoomId) -> Result<usize> {
|
||||
use futures::TryStreamExt;
|
||||
|
||||
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
|
||||
|
||||
// Create a prefix to search by - all entries for this room will start with its
|
||||
// short ID
|
||||
let prefix = &[shortroomid];
|
||||
|
||||
// Collect all keys into a Vec and count them
|
||||
let keys = self
|
||||
.db
|
||||
.roomsynctoken_shortstatehash
|
||||
.keys_prefix_raw(prefix)
|
||||
.map_ok(|_| ()) // We only need to count, not store the keys
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
Ok(keys.len())
|
||||
}
|
||||
|
||||
/// Delete all sync tokens associated with a room
|
||||
///
|
||||
/// This helps clean up the database as these tokens are never otherwise removed
|
||||
#[implement(Service)]
|
||||
pub async fn delete_room_tokens(&self, room_id: &RoomId) -> Result<usize> {
|
||||
use futures::TryStreamExt;
|
||||
|
||||
let shortroomid = self.services.short.get_shortroomid(room_id).await?;
|
||||
|
||||
// Create a prefix to search by - all entries for this room will start with its
|
||||
// short ID
|
||||
let prefix = &[shortroomid];
|
||||
|
||||
// Collect all keys into a Vec first, then delete them
|
||||
let keys = self
|
||||
.db
|
||||
.roomsynctoken_shortstatehash
|
||||
.keys_prefix_raw(prefix)
|
||||
.map_ok(|key| {
|
||||
// Clone the key since we can't store references in the Vec
|
||||
Vec::from(key)
|
||||
})
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
// Delete each key individually
|
||||
for key in &keys {
|
||||
self.db.roomsynctoken_shortstatehash.del(key);
|
||||
}
|
||||
|
||||
let count = keys.len();
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue