diff --git a/src/service/rooms/event_handler/policy_server.rs b/src/service/rooms/event_handler/policy_server.rs index 80cb40f2..7539c330 100644 --- a/src/service/rooms/event_handler/policy_server.rs +++ b/src/service/rooms/event_handler/policy_server.rs @@ -5,9 +5,9 @@ use std::time::Duration; -use conduwuit::{Err, Event, PduEvent, Result, debug, implement, warn}; +use conduwuit::{Err, Event, PduEvent, Result, debug, debug_info, implement, trace, warn}; use ruma::{ - RoomId, ServerName, + CanonicalJsonObject, RoomId, ServerName, api::federation::room::policy::v1::Request as PolicyRequest, events::{StateEventType, room::policy::RoomPolicyEventContent}, }; @@ -25,7 +25,12 @@ use ruma::{ /// fail-open operation. #[implement(super::Service)] #[tracing::instrument(skip_all, level = "debug")] -pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Result { +pub async fn ask_policy_server( + &self, + pdu: &PduEvent, + pdu_json: &CanonicalJsonObject, + room_id: &RoomId, +) -> Result { if *pdu.event_type() == StateEventType::RoomPolicy.into() { debug!( room_id = %room_id, @@ -47,12 +52,12 @@ pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Resul let via = match policyserver.via { | Some(ref via) => ServerName::parse(via)?, | None => { - debug!("No policy server configured for room {room_id}"); + trace!("No policy server configured for room {room_id}"); return Ok(true); }, }; if via.is_empty() { - debug!("Policy server is empty for room {room_id}, skipping spam check"); + trace!("Policy server is empty for room {room_id}, skipping spam check"); return Ok(true); } if !self.services.state_cache.server_in_room(via, room_id).await { @@ -66,12 +71,12 @@ pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Resul let outgoing = self .services .sending - .convert_to_outgoing_federation_event(pdu.to_canonical_object()) + .convert_to_outgoing_federation_event(pdu_json.clone()) .await; - debug!( + debug_info!( room_id = %room_id, via = %via, - outgoing = ?outgoing, + outgoing = ?pdu_json, "Checking event for spam with policy server" ); let response = tokio::time::timeout( @@ -85,7 +90,10 @@ pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Resul ) .await; let response = match response { - | Ok(Ok(response)) => response, + | Ok(Ok(response)) => { + debug!("Response from policy server: {:?}", response); + response + }, | Ok(Err(e)) => { warn!( via = %via, @@ -97,16 +105,18 @@ pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Resul // default. return Err(e); }, - | Err(_) => { + | Err(elapsed) => { warn!( - via = %via, + %via, event_id = %pdu.event_id(), - room_id = %room_id, + %room_id, + %elapsed, "Policy server request timed out after 10 seconds" ); return Err!("Request to policy server timed out"); }, }; + trace!("Recommendation from policy server was {}", response.recommendation); if response.recommendation == "spam" { warn!( via = %via, diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 10d4dfd5..42f8354b 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -255,7 +255,10 @@ where // 14-pre. If the event is not a state event, ask the policy server about it if incoming_pdu.state_key.is_none() { debug!(event_id = %incoming_pdu.event_id, "Checking policy server for event"); - match self.ask_policy_server(&incoming_pdu, room_id).await { + match self + .ask_policy_server(&incoming_pdu, &incoming_pdu.to_canonical_object(), room_id) + .await + { | Ok(false) => { warn!( event_id = %incoming_pdu.event_id, diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs index 386e1f02..204e16ad 100644 --- a/src/service/rooms/timeline/create.rs +++ b/src/service/rooms/timeline/create.rs @@ -9,6 +9,7 @@ use conduwuit_core::{ state_res::{self, RoomVersion}, }, utils::{self, IterStream, ReadyExt, stream::TryIgnore}, + warn, }; use futures::{StreamExt, TryStreamExt, future, future::ready}; use ruma::{ @@ -19,7 +20,6 @@ use ruma::{ uint, }; use serde_json::value::{RawValue, to_raw_value}; -use tracing::warn; use super::RoomMutexGuard; @@ -267,11 +267,33 @@ pub async fn create_hash_and_sign_event( | _ => Err!(Request(Unknown(warn!("Signing event failed: {e}")))), }; } - + // Check with the policy server // Generate event id pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?; - pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into())); + if room_id.is_some() { + trace!( + "Checking event in room {} with policy server", + pdu.room_id.as_ref().map_or("None", |id| id.as_str()) + ); + match self + .services + .event_handler + .ask_policy_server(&pdu, &pdu_json, pdu.room_id().expect("has room ID")) + .await + { + | Ok(true) => {}, + | Ok(false) => { + return Err!(Request(Forbidden(debug_warn!( + "Policy server marked this event as spam" + )))); + }, + | Err(e) => { + // fail open + warn!("Failed to check event with policy server: {e}"); + }, + } + } // Check with the policy server if room_id.is_some() { @@ -283,7 +305,7 @@ pub async fn create_hash_and_sign_event( match self .services .event_handler - .ask_policy_server(&pdu, &pdu.room_id_or_hash()) + .ask_policy_server(&pdu, &pdu_json, &pdu.room_id_or_hash()) .await { | Ok(true) => {},