continuwuity/src/service/rooms/roles/mod.rs
ember33 4df2fe2923
Some checks failed
Documentation / Build and Deploy Documentation (pull_request) Has been skipped
Checks / Prek / Clippy and Cargo Tests (pull_request) Failing after 5s
Update flake hashes / update-flake-hashes (pull_request) Failing after 5s
Checks / Prek / Pre-commit & Formatting (pull_request) Failing after 4s
fix(spaces): remove is_enabled_for_space guard from ensure_default_roles
The guard prevented ensure_default_roles from working in the enable
command — it checked the cascading state event which hasn't been
written yet at that point. Callers should gate this themselves.
2026-03-19 19:55:44 +01:00

1199 lines
30 KiB
Rust

#[cfg(test)]
mod tests;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Write,
sync::Arc,
};
use async_trait::async_trait;
use conduwuit::{
Err, Event, Result, Server, debug, debug_warn, implement, info,
matrix::pdu::{PduBuilder, PduEvent},
warn,
};
use conduwuit_core::{
matrix::space_roles::{
RoleDefinition, SPACE_CASCADING_EVENT_TYPE, SPACE_ROLE_MEMBER_EVENT_TYPE,
SPACE_ROLE_ROOM_EVENT_TYPE, SPACE_ROLES_EVENT_TYPE, SpaceCascadingEventContent,
SpaceRoleMemberEventContent, SpaceRoleRoomEventContent, SpaceRolesEventContent,
},
utils::{
future::TryExtExt,
stream::{BroadbandExt, ReadyExt},
},
};
use futures::{StreamExt, TryFutureExt};
use ruma::{
Int, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
events::{
StateEventType,
room::{
member::{MembershipState, RoomMemberEventContent},
power_levels::RoomPowerLevelsEventContent,
},
space::child::SpaceChildEventContent,
},
room::RoomType,
};
use serde_json::value::to_raw_value;
use tokio::sync::{RwLock, Semaphore};
use crate::{Dep, globals, rooms};
/// Evicts everything cached for `space_id`: its role definitions, user role
/// assignments, room requirements, and both parent/child index entries.
///
/// Rooms whose only known parent was `space_id` are removed from the
/// room-to-space index entirely.
#[implement(Service)]
pub async fn flush_space_from_cache(&self, space_id: &RoomId) {
    self.roles.write().await.remove(space_id);
    self.user_roles.write().await.remove(space_id);
    self.room_requirements.write().await.remove(space_id);

    // Scope the write guard so it is released before the next lock is taken.
    {
        let mut parents_by_room = self.room_to_space.write().await;
        parents_by_room.retain(|_, parent_spaces| {
            parent_spaces.remove(space_id);
            !parent_spaces.is_empty()
        });
    }

    self.space_to_rooms.write().await.remove(space_id);
}
#[implement(Service)]
async fn flush_caches(&self) {
self.roles.write().await.clear();
self.user_roles.write().await.clear();
self.room_requirements.write().await.clear();
self.room_to_space.write().await.clear();
self.space_to_rooms.write().await.clear();
}
/// State event type built from `SPACE_ROLES_EVENT_TYPE`; used in this module
/// to read a space's `SpaceRolesEventContent`.
fn roles_event_type() -> StateEventType {
    SPACE_ROLES_EVENT_TYPE.to_owned().into()
}
/// State event type built from `SPACE_ROLE_MEMBER_EVENT_TYPE`; used in this
/// module to enumerate per-user role assignment events of a space.
fn member_event_type() -> StateEventType {
    SPACE_ROLE_MEMBER_EVENT_TYPE.to_owned().into()
}
/// State event type built from `SPACE_ROLE_ROOM_EVENT_TYPE`; used in this
/// module to enumerate per-room role requirement events of a space.
fn room_event_type() -> StateEventType {
    SPACE_ROLE_ROOM_EVENT_TYPE.to_owned().into()
}
/// State event type built from `SPACE_CASCADING_EVENT_TYPE`; used in this
/// module to read a space's per-space cascading override.
fn cascading_event_type() -> StateEventType {
    SPACE_CASCADING_EVENT_TYPE.to_owned().into()
}
/// Space-role service: keeps in-memory caches of role data read from space
/// state events and enforces them (power levels, joins, kicks) on child rooms.
pub struct Service {
/// Handles to the other services this one calls into.
services: Services,
/// Server handle; used here for configuration, the runtime, and the running flag.
server: Arc<Server>,
/// Space ID -> role name -> role definition.
roles: RwLock<HashMap<OwnedRoomId, BTreeMap<String, RoleDefinition>>>,
/// Space ID -> user ID -> names of the roles assigned to that user.
user_roles: RwLock<HashMap<OwnedRoomId, HashMap<OwnedUserId, HashSet<String>>>>,
/// Space ID -> room ID -> names of the roles required for that room.
room_requirements: RwLock<HashMap<OwnedRoomId, HashMap<OwnedRoomId, HashSet<String>>>>,
/// Child room ID -> spaces that list it as a child.
room_to_space: RwLock<HashMap<OwnedRoomId, HashSet<OwnedRoomId>>>,
/// Space ID -> child rooms of that space.
space_to_rooms: RwLock<HashMap<OwnedRoomId, HashSet<OwnedRoomId>>>,
/// Permit pool acquired by the spawned enforcement tasks before they do work.
enforcement_semaphore: Semaphore,
/// Spaces that currently have a state-event-change task in flight.
pending_enforcement: RwLock<HashSet<OwnedRoomId>>,
}
/// Dependencies on sibling services, resolved in `build`.
struct Services {
/// Source of the server user's ID.
globals: Dep<globals::Service>,
/// Used to iterate over all known room IDs.
metadata: Dep<rooms::metadata::Service>,
/// Used to read room state content, room types, and member events.
state_accessor: Dep<rooms::state_accessor::Service>,
/// Used for membership checks and for listing room members.
state_cache: Dep<rooms::state_cache::Service>,
/// Provides the per-room state mutex and the current short state hash.
state: Dep<rooms::state::Service>,
/// Used to fetch PDUs and to build and append new ones.
timeline: Dep<rooms::timeline::Service>,
}
#[async_trait]
impl crate::Service for Service {
    /// Builds the service with its dependencies resolved and every cache empty.
    ///
    /// The enforcement semaphore is created with four permits.
    fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
        Ok(Arc::new(Self {
            services: Services {
                globals: args.depend::<globals::Service>("globals"),
                metadata: args.depend::<rooms::metadata::Service>("rooms::metadata"),
                state_accessor: args
                    .depend::<rooms::state_accessor::Service>("rooms::state_accessor"),
                state_cache: args.depend::<rooms::state_cache::Service>("rooms::state_cache"),
                state: args.depend::<rooms::state::Service>("rooms::state"),
                timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
            },
            server: args.server.clone(),
            roles: RwLock::new(HashMap::new()),
            user_roles: RwLock::new(HashMap::new()),
            room_requirements: RwLock::new(HashMap::new()),
            room_to_space: RwLock::new(HashMap::new()),
            space_to_rooms: RwLock::new(HashMap::new()),
            enforcement_semaphore: Semaphore::new(4),
            pending_enforcement: RwLock::new(HashSet::new()),
        }))
    }

    /// Writes the number of top-level entries in each cache to `out`.
    ///
    /// Nothing is written when the feature is globally disabled and every
    /// cache is empty. A per-space override can enable cascading while the
    /// global flag is off, so populated caches are reported regardless of the
    /// global flag.
    async fn memory_usage(&self, out: &mut (dyn Write + Send)) -> Result {
        let roles = self.roles.read().await.len();
        let user_roles = self.user_roles.read().await.len();
        let room_requirements = self.room_requirements.read().await.len();
        let room_to_space = self.room_to_space.read().await.len();
        let space_to_rooms = self.space_to_rooms.read().await.len();

        let all_empty = [roles, user_roles, room_requirements, room_to_space, space_to_rooms]
            .iter()
            .all(|&entries| entries == 0);
        if all_empty && !self.is_enabled() {
            return Ok(());
        }

        writeln!(out, "space_roles_definitions: {roles}")?;
        writeln!(out, "space_user_roles: {user_roles}")?;
        writeln!(out, "space_room_requirements: {room_requirements}")?;
        writeln!(out, "space_room_to_space_index: {room_to_space}")?;
        writeln!(out, "space_space_to_rooms_index: {space_to_rooms}")?;
        Ok(())
    }

    /// Clears every cache.
    ///
    /// This is deliberately not gated on the global config flag: spaces with a
    /// per-space override can populate the caches while the flag is off, and
    /// clearing empty maps is harmless.
    async fn clear_cache(&self) {
        self.flush_caches().await;
    }

    /// Walks every known room and populates the caches for each room that is
    /// a space with cascading enabled.
    async fn worker(self: Arc<Self>) -> Result<()> {
        info!("Rebuilding space roles cache from all known rooms");

        let room_ids: Vec<OwnedRoomId> = self
            .services
            .metadata
            .iter_ids()
            .map(ToOwned::to_owned)
            .collect()
            .await;

        let mut space_count: usize = 0;
        for room_id in &room_ids {
            let is_space = matches!(
                self.services.state_accessor.get_room_type(room_id).await,
                Ok(RoomType::Space)
            );
            if !is_space {
                continue;
            }

            // Check per-Space override — skip spaces where cascading is
            // disabled, so they are not counted as populated.
            if !self.is_enabled_for_space(room_id).await {
                continue;
            }

            debug!(room_id = %room_id, "Populating space roles cache");
            self.populate_space(room_id).await;
            space_count = space_count.saturating_add(1);
        }

        info!(space_count, "Space roles cache rebuilt");
        Ok(())
    }

    /// Service name derived from this module's path.
    fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
/// Returns the server-wide `space_permission_cascading` configuration flag.
///
/// Individual spaces may override this; see `is_enabled_for_space`.
#[implement(Service)]
pub fn is_enabled(&self) -> bool {
    let config = &self.server.config;
    config.space_permission_cascading
}
/// Reports whether permission cascading is active for `space_id`.
///
/// If the space's cascading state event (empty state key) can be read, its
/// `enabled` field decides. On any read failure the server-wide
/// `space_permission_cascading` setting is used instead.
#[implement(Service)]
pub async fn is_enabled_for_space(&self, space_id: &RoomId) -> bool {
    let event_type = cascading_event_type();
    let server_default = self.server.config.space_permission_cascading;

    self.services
        .state_accessor
        .room_state_get_content::<SpaceCascadingEventContent>(space_id, &event_type, "")
        .await
        .map_or(server_default, |content| content.enabled)
}
/// Sends a default roles state event into `space_id` unless one can already
/// be read.
///
/// The defaults are `admin` (power level 100) and `mod` (power level 50). The
/// event is sent as the server user. The room's state mutex is held from
/// before the existence check until the event has been appended.
///
/// # Errors
///
/// Fails if the content cannot be serialized or the PDU cannot be built and
/// appended.
#[implement(Service)]
pub async fn ensure_default_roles(&self, space_id: &RoomId) -> Result {
    let state_lock = self.services.state.mutex.lock(space_id).await;
    let server_user = self.services.globals.server_user.as_ref();

    let event_type = roles_event_type();
    let already_defined = self
        .services
        .state_accessor
        .room_state_get_content::<SpaceRolesEventContent>(space_id, &event_type, "")
        .await
        .is_ok();
    if already_defined {
        return Ok(());
    }

    let roles = BTreeMap::from([
        ("admin".to_owned(), RoleDefinition {
            description: "Space administrator".to_owned(),
            power_level: Some(100),
        }),
        ("mod".to_owned(), RoleDefinition {
            description: "Space moderator".to_owned(),
            power_level: Some(50),
        }),
    ]);

    let raw_content = to_raw_value(&SpaceRolesEventContent { roles })
        .map_err(|e| conduwuit::err!("Failed to serialize SpaceRolesEventContent: {e}"))?;
    let builder = PduBuilder {
        event_type: ruma::events::TimelineEventType::from(SPACE_ROLES_EVENT_TYPE.to_owned()),
        content: raw_content,
        state_key: Some(String::new().into()),
        ..PduBuilder::default()
    };

    self.services
        .timeline
        .build_and_append_pdu(builder, server_user, Some(space_id), &state_lock)
        .await?;

    debug!(space_id = %space_id, event_type = SPACE_ROLES_EVENT_TYPE, "Sent default space roles event");
    Ok(())
}
/// Rebuilds every cache entry for `space_id` from the space's current state.
///
/// Does nothing when cascading is disabled for the space. Otherwise it
/// refreshes, in order: the role definitions, the per-user role assignments,
/// the per-room role requirements, and the parent/child indexes derived from
/// the space's child events.
///
/// If the space's short state hash cannot be obtained, only the role
/// definitions are refreshed and the remaining caches keep their old content.
#[implement(Service)]
pub async fn populate_space(&self, space_id: &RoomId) {
if !self.is_enabled_for_space(space_id).await {
return;
}
// Crude size bound: once the number of cached spaces reaches the configured
// threshold, every cache is dropped before this space is (re)inserted.
// NOTE(review): this also fires when `space_id` is already cached, so at the
// threshold each repopulation wipes all other spaces — confirm intended.
if self.roles.read().await.len()
>= usize::try_from(self.server.config.space_roles_cache_flush_threshold)
.unwrap_or(usize::MAX)
{
self.flush_caches().await;
debug_warn!("Space roles cache exceeded capacity, cleared");
}
// Role definitions. If the event cannot be read, any previously cached
// definitions for this space are left untouched.
let roles_event_type = roles_event_type();
if let Ok(content) = self
.services
.state_accessor
.room_state_get_content::<SpaceRolesEventContent>(space_id, &roles_event_type, "")
.await
{
self.roles
.write()
.await
.insert(space_id.to_owned(), content.roles);
}
let member_event_type = member_event_type();
let shortstatehash = match self.services.state.get_room_shortstatehash(space_id).await {
| Ok(hash) => hash,
| Err(e) => {
debug_warn!(space_id = %space_id, error = ?e, "Failed to get shortstatehash, cache may be stale");
return;
},
};
{
// User role assignments: one state event per user, keyed by user ID.
// Events whose PDU cannot be loaded, whose content does not parse, or
// whose state key is not a valid user ID are skipped.
let mut user_roles_map: HashMap<OwnedUserId, HashSet<String>> = HashMap::new();
self.services
.state_accessor
.state_keys_with_ids(shortstatehash, &member_event_type)
.boxed()
.broad_filter_map(|(state_key, event_id): (_, OwnedEventId)| async move {
self.services
.timeline
.get_pdu(&event_id)
.map_ok(move |pdu| (state_key, pdu))
.ok()
.await
})
.ready_filter_map(|(state_key, pdu)| {
let content = pdu.get_content::<SpaceRoleMemberEventContent>().ok()?;
let user_id = UserId::parse(&*state_key).ok()?.to_owned();
Some((user_id, content.roles))
})
.for_each(|(user_id, roles)| {
user_roles_map.insert(user_id, roles.into_iter().collect());
async {}
})
.await;
// Replace the whole per-space map so removed assignments disappear.
self.user_roles
.write()
.await
.insert(space_id.to_owned(), user_roles_map);
// Room requirements: one state event per room, keyed by room ID; invalid
// entries are skipped in the same way as above.
let room_event_type = room_event_type();
let mut room_reqs_map: HashMap<OwnedRoomId, HashSet<String>> = HashMap::new();
self.services
.state_accessor
.state_keys_with_ids(shortstatehash, &room_event_type)
.boxed()
.broad_filter_map(|(state_key, event_id): (_, OwnedEventId)| async move {
self.services
.timeline
.get_pdu(&event_id)
.map_ok(move |pdu| (state_key, pdu))
.ok()
.await
})
.ready_filter_map(|(state_key, pdu)| {
let content = pdu.get_content::<SpaceRoleRoomEventContent>().ok()?;
let room_id = RoomId::parse(&*state_key).ok()?.to_owned();
Some((room_id, content.required_roles))
})
.for_each(|(room_id, required_roles)| {
room_reqs_map.insert(room_id, required_roles.into_iter().collect());
async {}
})
.await;
self.room_requirements
.write()
.await
.insert(space_id.to_owned(), room_reqs_map);
// Child rooms: taken from the space's child events. A child event with an
// empty `via` list is not counted as a child.
let mut child_rooms: Vec<OwnedRoomId> = Vec::new();
self.services
.state_accessor
.state_keys_with_ids(shortstatehash, &StateEventType::SpaceChild)
.boxed()
.broad_filter_map(|(state_key, event_id): (_, OwnedEventId)| async move {
self.services
.timeline
.get_pdu(&event_id)
.map_ok(move |pdu| (state_key, pdu))
.ok()
.await
})
.ready_filter_map(|(state_key, pdu)| {
if let Ok(content) = pdu.get_content::<SpaceChildEventContent>() {
if content.via.is_empty() {
return None;
}
} else {
return None;
}
let child_room_id = RoomId::parse(&*state_key).ok()?.to_owned();
Some(child_room_id)
})
.for_each(|child_room_id| {
child_rooms.push(child_room_id);
async {}
})
.await;
{
// Rebuild the reverse index: first forget this space as a parent of every
// room, dropping rooms left without parents, then re-add the current
// children.
let mut room_to_space = self.room_to_space.write().await;
room_to_space.retain(|_, parents| {
parents.remove(space_id);
!parents.is_empty()
});
for child_room_id in &child_rooms {
room_to_space
.entry(child_room_id.clone())
.or_default()
.insert(space_id.to_owned());
}
}
{
// Forward index: overwrite the space's child set with the fresh list.
let mut space_to_rooms = self.space_to_rooms.write().await;
space_to_rooms.insert(space_id.to_owned(), child_rooms.into_iter().collect())
};
}
}
/// Returns the highest power level granted by any of the `assigned` roles.
///
/// Role names missing from `role_defs`, and roles whose definition carries no
/// power level, contribute nothing. Returns `None` when no assigned role
/// yields a power level.
#[must_use]
pub fn compute_user_power_level<S: ::std::hash::BuildHasher>(
    role_defs: &BTreeMap<String, RoleDefinition>,
    assigned: &HashSet<String, S>,
) -> Option<i64> {
    let mut highest: Option<i64> = None;
    for role_name in assigned {
        let Some(level) = role_defs.get(role_name).and_then(|def| def.power_level) else {
            continue;
        };
        highest = Some(highest.map_or(level, |current| current.max(level)));
    }
    highest
}
/// Returns `true` when every role in `required` is also present in `assigned`.
///
/// An empty `required` set is always satisfied.
#[must_use]
pub fn roles_satisfy_requirements<S: ::std::hash::BuildHasher>(
    required: &HashSet<String, S>,
    assigned: &HashSet<String, S>,
) -> bool {
    required.is_subset(assigned)
}
/// Looks up the power level that `user_id`'s cached roles grant in `space_id`.
///
/// Returns `None` if the space has no cached role definitions, the user has
/// no cached role assignment, or none of the user's roles defines a power
/// level. Both caches are copied out so no lock is held during the
/// computation.
#[implement(Service)]
pub async fn get_user_power_level(&self, space_id: &RoomId, user_id: &UserId) -> Option<i64> {
    let definitions = self.roles.read().await.get(space_id).cloned()?;

    let assigned_roles = self
        .user_roles
        .read()
        .await
        .get(space_id)
        .and_then(|users| users.get(user_id))
        .cloned()?;

    compute_user_power_level(&definitions, &assigned_roles)
}
/// Decides from the caches whether `user_id` holds every role that
/// `space_id` requires for `room_id`.
///
/// A room with no cached requirements, or an empty requirement set, admits
/// everyone. If requirements exist but the user has no cached role
/// assignment in the space, the user does not qualify.
#[implement(Service)]
pub async fn user_qualifies_for_room(
    &self,
    space_id: &RoomId,
    room_id: &RoomId,
    user_id: &UserId,
) -> bool {
    let required = {
        let requirements = self.room_requirements.read().await;
        match requirements
            .get(space_id)
            .and_then(|rooms| rooms.get(room_id))
        {
            | Some(roles) if !roles.is_empty() => roles.clone(),
            | _ => return true,
        }
    };

    let assigned = {
        let assignments = self.user_roles.read().await;
        match assignments
            .get(space_id)
            .and_then(|users| users.get(user_id))
        {
            | Some(roles) => roles.clone(),
            | None => return false,
        }
    };

    roles_satisfy_requirements(&required, &assigned)
}
/// Returns the cached parent spaces of `room_id` that currently have
/// cascading enabled.
///
/// The index is copied out first so the lock is not held while each
/// candidate's enabled state is looked up.
#[implement(Service)]
pub async fn get_parent_spaces(&self, room_id: &RoomId) -> Vec<OwnedRoomId> {
    let candidates: Vec<OwnedRoomId> = self
        .room_to_space
        .read()
        .await
        .get(room_id)
        .map_or_else(Vec::new, |parents| parents.iter().cloned().collect());

    let mut enabled = Vec::new();
    for space_id in candidates {
        if self.is_enabled_for_space(&space_id).await {
            enabled.push(space_id);
        }
    }
    enabled
}
/// Returns the cached child rooms of `space_id`, or an empty list when the
/// space is not in the index.
#[implement(Service)]
pub async fn get_child_rooms(&self, space_id: &RoomId) -> Vec<OwnedRoomId> {
    self.space_to_rooms
        .read()
        .await
        .get(space_id)
        .map_or_else(Vec::new, |children| children.iter().cloned().collect())
}
/// Returns a copy of the role names cached for `user_id` in `space_id`, or
/// `None` when either the space or the user has no cache entry.
#[implement(Service)]
pub async fn get_user_roles_in_space(
    &self,
    space_id: &RoomId,
    user_id: &UserId,
) -> Option<HashSet<String>> {
    self.user_roles
        .read()
        .await
        .get(space_id)
        .and_then(|users| users.get(user_id))
        .cloned()
}
/// Returns a copy of the role names `space_id` requires for `room_id`, or
/// `None` when either the space or the room has no cache entry.
#[implement(Service)]
pub async fn get_room_requirements_in_space(
    &self,
    space_id: &RoomId,
    room_id: &RoomId,
) -> Option<HashSet<String>> {
    self.room_requirements
        .read()
        .await
        .get(space_id)
        .and_then(|rooms| rooms.get(room_id))
        .cloned()
}
/// Aligns the power levels in `room_id` with the levels that members' roles
/// in `space_id` grant.
///
/// Does nothing when cascading is disabled for the space or when the server
/// user is not joined to the room. For every room member other than the
/// server user whose roles yield a power level, that level is written into
/// the room's power-levels content. A new power-levels event is sent as the
/// server user only if at least one entry changed.
///
/// The room's state mutex is acquired before the current power levels are
/// read and held until the new event is appended. This keeps the
/// read-modify-write atomic with respect to other writers that take the same
/// mutex.
///
/// # Errors
///
/// Fails if the updated power-levels event cannot be built and appended.
#[implement(Service)]
pub async fn sync_power_levels(&self, space_id: &RoomId, room_id: &RoomId) -> Result {
    if !self.is_enabled_for_space(space_id).await {
        return Ok(());
    }

    let server_user = self.services.globals.server_user.as_ref();
    if !self
        .services
        .state_cache
        .is_joined(server_user, room_id)
        .await
    {
        debug_warn!(room_id = %room_id, "Server user is not joined, skipping PL sync");
        return Ok(());
    }

    let members: Vec<OwnedUserId> = self
        .services
        .state_cache
        .room_members(room_id)
        .map(ToOwned::to_owned)
        .collect()
        .await;

    // Lock first, then read: the content modified below must be the content
    // the appended event replaces.
    let state_lock = self.services.state.mutex.lock(room_id).await;

    let mut power_levels_content: RoomPowerLevelsEventContent = self
        .services
        .state_accessor
        .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
        .await
        .unwrap_or_default();

    let mut changed = false;
    for user_id in &members {
        if user_id == server_user {
            continue;
        }

        let Some(space_pl) = self.get_user_power_level(space_id, user_id).await else {
            continue;
        };

        let space_pl_int = Int::new_saturating(space_pl);
        // A user without an explicit entry effectively has `users_default`.
        let current_pl = power_levels_content
            .users
            .get(user_id)
            .copied()
            .unwrap_or(power_levels_content.users_default);

        if current_pl != space_pl_int {
            power_levels_content
                .users
                .insert(user_id.clone(), space_pl_int);
            changed = true;
        }
    }

    if !changed {
        return Ok(());
    }

    self.services
        .timeline
        .build_and_append_pdu(
            PduBuilder::state(String::new(), &power_levels_content),
            server_user,
            Some(room_id),
            &state_lock,
        )
        .await?;

    Ok(())
}
#[implement(Service)]
pub async fn auto_join_qualifying_rooms(&self, space_id: &RoomId, user_id: &UserId) -> Result {
if !self.is_enabled_for_space(space_id).await {
return Ok(());
}
let server_user = self.services.globals.server_user.as_ref();
if user_id == server_user {
return Ok(());
}
let child_rooms = self.get_child_rooms(space_id).await;
for child_room_id in &child_rooms {
if self
.services
.state_cache
.is_joined(user_id, child_room_id)
.await
{
continue;
}
if !self
.user_qualifies_for_room(space_id, child_room_id, user_id)
.await
{
continue;
}
if !self
.services
.state_cache
.is_joined(server_user, child_room_id)
.await
{
debug_warn!(room_id = %child_room_id, "Server user is not joined, skipping auto-join");
continue;
}
if let Err(e) = self
.invite_and_join_user(child_room_id, user_id, server_user)
.await
{
debug_warn!(user_id = %user_id, room_id = %child_room_id, error = ?e, "Failed to auto-join user");
}
}
Ok(())
}
/// Puts `user_id` into `room_id` by appending two membership events under a
/// single hold of the room's state mutex: an invite sent by `server_user`,
/// then a join sent by the user.
///
/// # Errors
///
/// Returns the first error from building or appending either event. If the
/// join fails, the invite has already been appended.
#[implement(Service)]
async fn invite_and_join_user(
    &self,
    room_id: &RoomId,
    user_id: &UserId,
    server_user: &UserId,
) -> Result {
    let state_lock = self.services.state.mutex.lock(room_id).await;

    // (membership to set, sender of the event), in the order they are sent.
    let steps = [
        (MembershipState::Invite, server_user),
        (MembershipState::Join, user_id),
    ];

    for (membership, sender) in steps {
        let builder =
            PduBuilder::state(user_id.to_string(), &RoomMemberEventContent::new(membership));
        self.services
            .timeline
            .build_and_append_pdu(builder, sender, Some(room_id), &state_lock)
            .await?;
    }

    Ok(())
}
/// Called from append_pdu after a PDU is persisted. Routes state events to
/// the matching handler:
///
/// - the four space-role event types go to `handle_state_event_change`;
/// - space child events with a valid room ID as state key go to
///   `handle_space_child_change`;
/// - member events with a valid user ID as state key go to
///   `handle_space_member_join` (join) or `handle_space_member_leave`
///   (leave or ban).
///
/// Non-state events and everything else are ignored.
#[implement(Service)]
pub fn on_pdu_appended(self: &Arc<Self>, room_id: &RoomId, pdu: &PduEvent) {
    let Some(state_key) = pdu.state_key() else {
        return;
    };

    let event_type_str = pdu.event_type().to_string();
    let is_space_role_event = matches!(
        event_type_str.as_str(),
        SPACE_ROLES_EVENT_TYPE
            | SPACE_ROLE_MEMBER_EVENT_TYPE
            | SPACE_ROLE_ROOM_EVENT_TYPE
            | SPACE_CASCADING_EVENT_TYPE
    );
    if is_space_role_event {
        self.handle_state_event_change(room_id.to_owned(), event_type_str, state_key.to_owned());
        return;
    }

    if *pdu.kind() == ruma::events::TimelineEventType::SpaceChild {
        if let Ok(child_room_id) = RoomId::parse(&*state_key) {
            self.handle_space_child_change(room_id.to_owned(), child_room_id.to_owned());
        }
    }

    if *pdu.kind() != ruma::events::TimelineEventType::RoomMember {
        return;
    }
    let Ok(content) = pdu.get_content::<RoomMemberEventContent>() else {
        return;
    };
    let Ok(user_id) = UserId::parse(&*state_key) else {
        return;
    };

    match content.membership {
        | MembershipState::Join => {
            self.handle_space_member_join(room_id.to_owned(), user_id.to_owned());
        },
        | MembershipState::Leave | MembershipState::Ban => {
            self.handle_space_member_leave(room_id.to_owned(), user_id.to_owned());
        },
        | _ => {},
    }
}
/// Called from build_and_append_pdu to validate PL changes don't conflict
/// with space-granted power levels.
#[implement(Service)]
pub async fn validate_pl_change(
&self,
room_id: &RoomId,
sender: &UserId,
proposed: &RoomPowerLevelsEventContent,
) -> Result {
if sender == self.services.globals.server_user.as_str() {
return Ok(());
}
let parent_spaces = self.get_parent_spaces(room_id).await;
if parent_spaces.is_empty() {
return Ok(());
}
type SpaceEnforcementData =
(Vec<(OwnedUserId, HashSet<String>)>, BTreeMap<String, RoleDefinition>);
let space_data: Vec<SpaceEnforcementData> = {
let roles_guard = self.roles.read().await;
let user_roles_guard = self.user_roles.read().await;
parent_spaces
.iter()
.filter_map(|ps| {
let space_users = user_roles_guard.get(ps)?;
let role_defs = roles_guard.get(ps)?;
Some((
space_users
.iter()
.map(|(u, r)| (u.clone(), r.clone()))
.collect(),
role_defs.clone(),
))
})
.collect()
};
for (space_users, role_defs) in &space_data {
for (user_id, assigned_roles) in space_users {
if !self.services.state_cache.is_joined(user_id, room_id).await {
continue;
}
let space_pl = assigned_roles
.iter()
.filter_map(|r| role_defs.get(r)?.power_level)
.max();
if let Some(space_pl) = space_pl {
match proposed.users.get(user_id) {
| None if i64::from(proposed.users_default) != space_pl => {
debug_warn!(
user_id = %user_id,
room_id = %room_id,
space_pl,
"Rejecting PL change: space-managed user omitted"
);
return Err!(Request(Forbidden(
"Cannot omit a user whose power level is managed by Space roles"
)));
},
| Some(pl) if i64::from(*pl) != space_pl => {
debug_warn!(
user_id = %user_id,
room_id = %room_id,
proposed_pl = i64::from(*pl),
space_pl,
"Rejecting PL change conflicting with space role"
);
return Err!(Request(Forbidden(
"Cannot change power level that is set by Space roles"
)));
},
| _ => {},
}
}
}
}
Ok(())
}
/// Called from join_room_by_id_helper to check if a user has the required
/// Space roles to join a room.
///
/// A room with no enabled parent space is unrestricted. Otherwise the user
/// must qualify under at least one enabled parent space.
///
/// # Errors
///
/// Returns a `Forbidden` request error when the user qualifies under none of
/// the room's enabled parent spaces.
#[implement(Service)]
pub async fn check_join_allowed(&self, room_id: &RoomId, user_id: &UserId) -> Result {
    let parents = self.get_parent_spaces(room_id).await;
    if parents.is_empty() {
        return Ok(());
    }

    for space_id in &parents {
        let qualifies = self
            .user_qualifies_for_room(space_id, room_id, user_id)
            .await;
        if qualifies {
            return Ok(());
        }
    }

    Err!(Request(Forbidden("You do not have the required Space roles to join this room")))
}
#[implement(Service)]
async fn sync_power_levels_for_children(&self, space_id: &RoomId) {
let child_rooms = self.get_child_rooms(space_id).await;
for child_room_id in &child_rooms {
if let Err(e) = self.sync_power_levels(space_id, child_room_id).await {
debug_warn!(room_id = %child_room_id, error = ?e, "Failed to sync power levels");
}
}
}
// Event handlers. Each one spawns a task on the server runtime and returns
// immediately; the spawned task does the actual enforcement work.
impl Service {
/// Reacts to a change of one of the space-role state events in `space_id`.
///
/// The spawned task gives up if the server is not running, or if cascading is
/// disabled for the space (this check is skipped for the cascading event
/// itself). It then refreshes the space's caches and enforces according to
/// `event_type`:
///
/// - roles event: sync power levels in child rooms, then re-check every space
///   member for kicks;
/// - role member event: auto-join, kick, and power-level sync for the user in
///   `state_key`;
/// - role room event: re-check every member of the room in `state_key`;
/// - cascading event: drop the space from the caches if it is now disabled.
///
/// NOTE(review): while a task for a space is pending, any further event for
/// the same space returns without being re-queued, so the enforcement specific
/// to its `state_key` never runs — confirm this coalescing is intended.
pub fn handle_state_event_change(
self: &Arc<Self>,
space_id: OwnedRoomId,
event_type: String,
state_key: String,
) {
let this = Arc::clone(self);
self.server.runtime().spawn(async move {
if !this.server.running() {
return;
}
// The cascading event must be processed even when it disables the space.
if event_type != SPACE_CASCADING_EVENT_TYPE
&& !this.is_enabled_for_space(&space_id).await
{
return;
}
// At most one in-flight task per space; the write guard is released at the
// end of this block.
{
let mut pending = this.pending_enforcement.write().await;
if pending.contains(&space_id) {
return;
}
pending.insert(space_id.clone())
};
// Inner async block so that every early exit still reaches the
// `pending_enforcement` cleanup below.
async {
let Ok(_permit) = this.enforcement_semaphore.acquire().await else {
return;
};
this.populate_space(&space_id).await;
match event_type.as_str() {
| SPACE_ROLES_EVENT_TYPE => {
this.sync_power_levels_for_children(&space_id).await;
let space_members: Vec<OwnedUserId> = this
.services
.state_cache
.room_members(&space_id)
.map(ToOwned::to_owned)
.collect()
.await;
for member in &space_members {
if let Err(e) =
Box::pin(this.kick_unqualified_from_rooms(&space_id, member))
.await
{
debug_warn!(user_id = %member, error = ?e, "Role definition revalidation kick failed");
}
}
},
| SPACE_ROLE_MEMBER_EVENT_TYPE => {
if let Ok(user_id) = UserId::parse(state_key.as_str()) {
if let Err(e) =
this.auto_join_qualifying_rooms(&space_id, user_id).await
{
debug_warn!(user_id = %user_id, error = ?e, "Space role auto-join failed");
}
if let Err(e) =
Box::pin(this.kick_unqualified_from_rooms(&space_id, user_id))
.await
{
debug_warn!(user_id = %user_id, error = ?e, "Space role auto-kick failed");
}
this.sync_power_levels_for_children(&space_id).await;
}
},
| SPACE_ROLE_ROOM_EVENT_TYPE => {
if let Ok(target_room) = RoomId::parse(state_key.as_str()) {
let members: Vec<OwnedUserId> = this
.services
.state_cache
.room_members(target_room)
.map(ToOwned::to_owned)
.collect()
.await;
for member in &members {
if let Err(e) =
Box::pin(this.kick_unqualified_from_rooms(&space_id, member))
.await
{
debug_warn!(user_id = %member, error = ?e, "Space role requirement kick failed");
}
}
}
},
| SPACE_CASCADING_EVENT_TYPE => {
if !this.is_enabled_for_space(&space_id).await {
this.flush_space_from_cache(&space_id).await;
}
},
| _ => {},
}
}
.await;
this.pending_enforcement.write().await.remove(&space_id);
});
}
/// Reacts to a space child event for `child_room_id` in `space_id`.
///
/// The spawned task gives up if the server is not running or cascading is
/// disabled for the space. If the child event is unreadable or has an empty
/// `via` list, the child is removed from both indexes. Otherwise it is added
/// to both indexes and, provided the server user is joined to the child room,
/// every space member who qualifies and is not yet joined is invited and
/// joined.
pub fn handle_space_child_change(
self: &Arc<Self>,
space_id: OwnedRoomId,
child_room_id: OwnedRoomId,
) {
let this = Arc::clone(self);
self.server.runtime().spawn(async move {
if !this.server.running() {
return;
}
if !this.is_enabled_for_space(&space_id).await {
return;
}
let Ok(_permit) = this.enforcement_semaphore.acquire().await else {
return;
};
let child_event_type = StateEventType::SpaceChild;
let is_removal = match this
.services
.state_accessor
.room_state_get_content::<SpaceChildEventContent>(
&space_id,
&child_event_type,
child_room_id.as_str(),
)
.await
{
| Ok(content) => content.via.is_empty(),
| Err(_) => true, // If we can't read it, treat as removal
};
if is_removal {
// Both write guards are held until the `return` below.
let mut room_to_space = this.room_to_space.write().await;
if let Some(parents) = room_to_space.get_mut(&child_room_id) {
parents.remove(&space_id);
if parents.is_empty() {
room_to_space.remove(&child_room_id);
}
}
let mut space_to_rooms = this.space_to_rooms.write().await;
if let Some(children) = space_to_rooms.get_mut(&space_id) {
children.remove(&child_room_id);
}
return;
}
// Addition: record the relation in both directions.
this.room_to_space
.write()
.await
.entry(child_room_id.clone())
.or_default()
.insert(space_id.clone());
this.space_to_rooms
.write()
.await
.entry(space_id.clone())
.or_default()
.insert(child_room_id.clone());
let server_user = this.services.globals.server_user.as_ref();
if !this
.services
.state_cache
.is_joined(server_user, &child_room_id)
.await
{
debug_warn!(room_id = %child_room_id, "Server user is not joined, skipping auto-join enforcement for new child");
return;
}
let space_members: Vec<OwnedUserId> = this
.services
.state_cache
.room_members(&space_id)
.map(ToOwned::to_owned)
.collect()
.await;
for member in &space_members {
if this
.user_qualifies_for_room(&space_id, &child_room_id, member)
.await && !this
.services
.state_cache
.is_joined(member, &child_room_id)
.await
{
if let Err(e) = this
.invite_and_join_user(&child_room_id, member, server_user)
.await
{
debug_warn!(user_id = %member, room_id = %child_room_id, error = ?e, "Failed to auto-join user");
}
}
}
});
}
/// Reacts to `user_id` joining `space_id`: auto-joins the user to qualifying
/// child rooms, then syncs power levels in all child rooms.
///
/// Ignored for the server user. The spawned task gives up if the server is
/// not running or cascading is disabled for the space.
pub fn handle_space_member_join(
self: &Arc<Self>,
space_id: OwnedRoomId,
user_id: OwnedUserId,
) {
if user_id == self.services.globals.server_user {
return;
}
let this = Arc::clone(self);
self.server.runtime().spawn(async move {
if !this.server.running() {
return;
}
if !this.is_enabled_for_space(&space_id).await {
return;
}
let Ok(_permit) = this.enforcement_semaphore.acquire().await else {
return;
};
if let Err(e) = this.auto_join_qualifying_rooms(&space_id, &user_id).await {
debug_warn!(user_id = %user_id, error = ?e, "Auto-join on Space join failed");
}
this.sync_power_levels_for_children(&space_id).await;
});
}
/// Reacts to `user_id` leaving or being banned from `space_id` by running the
/// kick check for that user.
///
/// Ignored for the server user. The spawned task gives up if the server is
/// not running or cascading is disabled for the space.
pub fn handle_space_member_leave(
self: &Arc<Self>,
space_id: OwnedRoomId,
user_id: OwnedUserId,
) {
if user_id == self.services.globals.server_user {
return;
}
let this = Arc::clone(self);
self.server.runtime().spawn(async move {
if !this.server.running() {
return;
}
if !this.is_enabled_for_space(&space_id).await {
return;
}
let Ok(_permit) = this.enforcement_semaphore.acquire().await else {
return;
};
if let Err(e) = Box::pin(this.kick_unqualified_from_rooms(&space_id, &user_id)).await
{
debug_warn!(user_id = %user_id, error = ?e, "Kick on Space leave failed");
}
});
}
}
/// Removes `user_id` from every room for which `space_id` has cached role
/// requirements and for which the user no longer qualifies under any enabled
/// parent space.
///
/// Does nothing when cascading is disabled for the space or when `user_id`
/// is the server user. A room is skipped when:
///
/// - the server user is not joined to it (the kick could not be sent);
/// - the user is not joined to it;
/// - it has no enabled parent space in the index, matching
///   `check_join_allowed`, which treats such rooms as unrestricted;
/// - the user qualifies under at least one enabled parent space;
/// - the user's current member event cannot be read.
///
/// Kick failures are logged; the function itself always returns `Ok`.
#[implement(Service)]
pub async fn kick_unqualified_from_rooms(&self, space_id: &RoomId, user_id: &UserId) -> Result {
    if !self.is_enabled_for_space(space_id).await {
        return Ok(());
    }

    let server_user = self.services.globals.server_user.as_ref();
    if user_id == server_user {
        return Ok(());
    }

    // Only rooms with a requirement entry can disqualify anyone.
    let child_rooms: Vec<OwnedRoomId> = self
        .room_requirements
        .read()
        .await
        .get(space_id)
        .map(|reqs| reqs.keys().cloned().collect())
        .unwrap_or_default();

    for child_room_id in &child_rooms {
        if !self
            .services
            .state_cache
            .is_joined(server_user, child_room_id)
            .await
        {
            debug_warn!(room_id = %child_room_id, "Server user is not joined, skipping kick enforcement");
            continue;
        }

        if !self
            .services
            .state_cache
            .is_joined(user_id, child_room_id)
            .await
        {
            continue;
        }

        let all_parents = self.get_parent_spaces(child_room_id).await;
        // A requirement entry can exist for a room that is not (or no longer)
        // a child of any enabled space. Joins to such a room are unrestricted,
        // so kicking from it would be inconsistent.
        if all_parents.is_empty() {
            continue;
        }

        let mut qualifies_in_any = false;
        for parent in &all_parents {
            if self
                .user_qualifies_for_room(parent, child_room_id, user_id)
                .await
            {
                qualifies_in_any = true;
                break;
            }
        }
        if qualifies_in_any {
            continue;
        }

        let Ok(member_content) = self
            .services
            .state_accessor
            .get_member(child_room_id, user_id)
            .await
        else {
            debug_warn!(user_id = %user_id, room_id = %child_room_id, "Could not get member event, skipping kick");
            continue;
        };

        let state_lock = self.services.state.mutex.lock(child_room_id).await;
        // Start from the existing member content and override the membership
        // and the listed fields.
        if let Err(e) = self
            .services
            .timeline
            .build_and_append_pdu(
                PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
                    membership: MembershipState::Leave,
                    reason: Some("No longer has required Space roles".into()),
                    is_direct: None,
                    join_authorized_via_users_server: None,
                    third_party_invite: None,
                    ..member_content
                }),
                server_user,
                Some(child_room_id),
                &state_lock,
            )
            .await
        {
            warn!(user_id = %user_id, room_id = %child_room_id, error = ?e, "Failed to kick user for missing roles");
        }
    }

    Ok(())
}