fix(spaces): add enforcement dedup, cache bounds, PartialEq derives, skip server user
- Add per-space dedup set to prevent concurrent enforcement tasks from competing when multiple role events fire rapidly for the same space - Add space_roles_cache_capacity config (default 1000) to bound cache growth, clearing all caches when exceeded - Add PartialEq/Eq derives to all space role event content types - Skip server user in auto_join_qualifying_rooms and handle_space_member_join Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
aa610b055a
commit
b14889176e
3 changed files with 50 additions and 6 deletions
|
|
@ -611,6 +611,13 @@ pub struct Config {
|
|||
#[serde(default)]
|
||||
pub space_permission_cascading: bool,
|
||||
|
||||
/// Maximum number of spaces to cache role data for. When exceeded the
|
||||
/// cache is cleared and repopulated on demand.
|
||||
///
|
||||
/// default: 1000
|
||||
#[serde(default = "default_space_roles_cache_capacity")]
|
||||
pub space_roles_cache_capacity: u32,
|
||||
|
||||
/// Enabling this setting opens registration to anyone without restrictions.
|
||||
/// This makes your server vulnerable to abuse
|
||||
#[serde(default)]
|
||||
|
|
@ -2834,3 +2841,5 @@ fn default_ldap_search_filter() -> String { "(objectClass=*)".to_owned() }
|
|||
fn default_ldap_uid_attribute() -> String { String::from("uid") }
|
||||
|
||||
fn default_ldap_name_attribute() -> String { String::from("givenName") }
|
||||
|
||||
fn default_space_roles_cache_capacity() -> u32 { 1000 }
|
||||
|
|
|
|||
|
|
@ -19,13 +19,13 @@ pub const SPACE_ROLE_ROOM_EVENT_TYPE: &str = "com.continuwuity.space.role.room";
|
|||
/// Content for `com.continuwuity.space.roles` (state key: "")
|
||||
///
|
||||
/// Defines available roles for a Space.
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct SpaceRolesEventContent {
|
||||
pub roles: BTreeMap<String, RoleDefinition>,
|
||||
}
|
||||
|
||||
/// A single role definition within a Space.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct RoleDefinition {
|
||||
pub description: String,
|
||||
|
||||
|
|
@ -38,7 +38,7 @@ pub struct RoleDefinition {
|
|||
/// Content for `com.continuwuity.space.role.member` (state key: user ID)
|
||||
///
|
||||
/// Assigns roles to a user within a Space.
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct SpaceRoleMemberEventContent {
|
||||
pub roles: Vec<String>,
|
||||
}
|
||||
|
|
@ -46,7 +46,7 @@ pub struct SpaceRoleMemberEventContent {
|
|||
/// Content for `com.continuwuity.space.role.room` (state key: room ID)
|
||||
///
|
||||
/// Declares which roles a child room requires for access.
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct SpaceRoleRoomEventContent {
|
||||
pub required_roles: Vec<String>,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,6 +60,8 @@ pub struct Service {
|
|||
pub space_to_rooms: RwLock<HashMap<OwnedRoomId, HashSet<OwnedRoomId>>>,
|
||||
/// Semaphore to limit concurrent enforcement tasks
|
||||
pub enforcement_semaphore: tokio::sync::Semaphore,
|
||||
/// Spaces that currently have an enforcement task in progress
|
||||
pending_enforcement: RwLock<HashSet<OwnedRoomId>>,
|
||||
}
|
||||
|
||||
struct Services {
|
||||
|
|
@ -91,6 +93,7 @@ impl crate::Service for Service {
|
|||
room_to_space: RwLock::new(HashMap::new()),
|
||||
space_to_rooms: RwLock::new(HashMap::new()),
|
||||
enforcement_semaphore: tokio::sync::Semaphore::new(4),
|
||||
pending_enforcement: RwLock::new(HashSet::new()),
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
@ -237,6 +240,16 @@ pub async fn populate_space(&self, space_id: &RoomId) {
|
|||
return;
|
||||
}
|
||||
|
||||
// Check cache capacity — if over limit, clear and let spaces repopulate on demand
|
||||
if self.roles.read().await.len() >= self.server.config.space_roles_cache_capacity as usize {
|
||||
self.roles.write().await.clear();
|
||||
self.user_roles.write().await.clear();
|
||||
self.room_requirements.write().await.clear();
|
||||
self.room_to_space.write().await.clear();
|
||||
self.space_to_rooms.write().await.clear();
|
||||
debug_warn!("Space roles cache exceeded capacity, cleared");
|
||||
}
|
||||
|
||||
// 1. Read com.continuwuity.space.roles (state key: "")
|
||||
let roles_event_type = StateEventType::from(SPACE_ROLES_EVENT_TYPE.to_owned());
|
||||
if let Ok(content) = self
|
||||
|
|
@ -581,11 +594,15 @@ pub async fn auto_join_qualifying_rooms(
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
// Skip server user — it doesn't need role-based auto-join
|
||||
let server_user = self.services.globals.server_user.as_ref();
|
||||
if user_id == server_user {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Get all child rooms from the room_to_space reverse index
|
||||
let child_rooms = self.get_child_rooms(space_id).await;
|
||||
|
||||
let server_user = self.services.globals.server_user.as_ref();
|
||||
|
||||
for child_room_id in &child_rooms {
|
||||
// Skip if already joined
|
||||
if self
|
||||
|
|
@ -679,6 +696,16 @@ impl Service {
|
|||
|
||||
let this = Arc::clone(self);
|
||||
self.server.runtime().spawn(async move {
|
||||
// Deduplicate: if enforcement is already pending for this space, skip.
|
||||
// The running task's populate_space will pick up the latest state.
|
||||
{
|
||||
let mut pending = this.pending_enforcement.write().await;
|
||||
if pending.contains(&space_id) {
|
||||
return;
|
||||
}
|
||||
pending.insert(space_id.clone());
|
||||
}
|
||||
|
||||
let _permit = this.enforcement_semaphore.acquire().await;
|
||||
|
||||
// Always repopulate cache first
|
||||
|
|
@ -776,6 +803,9 @@ impl Service {
|
|||
},
|
||||
| _ => {},
|
||||
}
|
||||
|
||||
// Remove from pending set so future events can trigger enforcement
|
||||
this.pending_enforcement.write().await.remove(&space_id);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -941,6 +971,11 @@ impl Service {
|
|||
return;
|
||||
}
|
||||
|
||||
// Skip if the user is the server user
|
||||
if user_id == self.services.globals.server_user {
|
||||
return;
|
||||
}
|
||||
|
||||
let this = Arc::clone(self);
|
||||
self.server.runtime().spawn(async move {
|
||||
let _permit = this.enforcement_semaphore.acquire().await;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue