fix: Policy server calls use the correct JSON object (#1126)

Fixes #1060

Reviewed-on: https://forgejo.ellis.link/continuwuation/continuwuity/pulls/1126
Reviewed-by: Jacob Taylor <aranjedeath@noreply.forgejo.ellis.link>
Co-authored-by: timedout <git@nexy7574.co.uk>
Co-committed-by: timedout <git@nexy7574.co.uk>
This commit is contained in:
timedout 2025-10-16 21:06:54 +00:00 committed by nex
parent 09f24745c3
commit 26b700bf51
3 changed files with 52 additions and 17 deletions

View file

@ -5,9 +5,9 @@
use std::time::Duration;
use conduwuit::{Err, Event, PduEvent, Result, debug, implement, warn};
use conduwuit::{Err, Event, PduEvent, Result, debug, debug_info, implement, trace, warn};
use ruma::{
RoomId, ServerName,
CanonicalJsonObject, RoomId, ServerName,
api::federation::room::policy::v1::Request as PolicyRequest,
events::{StateEventType, room::policy::RoomPolicyEventContent},
};
@ -25,7 +25,12 @@ use ruma::{
/// fail-open operation.
#[implement(super::Service)]
#[tracing::instrument(skip_all, level = "debug")]
pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Result<bool> {
pub async fn ask_policy_server(
&self,
pdu: &PduEvent,
pdu_json: &CanonicalJsonObject,
room_id: &RoomId,
) -> Result<bool> {
if *pdu.event_type() == StateEventType::RoomPolicy.into() {
debug!(
room_id = %room_id,
@ -47,12 +52,12 @@ pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Resul
let via = match policyserver.via {
| Some(ref via) => ServerName::parse(via)?,
| None => {
debug!("No policy server configured for room {room_id}");
trace!("No policy server configured for room {room_id}");
return Ok(true);
},
};
if via.is_empty() {
debug!("Policy server is empty for room {room_id}, skipping spam check");
trace!("Policy server is empty for room {room_id}, skipping spam check");
return Ok(true);
}
if !self.services.state_cache.server_in_room(via, room_id).await {
@ -66,12 +71,12 @@ pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Resul
let outgoing = self
.services
.sending
.convert_to_outgoing_federation_event(pdu.to_canonical_object())
.convert_to_outgoing_federation_event(pdu_json.clone())
.await;
debug!(
debug_info!(
room_id = %room_id,
via = %via,
outgoing = ?outgoing,
outgoing = ?pdu_json,
"Checking event for spam with policy server"
);
let response = tokio::time::timeout(
@ -85,7 +90,10 @@ pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Resul
)
.await;
let response = match response {
| Ok(Ok(response)) => response,
| Ok(Ok(response)) => {
debug!("Response from policy server: {:?}", response);
response
},
| Ok(Err(e)) => {
warn!(
via = %via,
@ -97,16 +105,18 @@ pub async fn ask_policy_server(&self, pdu: &PduEvent, room_id: &RoomId) -> Resul
// default.
return Err(e);
},
| Err(_) => {
| Err(elapsed) => {
warn!(
via = %via,
%via,
event_id = %pdu.event_id(),
room_id = %room_id,
%room_id,
%elapsed,
"Policy server request timed out after 10 seconds"
);
return Err!("Request to policy server timed out");
},
};
trace!("Recommendation from policy server was {}", response.recommendation);
if response.recommendation == "spam" {
warn!(
via = %via,

View file

@ -255,7 +255,10 @@ where
// 14-pre. If the event is not a state event, ask the policy server about it
if incoming_pdu.state_key.is_none() {
debug!(event_id = %incoming_pdu.event_id, "Checking policy server for event");
match self.ask_policy_server(&incoming_pdu, room_id).await {
match self
.ask_policy_server(&incoming_pdu, &incoming_pdu.to_canonical_object(), room_id)
.await
{
| Ok(false) => {
warn!(
event_id = %incoming_pdu.event_id,

View file

@ -9,6 +9,7 @@ use conduwuit_core::{
state_res::{self, RoomVersion},
},
utils::{self, IterStream, ReadyExt, stream::TryIgnore},
warn,
};
use futures::{StreamExt, TryStreamExt, future, future::ready};
use ruma::{
@ -19,7 +20,6 @@ use ruma::{
uint,
};
use serde_json::value::{RawValue, to_raw_value};
use tracing::warn;
use super::RoomMutexGuard;
@ -267,11 +267,33 @@ pub async fn create_hash_and_sign_event(
| _ => Err!(Request(Unknown(warn!("Signing event failed: {e}")))),
};
}
// Check with the policy server
// Generate event id
pdu.event_id = gen_event_id(&pdu_json, &room_version_id)?;
pdu_json.insert("event_id".into(), CanonicalJsonValue::String(pdu.event_id.clone().into()));
if room_id.is_some() {
trace!(
"Checking event in room {} with policy server",
pdu.room_id.as_ref().map_or("None", |id| id.as_str())
);
match self
.services
.event_handler
.ask_policy_server(&pdu, &pdu_json, pdu.room_id().expect("has room ID"))
.await
{
| Ok(true) => {},
| Ok(false) => {
return Err!(Request(Forbidden(debug_warn!(
"Policy server marked this event as spam"
))));
},
| Err(e) => {
// fail open
warn!("Failed to check event with policy server: {e}");
},
}
}
// Check with the policy server
if room_id.is_some() {
@ -283,7 +305,7 @@ pub async fn create_hash_and_sign_event(
match self
.services
.event_handler
.ask_policy_server(&pdu, &pdu.room_id_or_hash())
.ask_policy_server(&pdu, &pdu_json, &pdu.room_id_or_hash())
.await
{
| Ok(true) => {},