From f4ab456bbd09bbe17ac4151957550212e1b8b646 Mon Sep 17 00:00:00 2001 From: ember33 Date: Thu, 19 Mar 2026 22:43:08 +0100 Subject: [PATCH] feat(spaces): add space roles service with enforcement and caching Implement the roles service that manages space permission cascading: - In-memory cache populated from state events, rebuilt on startup - Join gating, power level sync (highest-wins across parent spaces), auto-join on role grant, auto-kick on role revocation - Per-space enable/disable via com.continuwuity.space.cascading event - Background enforcement tasks with semaphore-limited concurrency - Graceful shutdown support via server.running() checks --- src/service/rooms/mod.rs | 2 + src/service/rooms/roles/mod.rs | 1202 ++++++++++++++++++++++++++++++ src/service/rooms/roles/tests.rs | 204 +++++ src/service/services.rs | 1 + 4 files changed, 1409 insertions(+) create mode 100644 src/service/rooms/roles/mod.rs create mode 100644 src/service/rooms/roles/tests.rs diff --git a/src/service/rooms/mod.rs b/src/service/rooms/mod.rs index 44a83582..bf4304f7 100644 --- a/src/service/rooms/mod.rs +++ b/src/service/rooms/mod.rs @@ -7,6 +7,7 @@ pub mod metadata; pub mod outlier; pub mod pdu_metadata; pub mod read_receipt; +pub mod roles; pub mod search; pub mod short; pub mod spaces; @@ -31,6 +32,7 @@ pub struct Service { pub outlier: Arc, pub pdu_metadata: Arc, pub read_receipt: Arc, + pub roles: Arc, pub search: Arc, pub short: Arc, pub spaces: Arc, diff --git a/src/service/rooms/roles/mod.rs b/src/service/rooms/roles/mod.rs new file mode 100644 index 00000000..73a2d430 --- /dev/null +++ b/src/service/rooms/roles/mod.rs @@ -0,0 +1,1202 @@ +#[cfg(test)] +mod tests; + +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + fmt::Write, + sync::Arc, +}; + +use async_trait::async_trait; +use conduwuit::{ + Err, Event, Result, Server, debug, debug_warn, implement, info, + matrix::pdu::{PduBuilder, PduEvent}, + warn, +}; +use conduwuit_core::{ + matrix::space_roles::{ + RoleDefinition, SPACE_CASCADING_EVENT_TYPE, SPACE_ROLE_MEMBER_EVENT_TYPE, + SPACE_ROLE_ROOM_EVENT_TYPE, SPACE_ROLES_EVENT_TYPE, SpaceCascadingEventContent, + SpaceRoleMemberEventContent, SpaceRoleRoomEventContent, SpaceRolesEventContent, + }, + utils::{ + future::TryExtExt, + stream::{BroadbandExt, ReadyExt}, + }, +}; +use futures::{StreamExt, TryFutureExt}; +use ruma::{ + Int, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, + events::{ + StateEventType, + room::{ + member::{MembershipState, RoomMemberEventContent}, + power_levels::RoomPowerLevelsEventContent, + }, + space::child::SpaceChildEventContent, + }, + room::RoomType, +}; +use serde_json::value::to_raw_value; +use tokio::sync::{RwLock, Semaphore}; + +use crate::{Dep, globals, rooms}; + +#[implement(Service)] +pub async fn flush_space_from_cache(&self, space_id: &RoomId) { + self.roles.write().await.remove(space_id); + self.user_roles.write().await.remove(space_id); + self.room_requirements.write().await.remove(space_id); + let mut room_to_space = self.room_to_space.write().await; + room_to_space.retain(|_, parents| { + parents.remove(space_id); + !parents.is_empty() + }); + drop(room_to_space); + self.space_to_rooms.write().await.remove(space_id); +} + +#[implement(Service)] +async fn flush_caches(&self) { + self.roles.write().await.clear(); + self.user_roles.write().await.clear(); + self.room_requirements.write().await.clear(); + self.room_to_space.write().await.clear(); + self.space_to_rooms.write().await.clear(); +} + +fn roles_event_type() -> StateEventType { + StateEventType::from(SPACE_ROLES_EVENT_TYPE.to_owned()) +} + +fn member_event_type() -> StateEventType { + StateEventType::from(SPACE_ROLE_MEMBER_EVENT_TYPE.to_owned()) +} + +fn room_event_type() -> StateEventType { + StateEventType::from(SPACE_ROLE_ROOM_EVENT_TYPE.to_owned()) +} + +fn cascading_event_type() -> StateEventType { + StateEventType::from(SPACE_CASCADING_EVENT_TYPE.to_owned()) +} + +pub struct Service { + services: Services, + server: Arc, + roles: RwLock>>, + user_roles: RwLock>>>, + room_requirements: RwLock>>>, + room_to_space: RwLock>>, + space_to_rooms: RwLock>>, + enforcement_semaphore: Semaphore, + pending_enforcement: RwLock>, +} + +struct Services { + globals: Dep, + metadata: Dep, + state_accessor: Dep, + state_cache: Dep, + state: Dep, + timeline: Dep, +} + +#[async_trait] +impl crate::Service for Service { + fn build(args: crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { + services: Services { + globals: args.depend::("globals"), + metadata: args.depend::("rooms::metadata"), + state_accessor: args + .depend::("rooms::state_accessor"), + state_cache: args.depend::("rooms::state_cache"), + state: args.depend::("rooms::state"), + timeline: args.depend::("rooms::timeline"), + }, + server: args.server.clone(), + roles: RwLock::new(HashMap::new()), + user_roles: RwLock::new(HashMap::new()), + room_requirements: RwLock::new(HashMap::new()), + room_to_space: RwLock::new(HashMap::new()), + space_to_rooms: RwLock::new(HashMap::new()), + enforcement_semaphore: Semaphore::new(4), + pending_enforcement: RwLock::new(HashSet::new()), + })) + } + + async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { + if !self.is_enabled() { + return Ok(()); + } + + let roles = self.roles.read().await.len(); + let user_roles = self.user_roles.read().await.len(); + let room_requirements = self.room_requirements.read().await.len(); + let room_to_space = self.room_to_space.read().await.len(); + let space_to_rooms = self.space_to_rooms.read().await.len(); + + writeln!(out, "space_roles_definitions: {roles}")?; + writeln!(out, "space_user_roles: {user_roles}")?; + writeln!(out, "space_room_requirements: {room_requirements}")?; + writeln!(out, "space_room_to_space_index: {room_to_space}")?; + writeln!(out, "space_space_to_rooms_index: {space_to_rooms}")?; + + Ok(()) + } + + async fn clear_cache(&self) { + if !self.is_enabled() { + return; + } + self.flush_caches().await; + } + + async fn worker(self: Arc) -> Result<()> { + info!("Rebuilding space roles cache from all known rooms"); + + let mut space_count: usize = 0; + let room_ids: Vec = self + .services + .metadata + .iter_ids() + .map(ToOwned::to_owned) + .collect() + .await; + + for room_id in &room_ids { + match self.services.state_accessor.get_room_type(room_id).await { + | Ok(RoomType::Space) => { + // Check per-Space override — skip spaces where cascading is + // disabled + if !self.is_enabled_for_space(room_id).await { + continue; + } + debug!(room_id = %room_id, "Populating space roles cache"); + self.populate_space(room_id).await; + space_count = space_count.saturating_add(1); + }, + | _ => continue, + } + } + + info!(space_count, "Space roles cache rebuilt"); + Ok(()) + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} + +#[implement(Service)] +pub fn is_enabled(&self) -> bool { self.server.config.space_permission_cascading } + +#[implement(Service)] +pub async fn is_enabled_for_space(&self, space_id: &RoomId) -> bool { + let cascading_event_type = cascading_event_type(); + if let Ok(content) = self + .services + .state_accessor + .room_state_get_content::(space_id, &cascading_event_type, "") + .await + { + return content.enabled; + } + + self.server.config.space_permission_cascading +} + +#[implement(Service)] +pub async fn ensure_default_roles(&self, space_id: &RoomId) -> Result { + let server_user = self.services.globals.server_user.as_ref(); + let state_lock = self.services.state.mutex.lock(space_id).await; + + let roles_event_type = roles_event_type(); + if self + .services + .state_accessor + .room_state_get_content::(space_id, &roles_event_type, "") + .await + .is_ok() + { + return Ok(()); + } + + let mut roles = BTreeMap::new(); + roles.insert("admin".to_owned(), RoleDefinition { + description: "Space administrator".to_owned(), + power_level: Some(100), + }); + roles.insert("mod".to_owned(), RoleDefinition { + description: "Space moderator".to_owned(), + power_level: Some(50), + }); + + let content = SpaceRolesEventContent { roles }; + + let pdu = PduBuilder { + event_type: ruma::events::TimelineEventType::from(SPACE_ROLES_EVENT_TYPE.to_owned()), + content: to_raw_value(&content) + .map_err(|e| conduwuit::err!("Failed to serialize SpaceRolesEventContent: {e}"))?, + state_key: Some(String::new().into()), + ..PduBuilder::default() + }; + + self.services + .timeline + .build_and_append_pdu(pdu, server_user, Some(space_id), &state_lock) + .await?; + + debug!(space_id = %space_id, event_type = SPACE_ROLES_EVENT_TYPE, "Sent default space roles event"); + + Ok(()) +} + +#[implement(Service)] +pub async fn populate_space(&self, space_id: &RoomId) { + if !self.is_enabled_for_space(space_id).await { + return; + } + + if self.roles.read().await.len() + >= usize::try_from(self.server.config.space_roles_cache_flush_threshold) + .unwrap_or(usize::MAX) + { + self.flush_caches().await; + debug_warn!("Space roles cache exceeded capacity, cleared"); + } + + let roles_event_type = roles_event_type(); + if let Ok(content) = self + .services + .state_accessor + .room_state_get_content::(space_id, &roles_event_type, "") + .await + { + self.roles + .write() + .await + .insert(space_id.to_owned(), content.roles); + } + + let member_event_type = member_event_type(); + let shortstatehash = match self.services.state.get_room_shortstatehash(space_id).await { + | Ok(hash) => hash, + | Err(e) => { + debug_warn!(space_id = %space_id, error = ?e, "Failed to get shortstatehash, cache may be stale"); + return; + }, + }; + { + let mut user_roles_map: HashMap> = HashMap::new(); + + self.services + .state_accessor + .state_keys_with_ids(shortstatehash, &member_event_type) + .boxed() + .broad_filter_map(|(state_key, event_id): (_, OwnedEventId)| async move { + self.services + .timeline + .get_pdu(&event_id) + .map_ok(move |pdu| (state_key, pdu)) + .ok() + .await + }) + .ready_filter_map(|(state_key, pdu)| { + let content = pdu.get_content::().ok()?; + let user_id = UserId::parse(&*state_key).ok()?.to_owned(); + Some((user_id, content.roles)) + }) + .for_each(|(user_id, roles)| { + user_roles_map.insert(user_id, roles.into_iter().collect()); + async {} + }) + .await; + + self.user_roles + .write() + .await + .insert(space_id.to_owned(), user_roles_map); + + let room_event_type = room_event_type(); + let mut room_reqs_map: HashMap> = HashMap::new(); + + self.services + .state_accessor + .state_keys_with_ids(shortstatehash, &room_event_type) + .boxed() + .broad_filter_map(|(state_key, event_id): (_, OwnedEventId)| async move { + self.services + .timeline + .get_pdu(&event_id) + .map_ok(move |pdu| (state_key, pdu)) + .ok() + .await + }) + .ready_filter_map(|(state_key, pdu)| { + let content = pdu.get_content::().ok()?; + let room_id = RoomId::parse(&*state_key).ok()?.to_owned(); + Some((room_id, content.required_roles)) + }) + .for_each(|(room_id, required_roles)| { + room_reqs_map.insert(room_id, required_roles.into_iter().collect()); + async {} + }) + .await; + + self.room_requirements + .write() + .await + .insert(space_id.to_owned(), room_reqs_map); + + let mut child_rooms: Vec = Vec::new(); + + self.services + .state_accessor + .state_keys_with_ids(shortstatehash, &StateEventType::SpaceChild) + .boxed() + .broad_filter_map(|(state_key, event_id): (_, OwnedEventId)| async move { + self.services + .timeline + .get_pdu(&event_id) + .map_ok(move |pdu| (state_key, pdu)) + .ok() + .await + }) + .ready_filter_map(|(state_key, pdu)| { + if let Ok(content) = pdu.get_content::() { + if content.via.is_empty() { + return None; + } + } else { + return None; + } + let child_room_id = RoomId::parse(&*state_key).ok()?.to_owned(); + Some(child_room_id) + }) + .for_each(|child_room_id| { + child_rooms.push(child_room_id); + async {} + }) + .await; + + { + let mut room_to_space = self.room_to_space.write().await; + room_to_space.retain(|_, parents| { + parents.remove(space_id); + !parents.is_empty() + }); + for child_room_id in &child_rooms { + room_to_space + .entry(child_room_id.clone()) + .or_default() + .insert(space_id.to_owned()); + } + } + + { + let mut space_to_rooms = self.space_to_rooms.write().await; + space_to_rooms.insert(space_id.to_owned(), child_rooms.into_iter().collect()) + }; + } +} + +#[must_use] +pub fn compute_user_power_level( + role_defs: &BTreeMap, + assigned: &HashSet, +) -> Option { + assigned + .iter() + .filter_map(|role_name| role_defs.get(role_name)?.power_level) + .max() +} + +#[must_use] +pub fn roles_satisfy_requirements( + required: &HashSet, + assigned: &HashSet, +) -> bool { + required.iter().all(|r| assigned.contains(r)) +} + +#[implement(Service)] +pub async fn get_user_power_level(&self, space_id: &RoomId, user_id: &UserId) -> Option { + let role_defs = { self.roles.read().await.get(space_id).cloned()? }; + let user_assigned = { + self.user_roles + .read() + .await + .get(space_id)? + .get(user_id) + .cloned()? + }; + compute_user_power_level(&role_defs, &user_assigned) +} + +#[implement(Service)] +pub async fn user_qualifies_for_room( + &self, + space_id: &RoomId, + room_id: &RoomId, + user_id: &UserId, +) -> bool { + let required = { + let guard = self.room_requirements.read().await; + let Some(space_reqs) = guard.get(space_id) else { + return true; + }; + let Some(required) = space_reqs.get(room_id) else { + return true; + }; + if required.is_empty() { + return true; + } + required.clone() + }; + let user_assigned = { + let guard = self.user_roles.read().await; + let Some(space_users) = guard.get(space_id) else { + return false; + }; + let Some(assigned) = space_users.get(user_id) else { + return false; + }; + assigned.clone() + }; + roles_satisfy_requirements(&required, &user_assigned) +} + +#[implement(Service)] +pub async fn get_parent_spaces(&self, room_id: &RoomId) -> Vec { + let all_parents: Vec = self + .room_to_space + .read() + .await + .get(room_id) + .map(|set| set.iter().cloned().collect()) + .unwrap_or_default(); + + let mut enabled_parents = Vec::new(); + for parent in all_parents { + if self.is_enabled_for_space(&parent).await { + enabled_parents.push(parent); + } + } + enabled_parents +} + +#[implement(Service)] +pub async fn get_child_rooms(&self, space_id: &RoomId) -> Vec { + self.space_to_rooms + .read() + .await + .get(space_id) + .map(|set| set.iter().cloned().collect()) + .unwrap_or_default() +} + +#[implement(Service)] +pub async fn get_user_roles_in_space( + &self, + space_id: &RoomId, + user_id: &UserId, +) -> Option> { + self.user_roles + .read() + .await + .get(space_id)? + .get(user_id) + .cloned() +} + +#[implement(Service)] +pub async fn get_room_requirements_in_space( + &self, + space_id: &RoomId, + room_id: &RoomId, +) -> Option> { + self.room_requirements + .read() + .await + .get(space_id)? + .get(room_id) + .cloned() +} + +#[implement(Service)] +pub async fn sync_power_levels(&self, room_id: &RoomId) -> Result { + let server_user = self.services.globals.server_user.as_ref(); + if !self + .services + .state_cache + .is_joined(server_user, room_id) + .await + { + debug_warn!(room_id = %room_id, "Server user is not joined, skipping PL sync"); + return Ok(()); + } + + let mut power_levels_content: RoomPowerLevelsEventContent = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") + .await + .unwrap_or_default(); + + let members: Vec = self + .services + .state_cache + .room_members(room_id) + .map(ToOwned::to_owned) + .collect() + .await; + + let all_parents = self.get_parent_spaces(room_id).await; + + let mut changed = false; + for user_id in &members { + if user_id == server_user { + continue; + } + + let mut max_pl: Option = None; + for parent in &all_parents { + if let Some(pl) = self.get_user_power_level(parent, user_id).await { + max_pl = Some(max_pl.map_or(pl, |current| current.max(pl))); + } + } + + if let Some(effective_pl) = max_pl { + let effective_pl_int = Int::new_saturating(effective_pl); + let current_pl = power_levels_content + .users + .get(user_id) + .copied() + .unwrap_or(power_levels_content.users_default); + + if current_pl != effective_pl_int { + power_levels_content + .users + .insert(user_id.clone(), effective_pl_int); + changed = true; + } + } + } + + if changed { + let state_lock = self.services.state.mutex.lock(room_id).await; + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state(String::new(), &power_levels_content), + server_user, + Some(room_id), + &state_lock, + ) + .await?; + } + + Ok(()) +} + +#[implement(Service)] +pub async fn auto_join_qualifying_rooms(&self, space_id: &RoomId, user_id: &UserId) -> Result { + if !self.is_enabled_for_space(space_id).await { + return Ok(()); + } + + let server_user = self.services.globals.server_user.as_ref(); + if user_id == server_user { + return Ok(()); + } + + let child_rooms = self.get_child_rooms(space_id).await; + + for child_room_id in &child_rooms { + if self + .services + .state_cache + .is_joined(user_id, child_room_id) + .await + { + continue; + } + + if !self + .user_qualifies_for_room(space_id, child_room_id, user_id) + .await + { + continue; + } + + if !self + .services + .state_cache + .is_joined(server_user, child_room_id) + .await + { + debug_warn!(room_id = %child_room_id, "Server user is not joined, skipping auto-join"); + continue; + } + + if let Err(e) = self + .invite_and_join_user(child_room_id, user_id, server_user) + .await + { + debug_warn!(user_id = %user_id, room_id = %child_room_id, error = ?e, "Failed to auto-join user"); + } + } + + Ok(()) +} + +#[implement(Service)] +async fn invite_and_join_user( + &self, + room_id: &RoomId, + user_id: &UserId, + server_user: &UserId, +) -> Result { + let state_lock = self.services.state.mutex.lock(room_id).await; + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state( + user_id.to_string(), + &RoomMemberEventContent::new(MembershipState::Invite), + ), + server_user, + Some(room_id), + &state_lock, + ) + .await?; + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state( + user_id.to_string(), + &RoomMemberEventContent::new(MembershipState::Join), + ), + user_id, + Some(room_id), + &state_lock, + ) + .await?; + + Ok(()) +} + +/// Called from append_pdu after a PDU is persisted. Dispatches to the +/// appropriate handler based on event type. +#[implement(Service)] +pub fn on_pdu_appended(self: &Arc, room_id: &RoomId, pdu: &PduEvent) { + if let Some(state_key) = pdu.state_key() { + let event_type_str = pdu.event_type().to_string(); + match event_type_str.as_str() { + | SPACE_ROLES_EVENT_TYPE + | SPACE_ROLE_MEMBER_EVENT_TYPE + | SPACE_ROLE_ROOM_EVENT_TYPE + | SPACE_CASCADING_EVENT_TYPE => { + self.handle_state_event_change( + room_id.to_owned(), + event_type_str, + state_key.to_owned(), + ); + }, + | _ => { + if *pdu.kind() == ruma::events::TimelineEventType::SpaceChild { + if let Ok(child_room_id) = RoomId::parse(&*state_key) { + self.handle_space_child_change( + room_id.to_owned(), + child_room_id.to_owned(), + ); + } + } + if *pdu.kind() == ruma::events::TimelineEventType::RoomMember { + if let Ok(content) = pdu.get_content::() { + if let Ok(user_id) = UserId::parse(&*state_key) { + match content.membership { + | MembershipState::Join => { + self.handle_space_member_join( + room_id.to_owned(), + user_id.to_owned(), + ); + }, + | MembershipState::Leave | MembershipState::Ban => { + self.handle_space_member_leave( + room_id.to_owned(), + user_id.to_owned(), + ); + }, + | _ => {}, + } + } + } + } + }, + } + } +} + +/// Called from build_and_append_pdu to validate PL changes don't conflict +/// with space-granted power levels. +#[implement(Service)] +pub async fn validate_pl_change( + &self, + room_id: &RoomId, + sender: &UserId, + proposed: &RoomPowerLevelsEventContent, +) -> Result { + if sender == self.services.globals.server_user.as_str() { + return Ok(()); + } + + let parent_spaces = self.get_parent_spaces(room_id).await; + if parent_spaces.is_empty() { + return Ok(()); + } + + let mut effective_pls: HashMap = HashMap::new(); + { + let roles_guard = self.roles.read().await; + let user_roles_guard = self.user_roles.read().await; + for ps in &parent_spaces { + let Some(space_users) = user_roles_guard.get(ps) else { + continue; + }; + let Some(role_defs) = roles_guard.get(ps) else { + continue; + }; + for (user_id, assigned_roles) in space_users { + let pl = assigned_roles + .iter() + .filter_map(|r| role_defs.get(r)?.power_level) + .max(); + if let Some(pl) = pl { + effective_pls + .entry(user_id.clone()) + .and_modify(|current| *current = (*current).max(pl)) + .or_insert(pl); + } + } + } + } + + for (user_id, effective_pl) in &effective_pls { + if !self.services.state_cache.is_joined(user_id, room_id).await { + continue; + } + match proposed.users.get(user_id) { + | None if i64::from(proposed.users_default) != *effective_pl => { + debug_warn!( + user_id = %user_id, + room_id = %room_id, + effective_pl, + "Rejecting PL change: space-managed user omitted" + ); + return Err!(Request(Forbidden( + "Cannot omit a user whose power level is managed by Space roles" + ))); + }, + | Some(pl) if i64::from(*pl) != *effective_pl => { + debug_warn!( + user_id = %user_id, + room_id = %room_id, + proposed_pl = i64::from(*pl), + effective_pl, + "Rejecting PL change conflicting with space role" + ); + return Err!(Request(Forbidden( + "Cannot change power level that is set by Space roles" + ))); + }, + | _ => {}, + } + } + + Ok(()) +} + +/// Called from join_room_by_id_helper to check if a user has the required +/// Space roles to join a room. +#[implement(Service)] +pub async fn check_join_allowed(&self, room_id: &RoomId, user_id: &UserId) -> Result { + let parent_spaces = self.get_parent_spaces(room_id).await; + if parent_spaces.is_empty() { + return Ok(()); + } + + for parent_space in &parent_spaces { + if self + .user_qualifies_for_room(parent_space, room_id, user_id) + .await + { + return Ok(()); + } + } + + Err!(Request(Forbidden("You do not have the required Space roles to join this room"))) +} + +#[implement(Service)] +async fn sync_power_levels_for_children(&self, space_id: &RoomId) { + let child_rooms = self.get_child_rooms(space_id).await; + for child_room_id in &child_rooms { + if let Err(e) = self.sync_power_levels(child_room_id).await { + debug_warn!(room_id = %child_room_id, error = ?e, "Failed to sync power levels"); + } + } +} + +impl Service { + pub fn handle_state_event_change( + self: &Arc, + space_id: OwnedRoomId, + event_type: String, + state_key: String, + ) { + let this = Arc::clone(self); + self.server.runtime().spawn(async move { + if !this.server.running() { + return; + } + if event_type != SPACE_CASCADING_EVENT_TYPE + && !this.is_enabled_for_space(&space_id).await + { + return; + } + { + let mut pending = this.pending_enforcement.write().await; + if pending.contains(&space_id) { + return; + } + pending.insert(space_id.clone()) + }; + + async { + let Ok(_permit) = this.enforcement_semaphore.acquire().await else { + return; + }; + + this.populate_space(&space_id).await; + + match event_type.as_str() { + | SPACE_ROLES_EVENT_TYPE => { + this.sync_power_levels_for_children(&space_id).await; + let space_members: Vec = this + .services + .state_cache + .room_members(&space_id) + .map(ToOwned::to_owned) + .collect() + .await; + for member in &space_members { + if let Err(e) = + Box::pin(this.kick_unqualified_from_rooms(&space_id, member)) + .await + { + debug_warn!(user_id = %member, error = ?e, "Role definition revalidation kick failed"); + } + } + }, + | SPACE_ROLE_MEMBER_EVENT_TYPE => { + if let Ok(user_id) = UserId::parse(state_key.as_str()) { + if let Err(e) = + this.auto_join_qualifying_rooms(&space_id, user_id).await + { + debug_warn!(user_id = %user_id, error = ?e, "Space role auto-join failed"); + } + if let Err(e) = + Box::pin(this.kick_unqualified_from_rooms(&space_id, user_id)) + .await + { + debug_warn!(user_id = %user_id, error = ?e, "Space role auto-kick failed"); + } + this.sync_power_levels_for_children(&space_id).await; + } + }, + | SPACE_ROLE_ROOM_EVENT_TYPE => { + if let Ok(target_room) = RoomId::parse(state_key.as_str()) { + let members: Vec = this + .services + .state_cache + .room_members(target_room) + .map(ToOwned::to_owned) + .collect() + .await; + for member in &members { + if let Err(e) = + Box::pin(this.kick_unqualified_from_rooms(&space_id, member)) + .await + { + debug_warn!(user_id = %member, error = ?e, "Space role requirement kick failed"); + } + } + } + }, + | SPACE_CASCADING_EVENT_TYPE => { + if !this.is_enabled_for_space(&space_id).await { + this.flush_space_from_cache(&space_id).await; + } + }, + | _ => {}, + } + } + .await; + + this.pending_enforcement.write().await.remove(&space_id); + }); + } + + pub fn handle_space_child_change( + self: &Arc, + space_id: OwnedRoomId, + child_room_id: OwnedRoomId, + ) { + let this = Arc::clone(self); + self.server.runtime().spawn(async move { + if !this.server.running() { + return; + } + if !this.is_enabled_for_space(&space_id).await { + return; + } + let Ok(_permit) = this.enforcement_semaphore.acquire().await else { + return; + }; + + let child_event_type = StateEventType::SpaceChild; + let is_removal = match this + .services + .state_accessor + .room_state_get_content::( + &space_id, + &child_event_type, + child_room_id.as_str(), + ) + .await + { + | Ok(content) => content.via.is_empty(), + | Err(_) => true, // If we can't read it, treat as removal + }; + + if is_removal { + let mut room_to_space = this.room_to_space.write().await; + if let Some(parents) = room_to_space.get_mut(&child_room_id) { + parents.remove(&space_id); + if parents.is_empty() { + room_to_space.remove(&child_room_id); + } + } + let mut space_to_rooms = this.space_to_rooms.write().await; + if let Some(children) = space_to_rooms.get_mut(&space_id) { + children.remove(&child_room_id); + } + return; + } + + this.room_to_space + .write() + .await + .entry(child_room_id.clone()) + .or_default() + .insert(space_id.clone()); + + this.space_to_rooms + .write() + .await + .entry(space_id.clone()) + .or_default() + .insert(child_room_id.clone()); + + let server_user = this.services.globals.server_user.as_ref(); + if !this + .services + .state_cache + .is_joined(server_user, &child_room_id) + .await + { + debug_warn!(room_id = %child_room_id, "Server user is not joined, skipping auto-join enforcement for new child"); + return; + } + + let space_members: Vec = this + .services + .state_cache + .room_members(&space_id) + .map(ToOwned::to_owned) + .collect() + .await; + + for member in &space_members { + if this + .user_qualifies_for_room(&space_id, &child_room_id, member) + .await && !this + .services + .state_cache + .is_joined(member, &child_room_id) + .await + { + if let Err(e) = this + .invite_and_join_user(&child_room_id, member, server_user) + .await + { + debug_warn!(user_id = %member, room_id = %child_room_id, error = ?e, "Failed to auto-join user"); + } + } + } + }); + } + + pub fn handle_space_member_join( + self: &Arc, + space_id: OwnedRoomId, + user_id: OwnedUserId, + ) { + if user_id == self.services.globals.server_user { + return; + } + + let this = Arc::clone(self); + self.server.runtime().spawn(async move { + if !this.server.running() { + return; + } + if !this.is_enabled_for_space(&space_id).await { + return; + } + + let Ok(_permit) = this.enforcement_semaphore.acquire().await else { + return; + }; + + if let Err(e) = this.auto_join_qualifying_rooms(&space_id, &user_id).await { + debug_warn!(user_id = %user_id, error = ?e, "Auto-join on Space join failed"); + } + this.sync_power_levels_for_children(&space_id).await; + }); + } + + pub fn handle_space_member_leave( + self: &Arc, + space_id: OwnedRoomId, + user_id: OwnedUserId, + ) { + if user_id == self.services.globals.server_user { + return; + } + + let this = Arc::clone(self); + self.server.runtime().spawn(async move { + if !this.server.running() { + return; + } + if !this.is_enabled_for_space(&space_id).await { + return; + } + + let Ok(_permit) = this.enforcement_semaphore.acquire().await else { + return; + }; + + if let Err(e) = Box::pin(this.kick_unqualified_from_rooms(&space_id, &user_id)).await + { + debug_warn!(user_id = %user_id, error = ?e, "Kick on Space leave failed"); + } + }); + } +} + +#[implement(Service)] +pub async fn kick_unqualified_from_rooms(&self, space_id: &RoomId, user_id: &UserId) -> Result { + if !self.is_enabled_for_space(space_id).await { + return Ok(()); + } + + let server_user = self.services.globals.server_user.as_ref(); + if user_id == server_user { + return Ok(()); + } + + let child_rooms: Vec = self + .room_requirements + .read() + .await + .get(space_id) + .map(|reqs| reqs.keys().cloned().collect()) + .unwrap_or_default(); + + for child_room_id in &child_rooms { + if !self + .services + .state_cache + .is_joined(server_user, child_room_id) + .await + { + debug_warn!(room_id = %child_room_id, "Server user is not joined, skipping kick enforcement"); + continue; + } + if !self + .services + .state_cache + .is_joined(user_id, child_room_id) + .await + { + continue; + } + + let all_parents = self.get_parent_spaces(child_room_id).await; + let mut qualifies_in_any = false; + for parent in &all_parents { + if self + .user_qualifies_for_room(parent, child_room_id, user_id) + .await + { + qualifies_in_any = true; + break; + } + } + if qualifies_in_any { + continue; + } + + let Ok(member_content) = self + .services + .state_accessor + .get_member(child_room_id, user_id) + .await + else { + debug_warn!(user_id = %user_id, room_id = %child_room_id, "Could not get member event, skipping kick"); + continue; + }; + + let state_lock = self.services.state.mutex.lock(child_room_id).await; + + if let Err(e) = self + .services + .timeline + .build_and_append_pdu( + PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { + membership: MembershipState::Leave, + reason: Some("No longer has required Space roles".into()), + is_direct: None, + join_authorized_via_users_server: None, + third_party_invite: None, + ..member_content + }), + server_user, + Some(child_room_id), + &state_lock, + ) + .await + { + warn!(user_id = %user_id, room_id = %child_room_id, error = ?e, "Failed to kick user for missing roles"); + } + } + + Ok(()) +} diff --git a/src/service/rooms/roles/tests.rs b/src/service/rooms/roles/tests.rs new file mode 100644 index 00000000..fedcfd79 --- /dev/null +++ b/src/service/rooms/roles/tests.rs @@ -0,0 +1,204 @@ +use std::collections::{BTreeMap, HashSet}; + +use conduwuit_core::matrix::space_roles::RoleDefinition; + +use super::{compute_user_power_level, roles_satisfy_requirements}; + +pub(super) fn make_roles(entries: &[(&str, Option)]) -> BTreeMap { + entries + .iter() + .map(|(name, pl)| { + ((*name).to_owned(), RoleDefinition { + description: format!("{name} role"), + power_level: *pl, + }) + }) + .collect() +} + +pub(super) fn make_set(items: &[&str]) -> HashSet { + items.iter().map(|s| (*s).to_owned()).collect() +} + +#[test] +fn power_level_single_role() { + let roles = make_roles(&[("admin", Some(100)), ("mod", Some(50))]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["admin"])), Some(100)); +} + +#[test] +fn power_level_multiple_roles_takes_highest() { + let roles = make_roles(&[("admin", Some(100)), ("mod", Some(50)), ("helper", Some(25))]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["mod", "helper"])), Some(50)); +} + +#[test] +fn power_level_no_power_roles() { + let roles = make_roles(&[("nsfw", None), ("vip", None)]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["nsfw", "vip"])), None); +} + +#[test] +fn power_level_mixed_roles() { + let roles = make_roles(&[("mod", Some(50)), ("nsfw", None)]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["mod", "nsfw"])), Some(50)); +} + +#[test] +fn power_level_no_roles_assigned() { + let roles = make_roles(&[("admin", Some(100))]); + assert_eq!(compute_user_power_level(&roles, &HashSet::new()), None); +} + +#[test] +fn power_level_unknown_role_ignored() { + let roles = make_roles(&[("admin", Some(100))]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["nonexistent"])), None); +} + +#[test] +fn qualifies_with_all_required_roles() { + assert!(roles_satisfy_requirements( + &make_set(&["nsfw", "vip"]), + &make_set(&["nsfw", "vip", "extra"]), + )); +} + +#[test] +fn does_not_qualify_missing_one_role() { + assert!(!roles_satisfy_requirements(&make_set(&["nsfw", "vip"]), &make_set(&["nsfw"]),)); +} + +#[test] +fn qualifies_with_no_requirements() { + assert!(roles_satisfy_requirements(&HashSet::new(), &make_set(&["nsfw"]))); +} + +#[test] +fn does_not_qualify_with_no_roles() { + assert!(!roles_satisfy_requirements(&make_set(&["nsfw"]), &HashSet::new())); +} + +// Multi-space scenarios + +#[test] +fn multi_space_highest_pl_wins() { + let space_a_roles = make_roles(&[("mod", Some(50))]); + let space_b_roles = make_roles(&[("admin", Some(100))]); + + let user_roles_a = make_set(&["mod"]); + let user_roles_b = make_set(&["admin"]); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles_a); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles_b); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, Some(100)); +} + +#[test] +fn multi_space_one_space_has_no_pl() { + let space_a_roles = make_roles(&[("nsfw", None)]); + let space_b_roles = make_roles(&[("mod", Some(50))]); + + let user_roles_a = make_set(&["nsfw"]); + let user_roles_b = make_set(&["mod"]); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles_a); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles_b); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, Some(50)); +} + +#[test] +fn multi_space_neither_has_pl() { + let space_a_roles = make_roles(&[("nsfw", None)]); + let space_b_roles = make_roles(&[("vip", None)]); + + let user_roles_a = make_set(&["nsfw"]); + let user_roles_b = make_set(&["vip"]); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles_a); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles_b); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, None); +} + +#[test] +fn multi_space_user_only_in_one_space() { + let space_a_roles = make_roles(&[("admin", Some(100))]); + let space_b_roles = make_roles(&[("mod", Some(50))]); + + let user_roles_a = make_set(&["admin"]); + let user_roles_b: HashSet = HashSet::new(); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles_a); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles_b); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, Some(100)); +} + +#[test] +fn multi_space_qualifies_in_one_not_other() { + let space_a_reqs = make_set(&["staff"]); + let space_b_reqs = make_set(&["nsfw"]); + + let user_roles = make_set(&["nsfw"]); + + assert!(!roles_satisfy_requirements(&space_a_reqs, &user_roles)); + assert!(roles_satisfy_requirements(&space_b_reqs, &user_roles)); +} + +#[test] +fn multi_space_qualifies_after_role_revoke_via_other_space() { + let space_a_reqs = make_set(&["nsfw"]); + let space_b_reqs = make_set(&["vip"]); + + let user_roles_after_revoke = make_set(&["vip"]); + + assert!(!roles_satisfy_requirements(&space_a_reqs, &user_roles_after_revoke)); + assert!(roles_satisfy_requirements(&space_b_reqs, &user_roles_after_revoke)); +} + +#[test] +fn multi_space_room_has_reqs_in_one_space_only() { + let space_a_reqs = make_set(&["admin"]); + let space_b_reqs: HashSet = HashSet::new(); + + let user_roles = make_set(&["nsfw"]); + + assert!(!roles_satisfy_requirements(&space_a_reqs, &user_roles)); + assert!(roles_satisfy_requirements(&space_b_reqs, &user_roles)); +} + +#[test] +fn multi_space_no_qualification_anywhere() { + let space_a_reqs = make_set(&["staff"]); + let space_b_reqs = make_set(&["admin"]); + + let user_roles = make_set(&["nsfw"]); + + let qualifies_a = roles_satisfy_requirements(&space_a_reqs, &user_roles); + let qualifies_b = roles_satisfy_requirements(&space_b_reqs, &user_roles); + + assert!(!qualifies_a); + assert!(!qualifies_b); + assert!(!(qualifies_a || qualifies_b)); +} + +#[test] +fn multi_space_same_role_different_pl() { + let space_a_roles = make_roles(&[("mod", Some(50))]); + let space_b_roles = make_roles(&[("mod", Some(75))]); + + let user_roles = make_set(&["mod"]); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, Some(75)); +} diff --git a/src/service/services.rs b/src/service/services.rs index 60a7eeab..6356c6ea 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -94,6 +94,7 @@ impl Services { outlier: build!(rooms::outlier::Service), pdu_metadata: build!(rooms::pdu_metadata::Service), read_receipt: build!(rooms::read_receipt::Service), + roles: build!(rooms::roles::Service), search: build!(rooms::search::Service), short: build!(rooms::short::Service), spaces: build!(rooms::spaces::Service),