From 59401e17867ed220cfcef90fd217ea24d4503a99 Mon Sep 17 00:00:00 2001 From: ember33 Date: Fri, 20 Mar 2026 08:52:05 +0100 Subject: [PATCH] feat(spaces): add space roles service with enforcement and caching Implement the roles service with well-factored helper methods: - Cache population via load_user_roles, load_room_requirements, load_child_rooms_index helpers - Enforcement dispatch via enforce_roles_change, enforce_member_change, enforce_room_change, enforce_cascading_toggle - Child management via handle_child_added, handle_child_removed - Kick logic via user_qualifies_in_any_parent, kick_user_from_room - PL computation via compute_effective_pl (highest-wins across spaces) - Per-space enable/disable, graceful shutdown guards --- src/service/rooms/mod.rs | 2 + src/service/rooms/roles/mod.rs | 1257 ++++++++++++++++++++++++++++++ src/service/rooms/roles/tests.rs | 204 +++++ src/service/services.rs | 1 + 4 files changed, 1464 insertions(+) create mode 100644 src/service/rooms/roles/mod.rs create mode 100644 src/service/rooms/roles/tests.rs diff --git a/src/service/rooms/mod.rs b/src/service/rooms/mod.rs index 44a83582..bf4304f7 100644 --- a/src/service/rooms/mod.rs +++ b/src/service/rooms/mod.rs @@ -7,6 +7,7 @@ pub mod metadata; pub mod outlier; pub mod pdu_metadata; pub mod read_receipt; +pub mod roles; pub mod search; pub mod short; pub mod spaces; @@ -31,6 +32,7 @@ pub struct Service { pub outlier: Arc, pub pdu_metadata: Arc, pub read_receipt: Arc, + pub roles: Arc, pub search: Arc, pub short: Arc, pub spaces: Arc, diff --git a/src/service/rooms/roles/mod.rs b/src/service/rooms/roles/mod.rs new file mode 100644 index 00000000..dfb22566 --- /dev/null +++ b/src/service/rooms/roles/mod.rs @@ -0,0 +1,1257 @@ +#[cfg(test)] +mod tests; + +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + fmt::Write, + sync::Arc, +}; + +use async_trait::async_trait; +use conduwuit::{ + Err, Event, Result, Server, debug, debug_warn, implement, info, + matrix::pdu::{PduBuilder, PduEvent}, + warn, +}; +use conduwuit_core::matrix::space_roles::{ + RoleDefinition, SPACE_CASCADING_EVENT_TYPE, SPACE_ROLE_MEMBER_EVENT_TYPE, + SPACE_ROLE_ROOM_EVENT_TYPE, SPACE_ROLES_EVENT_TYPE, SpaceCascadingEventContent, + SpaceRoleMemberEventContent, SpaceRoleRoomEventContent, SpaceRolesEventContent, +}; +use futures::StreamExt; +use ruma::{ + Int, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, + events::{ + StateEventType, + room::{ + member::{MembershipState, RoomMemberEventContent}, + power_levels::RoomPowerLevelsEventContent, + }, + space::child::SpaceChildEventContent, + }, + room::RoomType, +}; +use serde_json::value::to_raw_value; +use tokio::sync::{RwLock, Semaphore}; + +use crate::{Dep, globals, rooms}; + +#[implement(Service)] +pub async fn flush_space_from_cache(&self, space_id: &RoomId) { + self.roles.write().await.remove(space_id); + self.user_roles.write().await.remove(space_id); + self.room_requirements.write().await.remove(space_id); + let mut room_to_space = self.room_to_space.write().await; + room_to_space.retain(|_, parents| { + parents.remove(space_id); + !parents.is_empty() + }); + drop(room_to_space); + self.space_to_rooms.write().await.remove(space_id); +} + +#[implement(Service)] +async fn flush_caches(&self) { + self.roles.write().await.clear(); + self.user_roles.write().await.clear(); + self.room_requirements.write().await.clear(); + self.room_to_space.write().await.clear(); + self.space_to_rooms.write().await.clear(); +} + +fn roles_event_type() -> StateEventType { + StateEventType::from(SPACE_ROLES_EVENT_TYPE.to_owned()) +} + +fn member_event_type() -> StateEventType { + StateEventType::from(SPACE_ROLE_MEMBER_EVENT_TYPE.to_owned()) +} + +fn room_event_type() -> StateEventType { + StateEventType::from(SPACE_ROLE_ROOM_EVENT_TYPE.to_owned()) +} + +fn cascading_event_type() -> StateEventType { + StateEventType::from(SPACE_CASCADING_EVENT_TYPE.to_owned()) +} + +pub struct Service { + services: Services, + server: Arc, + roles: RwLock>>, + user_roles: RwLock>>>, + room_requirements: RwLock>>>, + room_to_space: RwLock>>, + space_to_rooms: RwLock>>, + enforcement_semaphore: Semaphore, + pending_enforcement: RwLock>, +} + +struct Services { + globals: Dep, + metadata: Dep, + state_accessor: Dep, + state_cache: Dep, + state: Dep, + timeline: Dep, +} + +#[async_trait] +impl crate::Service for Service { + fn build(args: crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { + services: Services { + globals: args.depend::("globals"), + metadata: args.depend::("rooms::metadata"), + state_accessor: args + .depend::("rooms::state_accessor"), + state_cache: args.depend::("rooms::state_cache"), + state: args.depend::("rooms::state"), + timeline: args.depend::("rooms::timeline"), + }, + server: args.server.clone(), + roles: RwLock::new(HashMap::new()), + user_roles: RwLock::new(HashMap::new()), + room_requirements: RwLock::new(HashMap::new()), + room_to_space: RwLock::new(HashMap::new()), + space_to_rooms: RwLock::new(HashMap::new()), + enforcement_semaphore: Semaphore::new(4), + pending_enforcement: RwLock::new(HashSet::new()), + })) + } + + async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result { + if !self.is_enabled() { + return Ok(()); + } + + let roles = self.roles.read().await.len(); + let user_roles = self.user_roles.read().await.len(); + let room_requirements = self.room_requirements.read().await.len(); + let room_to_space = self.room_to_space.read().await.len(); + let space_to_rooms = self.space_to_rooms.read().await.len(); + + writeln!(out, "space_roles_definitions: {roles}")?; + writeln!(out, "space_user_roles: {user_roles}")?; + writeln!(out, "space_room_requirements: {room_requirements}")?; + writeln!(out, "space_room_to_space_index: {room_to_space}")?; + writeln!(out, "space_space_to_rooms_index: {space_to_rooms}")?; + + Ok(()) + } + + async fn clear_cache(&self) { + if !self.is_enabled() { + return; + } + self.flush_caches().await; + } + + async fn worker(self: Arc) -> Result<()> { + info!("Rebuilding space roles cache from all known rooms"); + + let mut space_count: usize = 0; + let room_ids: Vec = self + .services + .metadata + .iter_ids() + .map(ToOwned::to_owned) + .collect() + .await; + + for room_id in &room_ids { + match self.services.state_accessor.get_room_type(room_id).await { + | Ok(RoomType::Space) => { + // Check per-Space override — skip spaces where cascading is + // disabled + if !self.is_enabled_for_space(room_id).await { + continue; + } + debug!(room_id = %room_id, "Populating space roles cache"); + self.populate_space(room_id).await; + space_count = space_count.saturating_add(1); + }, + | _ => continue, + } + } + + info!(space_count, "Space roles cache rebuilt"); + Ok(()) + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} + +#[implement(Service)] +pub fn is_enabled(&self) -> bool { self.server.config.space_permission_cascading } + +#[implement(Service)] +pub async fn is_enabled_for_space(&self, space_id: &RoomId) -> bool { + let cascading_event_type = cascading_event_type(); + if let Ok(content) = self + .services + .state_accessor + .room_state_get_content::(space_id, &cascading_event_type, "") + .await + { + return content.enabled; + } + + self.server.config.space_permission_cascading +} + +#[implement(Service)] +pub async fn ensure_default_roles(&self, space_id: &RoomId) -> Result { + let server_user = self.services.globals.server_user.as_ref(); + let state_lock = self.services.state.mutex.lock(space_id).await; + + let roles_event_type = roles_event_type(); + if self + .services + .state_accessor + .room_state_get_content::(space_id, &roles_event_type, "") + .await + .is_ok() + { + return Ok(()); + } + + let mut roles = BTreeMap::new(); + roles.insert("admin".to_owned(), RoleDefinition { + description: "Space administrator".to_owned(), + power_level: Some(100), + }); + roles.insert("mod".to_owned(), RoleDefinition { + description: "Space moderator".to_owned(), + power_level: Some(50), + }); + + let content = SpaceRolesEventContent { roles }; + + let pdu = PduBuilder { + event_type: ruma::events::TimelineEventType::from(SPACE_ROLES_EVENT_TYPE.to_owned()), + content: to_raw_value(&content) + .map_err(|e| conduwuit::err!("Failed to serialize SpaceRolesEventContent: {e}"))?, + state_key: Some(String::new().into()), + ..PduBuilder::default() + }; + + self.services + .timeline + .build_and_append_pdu(pdu, server_user, Some(space_id), &state_lock) + .await?; + + debug!(space_id = %space_id, event_type = SPACE_ROLES_EVENT_TYPE, "Sent default space roles event"); + + Ok(()) +} + +/// Returns all `(state_key, pdu)` pairs for state events of the given type +/// within the room state identified by `shortstatehash`. +#[implement(Service)] +async fn load_state_pdus( + &self, + shortstatehash: u64, + event_type: &StateEventType, +) -> Vec<(conduwuit_core::matrix::StateKey, PduEvent)> { + let entries: Vec<(_, OwnedEventId)> = self + .services + .state_accessor + .state_keys_with_ids(shortstatehash, event_type) + .collect() + .await; + + let mut result = Vec::with_capacity(entries.len()); + for (state_key, event_id) in entries { + if let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await { + result.push((state_key, pdu)); + } + } + result +} + +/// Loads all `SpaceRoleMember` state events and writes the resulting +/// user-to-roles mapping into the `user_roles` cache. +#[implement(Service)] +async fn load_user_roles(&self, space_id: &RoomId, shortstatehash: u64) { + let member_event_type = member_event_type(); + let mut user_roles_map: HashMap> = HashMap::new(); + + for (state_key, pdu) in self + .load_state_pdus(shortstatehash, &member_event_type) + .await + { + if let Ok(content) = pdu.get_content::() { + if let Ok(user_id) = UserId::parse(&*state_key) { + user_roles_map.insert(user_id.to_owned(), content.roles.into_iter().collect()); + } + } + } + + self.user_roles + .write() + .await + .insert(space_id.to_owned(), user_roles_map); +} + +/// Loads all `SpaceRoleRoom` state events and writes the resulting +/// room-to-required-roles mapping into the `room_requirements` cache. +#[implement(Service)] +async fn load_room_requirements(&self, space_id: &RoomId, shortstatehash: u64) { + let room_event_type = room_event_type(); + let mut room_reqs_map: HashMap> = HashMap::new(); + + for (state_key, pdu) in self.load_state_pdus(shortstatehash, &room_event_type).await { + if let Ok(content) = pdu.get_content::() { + if let Ok(room_id) = RoomId::parse(&*state_key) { + room_reqs_map + .insert(room_id.to_owned(), content.required_roles.into_iter().collect()); + } + } + } + + self.room_requirements + .write() + .await + .insert(space_id.to_owned(), room_reqs_map); +} + +/// Loads all `SpaceChild` state events and updates both the +/// `room_to_space` and `space_to_rooms` indexes. +#[implement(Service)] +async fn load_child_rooms_index(&self, space_id: &RoomId, shortstatehash: u64) { + let mut child_rooms: Vec = Vec::new(); + + for (state_key, pdu) in self + .load_state_pdus(shortstatehash, &StateEventType::SpaceChild) + .await + { + if let Ok(content) = pdu.get_content::() { + if content.via.is_empty() { + continue; + } + } else { + continue; + } + if let Ok(child_room_id) = RoomId::parse(&*state_key) { + child_rooms.push(child_room_id.to_owned()); + } + } + + { + let mut room_to_space = self.room_to_space.write().await; + room_to_space.retain(|_, parents| { + parents.remove(space_id); + !parents.is_empty() + }); + for child_room_id in &child_rooms { + room_to_space + .entry(child_room_id.clone()) + .or_default() + .insert(space_id.to_owned()); + } + } + + { + let mut space_to_rooms = self.space_to_rooms.write().await; + space_to_rooms.insert(space_id.to_owned(), child_rooms.into_iter().collect()) + }; +} + +#[implement(Service)] +pub async fn populate_space(&self, space_id: &RoomId) { + if !self.is_enabled_for_space(space_id).await { + return; + } + + if self.roles.read().await.len() + >= usize::try_from(self.server.config.space_roles_cache_flush_threshold) + .unwrap_or(usize::MAX) + { + self.flush_caches().await; + debug_warn!("Space roles cache exceeded capacity, cleared"); + } + + let roles_event_type = roles_event_type(); + if let Ok(content) = self + .services + .state_accessor + .room_state_get_content::(space_id, &roles_event_type, "") + .await + { + self.roles + .write() + .await + .insert(space_id.to_owned(), content.roles); + } + + let shortstatehash = match self.services.state.get_room_shortstatehash(space_id).await { + | Ok(hash) => hash, + | Err(e) => { + debug_warn!(space_id = %space_id, error = ?e, "Failed to get shortstatehash, cache may be stale"); + return; + }, + }; + + self.load_user_roles(space_id, shortstatehash).await; + self.load_room_requirements(space_id, shortstatehash).await; + self.load_child_rooms_index(space_id, shortstatehash).await; +} + +#[must_use] +pub fn compute_user_power_level( + role_defs: &BTreeMap, + assigned: &HashSet, +) -> Option { + assigned + .iter() + .filter_map(|role_name| role_defs.get(role_name)?.power_level) + .max() +} + +#[must_use] +pub fn roles_satisfy_requirements( + required: &HashSet, + assigned: &HashSet, +) -> bool { + required.iter().all(|r| assigned.contains(r)) +} + +#[implement(Service)] +pub async fn get_user_power_level(&self, space_id: &RoomId, user_id: &UserId) -> Option { + let role_defs = { self.roles.read().await.get(space_id).cloned()? }; + let user_assigned = { + self.user_roles + .read() + .await + .get(space_id)? + .get(user_id) + .cloned()? + }; + compute_user_power_level(&role_defs, &user_assigned) +} + +#[implement(Service)] +pub async fn user_qualifies_for_room( + &self, + space_id: &RoomId, + room_id: &RoomId, + user_id: &UserId, +) -> bool { + let required = { + let guard = self.room_requirements.read().await; + let Some(space_reqs) = guard.get(space_id) else { + return true; + }; + let Some(required) = space_reqs.get(room_id) else { + return true; + }; + if required.is_empty() { + return true; + } + required.clone() + }; + let user_assigned = { + let guard = self.user_roles.read().await; + let Some(space_users) = guard.get(space_id) else { + return false; + }; + let Some(assigned) = space_users.get(user_id) else { + return false; + }; + assigned.clone() + }; + roles_satisfy_requirements(&required, &user_assigned) +} + +#[implement(Service)] +pub async fn get_parent_spaces(&self, room_id: &RoomId) -> Vec { + let all_parents: Vec = self + .room_to_space + .read() + .await + .get(room_id) + .map(|set| set.iter().cloned().collect()) + .unwrap_or_default(); + + let mut enabled_parents = Vec::new(); + for parent in all_parents { + if self.is_enabled_for_space(&parent).await { + enabled_parents.push(parent); + } + } + enabled_parents +} + +#[implement(Service)] +pub async fn get_child_rooms(&self, space_id: &RoomId) -> Vec { + self.space_to_rooms + .read() + .await + .get(space_id) + .map(|set| set.iter().cloned().collect()) + .unwrap_or_default() +} + +#[implement(Service)] +pub async fn get_user_roles_in_space( + &self, + space_id: &RoomId, + user_id: &UserId, +) -> Option> { + self.user_roles + .read() + .await + .get(space_id)? + .get(user_id) + .cloned() +} + +#[implement(Service)] +pub async fn get_room_requirements_in_space( + &self, + space_id: &RoomId, + room_id: &RoomId, +) -> Option> { + self.room_requirements + .read() + .await + .get(space_id)? + .get(room_id) + .cloned() +} + +#[implement(Service)] +pub async fn sync_power_levels(&self, room_id: &RoomId) -> Result { + let server_user = self.services.globals.server_user.as_ref(); + if !self + .services + .state_cache + .is_joined(server_user, room_id) + .await + { + debug_warn!(room_id = %room_id, "Server user is not joined, skipping PL sync"); + return Ok(()); + } + + let mut power_levels_content: RoomPowerLevelsEventContent = self + .services + .state_accessor + .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") + .await + .unwrap_or_default(); + + let members: Vec = self + .services + .state_cache + .room_members(room_id) + .map(ToOwned::to_owned) + .collect() + .await; + + let all_parents = self.get_parent_spaces(room_id).await; + + let mut changed = false; + for user_id in &members { + if user_id == server_user { + continue; + } + + if let Some(effective_pl) = self.compute_effective_pl(&all_parents, user_id).await { + let effective_pl_int = Int::new_saturating(effective_pl); + let current_pl = power_levels_content + .users + .get(user_id) + .copied() + .unwrap_or(power_levels_content.users_default); + + if current_pl != effective_pl_int { + power_levels_content + .users + .insert(user_id.clone(), effective_pl_int); + changed = true; + } + } + } + + if changed { + let state_lock = self.services.state.mutex.lock(room_id).await; + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state(String::new(), &power_levels_content), + server_user, + Some(room_id), + &state_lock, + ) + .await?; + } + + Ok(()) +} + +/// Computes the maximum effective power level for a user across all given +/// parent spaces. Returns `None` if the user has no power level defined in +/// any parent. +#[implement(Service)] +async fn compute_effective_pl( + &self, + parent_spaces: &[OwnedRoomId], + user_id: &UserId, +) -> Option { + let mut max_pl: Option = None; + for parent in parent_spaces { + if let Some(pl) = self.get_user_power_level(parent, user_id).await { + max_pl = Some(max_pl.map_or(pl, |current| current.max(pl))); + } + } + max_pl +} + +#[implement(Service)] +pub async fn auto_join_qualifying_rooms(&self, space_id: &RoomId, user_id: &UserId) -> Result { + if !self.is_enabled_for_space(space_id).await { + return Ok(()); + } + + let server_user = self.services.globals.server_user.as_ref(); + if user_id == server_user { + return Ok(()); + } + + let child_rooms = self.get_child_rooms(space_id).await; + + for child_room_id in &child_rooms { + if self + .services + .state_cache + .is_joined(user_id, child_room_id) + .await + { + continue; + } + + if !self + .user_qualifies_for_room(space_id, child_room_id, user_id) + .await + { + continue; + } + + if !self + .services + .state_cache + .is_joined(server_user, child_room_id) + .await + { + debug_warn!(room_id = %child_room_id, "Server user is not joined, skipping auto-join"); + continue; + } + + if let Err(e) = self + .invite_and_join_user(child_room_id, user_id, server_user) + .await + { + debug_warn!(user_id = %user_id, room_id = %child_room_id, error = ?e, "Failed to auto-join user"); + } + } + + Ok(()) +} + +#[implement(Service)] +async fn invite_and_join_user( + &self, + room_id: &RoomId, + user_id: &UserId, + server_user: &UserId, +) -> Result { + let state_lock = self.services.state.mutex.lock(room_id).await; + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state( + user_id.to_string(), + &RoomMemberEventContent::new(MembershipState::Invite), + ), + server_user, + Some(room_id), + &state_lock, + ) + .await?; + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state( + user_id.to_string(), + &RoomMemberEventContent::new(MembershipState::Join), + ), + user_id, + Some(room_id), + &state_lock, + ) + .await?; + + Ok(()) +} + +/// Called from append_pdu after a PDU is persisted. Dispatches to the +/// appropriate handler based on event type. +#[implement(Service)] +pub fn on_pdu_appended(self: &Arc, room_id: &RoomId, pdu: &PduEvent) { + if let Some(state_key) = pdu.state_key() { + let event_type_str = pdu.event_type().to_string(); + match event_type_str.as_str() { + | SPACE_ROLES_EVENT_TYPE + | SPACE_ROLE_MEMBER_EVENT_TYPE + | SPACE_ROLE_ROOM_EVENT_TYPE + | SPACE_CASCADING_EVENT_TYPE => { + self.handle_state_event_change( + room_id.to_owned(), + event_type_str, + state_key.to_owned(), + ); + }, + | _ => { + if *pdu.kind() == ruma::events::TimelineEventType::SpaceChild { + if let Ok(child_room_id) = RoomId::parse(&*state_key) { + self.handle_space_child_change( + room_id.to_owned(), + child_room_id.to_owned(), + ); + } + } + if *pdu.kind() == ruma::events::TimelineEventType::RoomMember { + if let Ok(content) = pdu.get_content::() { + if let Ok(user_id) = UserId::parse(&*state_key) { + match content.membership { + | MembershipState::Join => { + self.handle_space_member_join( + room_id.to_owned(), + user_id.to_owned(), + ); + }, + | MembershipState::Leave | MembershipState::Ban => { + self.handle_space_member_leave( + room_id.to_owned(), + user_id.to_owned(), + ); + }, + | _ => {}, + } + } + } + } + }, + } + } +} + +/// Called from build_and_append_pdu to validate PL changes don't conflict +/// with space-granted power levels. +#[implement(Service)] +pub async fn validate_pl_change( + &self, + room_id: &RoomId, + sender: &UserId, + proposed: &RoomPowerLevelsEventContent, +) -> Result { + if sender == self.services.globals.server_user.as_str() { + return Ok(()); + } + + let parent_spaces = self.get_parent_spaces(room_id).await; + if parent_spaces.is_empty() { + return Ok(()); + } + + let mut effective_pls: HashMap = HashMap::new(); + { + let roles_guard = self.roles.read().await; + let user_roles_guard = self.user_roles.read().await; + for ps in &parent_spaces { + let Some(space_users) = user_roles_guard.get(ps) else { + continue; + }; + let Some(role_defs) = roles_guard.get(ps) else { + continue; + }; + for (user_id, assigned_roles) in space_users { + let pl = assigned_roles + .iter() + .filter_map(|r| role_defs.get(r)?.power_level) + .max(); + if let Some(pl) = pl { + effective_pls + .entry(user_id.clone()) + .and_modify(|current| *current = (*current).max(pl)) + .or_insert(pl); + } + } + } + } + + for (user_id, effective_pl) in &effective_pls { + if !self.services.state_cache.is_joined(user_id, room_id).await { + continue; + } + match proposed.users.get(user_id) { + | None if i64::from(proposed.users_default) != *effective_pl => { + debug_warn!( + user_id = %user_id, + room_id = %room_id, + effective_pl, + "Rejecting PL change: space-managed user omitted" + ); + return Err!(Request(Forbidden( + "Cannot omit a user whose power level is managed by Space roles" + ))); + }, + | Some(pl) if i64::from(*pl) != *effective_pl => { + debug_warn!( + user_id = %user_id, + room_id = %room_id, + proposed_pl = i64::from(*pl), + effective_pl, + "Rejecting PL change conflicting with space role" + ); + return Err!(Request(Forbidden( + "Cannot change power level that is set by Space roles" + ))); + }, + | _ => {}, + } + } + + Ok(()) +} + +/// Called from join_room_by_id_helper to check if a user has the required +/// Space roles to join a room. +#[implement(Service)] +pub async fn check_join_allowed(&self, room_id: &RoomId, user_id: &UserId) -> Result { + let parent_spaces = self.get_parent_spaces(room_id).await; + if parent_spaces.is_empty() { + return Ok(()); + } + + for parent_space in &parent_spaces { + if self + .user_qualifies_for_room(parent_space, room_id, user_id) + .await + { + return Ok(()); + } + } + + Err!(Request(Forbidden("You do not have the required Space roles to join this room"))) +} + +#[implement(Service)] +async fn sync_power_levels_for_children(&self, space_id: &RoomId) { + let child_rooms = self.get_child_rooms(space_id).await; + for child_room_id in &child_rooms { + if let Err(e) = self.sync_power_levels(child_room_id).await { + debug_warn!(room_id = %child_room_id, error = ?e, "Failed to sync power levels"); + } + } +} + +/// Enforces a change to the space role definitions: syncs power levels to +/// all child rooms and kicks members who no longer qualify. +#[implement(Service)] +async fn enforce_roles_change(&self, space_id: &RoomId) { + self.sync_power_levels_for_children(space_id).await; + let space_members: Vec = self + .services + .state_cache + .room_members(space_id) + .map(ToOwned::to_owned) + .collect() + .await; + for member in &space_members { + if let Err(e) = Box::pin(self.kick_unqualified_from_rooms(space_id, member)).await { + debug_warn!(user_id = %member, error = ?e, "Role definition revalidation kick failed"); + } + } +} + +/// Enforces a change to a user's role membership: auto-joins qualifying +/// rooms, kicks from rooms they no longer qualify for, and syncs power +/// levels. +#[implement(Service)] +async fn enforce_member_change(&self, space_id: &RoomId, user_id: &UserId) { + if let Err(e) = self.auto_join_qualifying_rooms(space_id, user_id).await { + debug_warn!(user_id = %user_id, error = ?e, "Space role auto-join failed"); + } + if let Err(e) = Box::pin(self.kick_unqualified_from_rooms(space_id, user_id)).await { + debug_warn!(user_id = %user_id, error = ?e, "Space role auto-kick failed"); + } + self.sync_power_levels_for_children(space_id).await; +} + +/// Enforces a change to a room's role requirements: kicks all members of +/// the target room who no longer meet the updated requirements. +#[implement(Service)] +async fn enforce_room_change(&self, space_id: &RoomId, target_room: &RoomId) { + let members: Vec = self + .services + .state_cache + .room_members(target_room) + .map(ToOwned::to_owned) + .collect() + .await; + for member in &members { + if let Err(e) = Box::pin(self.kick_unqualified_from_rooms(space_id, member)).await { + debug_warn!(user_id = %member, error = ?e, "Space role requirement kick failed"); + } + } +} + +/// Enforces a toggle of the cascading feature: if cascading was disabled, +/// flushes the space from the cache. +#[implement(Service)] +async fn enforce_cascading_toggle(&self, space_id: &RoomId) { + if !self.is_enabled_for_space(space_id).await { + self.flush_space_from_cache(space_id).await; + } +} + +impl Service { + pub fn handle_state_event_change( + self: &Arc, + space_id: OwnedRoomId, + event_type: String, + state_key: String, + ) { + let this = Arc::clone(self); + self.server.runtime().spawn(async move { + if !this.server.running() { + return; + } + if event_type != SPACE_CASCADING_EVENT_TYPE + && !this.is_enabled_for_space(&space_id).await + { + return; + } + { + let mut pending = this.pending_enforcement.write().await; + if pending.contains(&space_id) { + return; + } + pending.insert(space_id.clone()) + }; + + async { + let Ok(_permit) = this.enforcement_semaphore.acquire().await else { + return; + }; + + this.populate_space(&space_id).await; + + match event_type.as_str() { + | SPACE_ROLES_EVENT_TYPE => { + this.enforce_roles_change(&space_id).await; + }, + | SPACE_ROLE_MEMBER_EVENT_TYPE => { + if let Ok(user_id) = UserId::parse(state_key.as_str()) { + this.enforce_member_change(&space_id, user_id).await; + } + }, + | SPACE_ROLE_ROOM_EVENT_TYPE => { + if let Ok(target_room) = RoomId::parse(state_key.as_str()) { + this.enforce_room_change(&space_id, target_room).await; + } + }, + | SPACE_CASCADING_EVENT_TYPE => { + this.enforce_cascading_toggle(&space_id).await; + }, + | _ => {}, + } + } + .await; + + this.pending_enforcement.write().await.remove(&space_id); + }); + } + + pub fn handle_space_child_change( + self: &Arc, + space_id: OwnedRoomId, + child_room_id: OwnedRoomId, + ) { + let this = Arc::clone(self); + self.server.runtime().spawn(async move { + if !this.server.running() { + return; + } + if !this.is_enabled_for_space(&space_id).await { + return; + } + let Ok(_permit) = this.enforcement_semaphore.acquire().await else { + return; + }; + + let child_event_type = StateEventType::SpaceChild; + let is_removal = match this + .services + .state_accessor + .room_state_get_content::( + &space_id, + &child_event_type, + child_room_id.as_str(), + ) + .await + { + | Ok(content) => content.via.is_empty(), + | Err(_) => true, // If we can't read it, treat as removal + }; + + if is_removal { + this.handle_child_removed(&space_id, &child_room_id).await; + } else { + this.handle_child_added(&space_id, &child_room_id).await; + } + }); + } + + pub fn handle_space_member_join( + self: &Arc, + space_id: OwnedRoomId, + user_id: OwnedUserId, + ) { + if user_id == self.services.globals.server_user { + return; + } + + let this = Arc::clone(self); + self.server.runtime().spawn(async move { + if !this.server.running() { + return; + } + if !this.is_enabled_for_space(&space_id).await { + return; + } + + let Ok(_permit) = this.enforcement_semaphore.acquire().await else { + return; + }; + + if let Err(e) = this.auto_join_qualifying_rooms(&space_id, &user_id).await { + debug_warn!(user_id = %user_id, error = ?e, "Auto-join on Space join failed"); + } + this.sync_power_levels_for_children(&space_id).await; + }); + } + + pub fn handle_space_member_leave( + self: &Arc, + space_id: OwnedRoomId, + user_id: OwnedUserId, + ) { + if user_id == self.services.globals.server_user { + return; + } + + let this = Arc::clone(self); + self.server.runtime().spawn(async move { + if !this.server.running() { + return; + } + if !this.is_enabled_for_space(&space_id).await { + return; + } + + let Ok(_permit) = this.enforcement_semaphore.acquire().await else { + return; + }; + + if let Err(e) = Box::pin(this.kick_unqualified_from_rooms(&space_id, &user_id)).await + { + debug_warn!(user_id = %user_id, error = ?e, "Kick on Space leave failed"); + } + }); + } +} + +#[implement(Service)] +async fn handle_child_removed(&self, space_id: &RoomId, child_room_id: &RoomId) { + let mut room_to_space = self.room_to_space.write().await; + if let Some(parents) = room_to_space.get_mut(child_room_id) { + parents.remove(space_id); + if parents.is_empty() { + room_to_space.remove(child_room_id); + } + } + let mut space_to_rooms = self.space_to_rooms.write().await; + if let Some(children) = space_to_rooms.get_mut(space_id) { + children.remove(child_room_id); + } +} + +#[implement(Service)] +async fn handle_child_added(&self, space_id: &RoomId, child_room_id: &RoomId) { + self.room_to_space + .write() + .await + .entry(child_room_id.to_owned()) + .or_default() + .insert(space_id.to_owned()); + + self.space_to_rooms + .write() + .await + .entry(space_id.to_owned()) + .or_default() + .insert(child_room_id.to_owned()); + + let server_user = self.services.globals.server_user.as_ref(); + if !self + .services + .state_cache + .is_joined(server_user, child_room_id) + .await + { + debug_warn!(room_id = %child_room_id, "Server user is not joined, skipping auto-join enforcement for new child"); + return; + } + + let space_members: Vec = self + .services + .state_cache + .room_members(space_id) + .map(ToOwned::to_owned) + .collect() + .await; + + for member in &space_members { + if self + .user_qualifies_for_room(space_id, child_room_id, member) + .await && !self + .services + .state_cache + .is_joined(member, child_room_id) + .await + { + if let Err(e) = self + .invite_and_join_user(child_room_id, member, server_user) + .await + { + debug_warn!(user_id = %member, room_id = %child_room_id, error = ?e, "Failed to auto-join user"); + } + } + } +} + +#[implement(Service)] +pub async fn kick_unqualified_from_rooms(&self, space_id: &RoomId, user_id: &UserId) -> Result { + if !self.is_enabled_for_space(space_id).await { + return Ok(()); + } + + let server_user = self.services.globals.server_user.as_ref(); + if user_id == server_user { + return Ok(()); + } + + let child_rooms: Vec = self + .room_requirements + .read() + .await + .get(space_id) + .map(|reqs| reqs.keys().cloned().collect()) + .unwrap_or_default(); + + for child_room_id in &child_rooms { + if !self + .services + .state_cache + .is_joined(server_user, child_room_id) + .await + { + debug_warn!(room_id = %child_room_id, "Server user is not joined, skipping kick enforcement"); + continue; + } + if !self + .services + .state_cache + .is_joined(user_id, child_room_id) + .await + { + continue; + } + + if self + .user_qualifies_in_any_parent(child_room_id, user_id) + .await + { + continue; + } + + self.kick_user_from_room(child_room_id, user_id, server_user) + .await?; + } + + Ok(()) +} + +/// Checks whether the user qualifies for the given room through any of its +/// parent spaces. +#[implement(Service)] +async fn user_qualifies_in_any_parent(&self, room_id: &RoomId, user_id: &UserId) -> bool { + let all_parents = self.get_parent_spaces(room_id).await; + for parent in &all_parents { + if self.user_qualifies_for_room(parent, room_id, user_id).await { + return true; + } + } + false +} + +/// Sends a kick (membership=leave) PDU to remove a user from a room for +/// missing required Space roles. +#[implement(Service)] +async fn kick_user_from_room( + &self, + room_id: &RoomId, + user_id: &UserId, + server_user: &UserId, +) -> Result { + let Ok(member_content) = self + .services + .state_accessor + .get_member(room_id, user_id) + .await + else { + debug_warn!(user_id = %user_id, room_id = %room_id, "Could not get member event, skipping kick"); + return Ok(()); + }; + + let state_lock = self.services.state.mutex.lock(room_id).await; + + if let Err(e) = self + .services + .timeline + .build_and_append_pdu( + PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { + membership: MembershipState::Leave, + reason: Some("No longer has required Space roles".into()), + is_direct: None, + join_authorized_via_users_server: None, + third_party_invite: None, + ..member_content + }), + server_user, + Some(room_id), + &state_lock, + ) + .await + { + warn!(user_id = %user_id, room_id = %room_id, error = ?e, "Failed to kick user for missing roles"); + } + + Ok(()) +} diff --git a/src/service/rooms/roles/tests.rs b/src/service/rooms/roles/tests.rs new file mode 100644 index 00000000..fedcfd79 --- /dev/null +++ b/src/service/rooms/roles/tests.rs @@ -0,0 +1,204 @@ +use std::collections::{BTreeMap, HashSet}; + +use conduwuit_core::matrix::space_roles::RoleDefinition; + +use super::{compute_user_power_level, roles_satisfy_requirements}; + +pub(super) fn make_roles(entries: &[(&str, Option)]) -> BTreeMap { + entries + .iter() + .map(|(name, pl)| { + ((*name).to_owned(), RoleDefinition { + description: format!("{name} role"), + power_level: *pl, + }) + }) + .collect() +} + +pub(super) fn make_set(items: &[&str]) -> HashSet { + items.iter().map(|s| (*s).to_owned()).collect() +} + +#[test] +fn power_level_single_role() { + let roles = make_roles(&[("admin", Some(100)), ("mod", Some(50))]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["admin"])), Some(100)); +} + +#[test] +fn power_level_multiple_roles_takes_highest() { + let roles = make_roles(&[("admin", Some(100)), ("mod", Some(50)), ("helper", Some(25))]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["mod", "helper"])), Some(50)); +} + +#[test] +fn power_level_no_power_roles() { + let roles = make_roles(&[("nsfw", None), ("vip", None)]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["nsfw", "vip"])), None); +} + +#[test] +fn power_level_mixed_roles() { + let roles = make_roles(&[("mod", Some(50)), ("nsfw", None)]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["mod", "nsfw"])), Some(50)); +} + +#[test] +fn power_level_no_roles_assigned() { + let roles = make_roles(&[("admin", Some(100))]); + assert_eq!(compute_user_power_level(&roles, &HashSet::new()), None); +} + +#[test] +fn power_level_unknown_role_ignored() { + let roles = make_roles(&[("admin", Some(100))]); + assert_eq!(compute_user_power_level(&roles, &make_set(&["nonexistent"])), None); +} + +#[test] +fn qualifies_with_all_required_roles() { + assert!(roles_satisfy_requirements( + &make_set(&["nsfw", "vip"]), + &make_set(&["nsfw", "vip", "extra"]), + )); +} + +#[test] +fn does_not_qualify_missing_one_role() { + assert!(!roles_satisfy_requirements(&make_set(&["nsfw", "vip"]), &make_set(&["nsfw"]),)); +} + +#[test] +fn qualifies_with_no_requirements() { + assert!(roles_satisfy_requirements(&HashSet::new(), &make_set(&["nsfw"]))); +} + +#[test] +fn does_not_qualify_with_no_roles() { + assert!(!roles_satisfy_requirements(&make_set(&["nsfw"]), &HashSet::new())); +} + +// Multi-space scenarios + +#[test] +fn multi_space_highest_pl_wins() { + let space_a_roles = make_roles(&[("mod", Some(50))]); + let space_b_roles = make_roles(&[("admin", Some(100))]); + + let user_roles_a = make_set(&["mod"]); + let user_roles_b = make_set(&["admin"]); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles_a); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles_b); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, Some(100)); +} + +#[test] +fn multi_space_one_space_has_no_pl() { + let space_a_roles = make_roles(&[("nsfw", None)]); + let space_b_roles = make_roles(&[("mod", Some(50))]); + + let user_roles_a = make_set(&["nsfw"]); + let user_roles_b = make_set(&["mod"]); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles_a); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles_b); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, Some(50)); +} + +#[test] +fn multi_space_neither_has_pl() { + let space_a_roles = make_roles(&[("nsfw", None)]); + let space_b_roles = make_roles(&[("vip", None)]); + + let user_roles_a = make_set(&["nsfw"]); + let user_roles_b = make_set(&["vip"]); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles_a); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles_b); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, None); +} + +#[test] +fn multi_space_user_only_in_one_space() { + let space_a_roles = make_roles(&[("admin", Some(100))]); + let space_b_roles = make_roles(&[("mod", Some(50))]); + + let user_roles_a = make_set(&["admin"]); + let user_roles_b: HashSet = HashSet::new(); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles_a); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles_b); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, Some(100)); +} + +#[test] +fn multi_space_qualifies_in_one_not_other() { + let space_a_reqs = make_set(&["staff"]); + let space_b_reqs = make_set(&["nsfw"]); + + let user_roles = make_set(&["nsfw"]); + + assert!(!roles_satisfy_requirements(&space_a_reqs, &user_roles)); + assert!(roles_satisfy_requirements(&space_b_reqs, &user_roles)); +} + +#[test] +fn multi_space_qualifies_after_role_revoke_via_other_space() { + let space_a_reqs = make_set(&["nsfw"]); + let space_b_reqs = make_set(&["vip"]); + + let user_roles_after_revoke = make_set(&["vip"]); + + assert!(!roles_satisfy_requirements(&space_a_reqs, &user_roles_after_revoke)); + assert!(roles_satisfy_requirements(&space_b_reqs, &user_roles_after_revoke)); +} + +#[test] +fn multi_space_room_has_reqs_in_one_space_only() { + let space_a_reqs = make_set(&["admin"]); + let space_b_reqs: HashSet = HashSet::new(); + + let user_roles = make_set(&["nsfw"]); + + assert!(!roles_satisfy_requirements(&space_a_reqs, &user_roles)); + assert!(roles_satisfy_requirements(&space_b_reqs, &user_roles)); +} + +#[test] +fn multi_space_no_qualification_anywhere() { + let space_a_reqs = make_set(&["staff"]); + let space_b_reqs = make_set(&["admin"]); + + let user_roles = make_set(&["nsfw"]); + + let qualifies_a = roles_satisfy_requirements(&space_a_reqs, &user_roles); + let qualifies_b = roles_satisfy_requirements(&space_b_reqs, &user_roles); + + assert!(!qualifies_a); + assert!(!qualifies_b); + assert!(!(qualifies_a || qualifies_b)); +} + +#[test] +fn multi_space_same_role_different_pl() { + let space_a_roles = make_roles(&[("mod", Some(50))]); + let space_b_roles = make_roles(&[("mod", Some(75))]); + + let user_roles = make_set(&["mod"]); + + let pl_a = compute_user_power_level(&space_a_roles, &user_roles); + let pl_b = compute_user_power_level(&space_b_roles, &user_roles); + + let effective = [pl_a, pl_b].into_iter().flatten().max(); + assert_eq!(effective, Some(75)); +} diff --git a/src/service/services.rs b/src/service/services.rs index 60a7eeab..6356c6ea 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -94,6 +94,7 @@ impl Services { outlier: build!(rooms::outlier::Service), pdu_metadata: build!(rooms::pdu_metadata::Service), read_receipt: build!(rooms::read_receipt::Service), + roles: build!(rooms::roles::Service), search: build!(rooms::search::Service), short: build!(rooms::short::Service), spaces: build!(rooms::spaces::Service),