From 6f103939df5afb9fcb76ccf4986a23c5eebadac2 Mon Sep 17 00:00:00 2001 From: timedout Date: Wed, 4 Mar 2026 05:36:19 +0000 Subject: [PATCH] feat: Update policy server implementation to be closer to stable MSC4284 Untested --- Cargo.lock | 29 +- Cargo.toml | 2 +- src/core/error/mod.rs | 20 +- src/service/Cargo.toml | 1 + .../rooms/event_handler/policy_server.rs | 336 +++++++++++------- .../event_handler/upgrade_outlier_pdu.rs | 5 +- src/service/rooms/timeline/create.rs | 25 +- 7 files changed, 244 insertions(+), 174 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d495c08..4bb2be71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1113,6 +1113,7 @@ dependencies = [ "conduwuit_core", "conduwuit_database", "const-str", + "ed25519-dalek", "either", "futures", "hickory-resolver", @@ -1221,7 +1222,7 @@ dependencies = [ [[package]] name = "continuwuity-admin-api" version = "0.1.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "ruma-common", "serde", @@ -1600,7 +1601,7 @@ dependencies = [ [[package]] name = "draupnir-antispam" version = "0.1.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "ruma-common", "serde", @@ -3002,7 +3003,7 @@ checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "meowlnir-antispam" version = "0.1.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "ruma-common", "serde", @@ -4096,7 +4097,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.10.1" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "assign", "continuwuity-admin-api", @@ -4119,7 +4120,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.10.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "js_int", "ruma-common", @@ -4131,7 +4132,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.18.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "as_variant", "assign", @@ -4154,7 +4155,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.13.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "as_variant", "base64 0.22.1", @@ -4186,7 +4187,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.28.1" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "as_variant", "indexmap", @@ -4211,7 +4212,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.9.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "bytes", "headers", @@ -4233,7 +4234,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.9.5" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "js_int", "thiserror 2.0.18", @@ -4242,7 +4243,7 @@ dependencies = [ [[package]] name = "ruma-identity-service-api" version = "0.9.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "js_int", "ruma-common", @@ -4252,7 +4253,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.13.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "cfg-if", "proc-macro-crate", @@ -4267,7 +4268,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.9.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "js_int", "ruma-common", @@ -4279,7 +4280,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.15.0" -source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=ad8c47c3e39a700ce6c72b03b185bda6568ff954#ad8c47c3e39a700ce6c72b03b185bda6568ff954" +source = "git+https://forgejo.ellis.link/continuwuation/ruwuma?rev=6c65b295d2c109ab31165c2db016097f3e74d02e#6c65b295d2c109ab31165c2db016097f3e74d02e" dependencies = [ "base64 0.22.1", "ed25519-dalek", diff --git a/Cargo.toml b/Cargo.toml index bfdf8752..40ee261f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -344,7 +344,7 @@ version = "0.1.2" [workspace.dependencies.ruma] git = "https://forgejo.ellis.link/continuwuation/ruwuma" #branch = "conduwuit-changes" -rev = "ad8c47c3e39a700ce6c72b03b185bda6568ff954" +rev = "6c65b295d2c109ab31165c2db016097f3e74d02e" features = [ "compat", "rand", diff --git a/src/core/error/mod.rs b/src/core/error/mod.rs index 03967e62..00c069cd 100644 --- a/src/core/error/mod.rs +++ b/src/core/error/mod.rs @@ -4,7 +4,9 @@ mod panic; mod response; mod serde; -use std::{any::Any, borrow::Cow, convert::Infallible, sync::PoisonError}; +use std::{any::Any, borrow::Cow, convert::Infallible, sync::PoisonError, time::Duration}; + +use ruma::api::client::error::{ErrorKind, RetryAfter::Delay}; pub use self::{err::visit, log::*}; @@ -91,7 +93,7 @@ pub enum Error { #[error("Arithmetic operation failed: {0}")] Arithmetic(Cow<'static, str>), #[error("{0}: {1}")] - BadRequest(ruma::api::client::error::ErrorKind, &'static str), //TODO: remove + BadRequest(ErrorKind, &'static str), //TODO: remove #[error("{0}")] BadServerResponse(Cow<'static, str>), #[error(transparent)] @@ -121,7 +123,7 @@ pub enum Error { #[error("from {0}: {1}")] Redaction(ruma::OwnedServerName, ruma::canonical_json::RedactionError), #[error("{0}: {1}")] - Request(ruma::api::client::error::ErrorKind, Cow<'static, str>, http::StatusCode), + Request(ErrorKind, Cow<'static, str>, http::StatusCode), #[error(transparent)] Ruma(#[from] ruma::api::client::error::Error), #[error(transparent)] @@ -166,7 +168,7 @@ impl Error { /// Returns the Matrix error code / error kind #[inline] - pub fn kind(&self) -> ruma::api::client::error::ErrorKind { + pub fn kind(&self) -> ErrorKind { use ruma::api::client::error::ErrorKind::{FeatureDisabled, Unknown}; match self { @@ -201,6 +203,16 @@ impl Error { /// Result where Ok(None) is instead Err(e) if e.is_not_found(). #[inline] pub fn is_not_found(&self) -> bool { self.status_code() == http::StatusCode::NOT_FOUND } + + pub fn retry_after(&self) -> Option { + match self { + | Self::BadRequest( + ErrorKind::LimitExceeded { retry_after: Some(Delay(retry_after)) }, + .., + ) => Some(*retry_after), + | _ => None, + } + } } impl std::fmt::Debug for Error { diff --git a/src/service/Cargo.toml b/src/service/Cargo.toml index 8591496b..513ffe89 100644 --- a/src/service/Cargo.toml +++ b/src/service/Cargo.toml @@ -123,6 +123,7 @@ blurhash.workspace = true blurhash.optional = true recaptcha-verify = { version = "0.1.5", default-features = false } yansi.workspace = true +ed25519-dalek = "2.2.0" [target.'cfg(all(unix, target_os = "linux"))'.dependencies] sd-notify.workspace = true diff --git a/src/service/rooms/event_handler/policy_server.rs b/src/service/rooms/event_handler/policy_server.rs index ca5dadea..87a84876 100644 --- a/src/service/rooms/event_handler/policy_server.rs +++ b/src/service/rooms/event_handler/policy_server.rs @@ -6,18 +6,63 @@ use std::{collections::BTreeMap, time::Duration}; use conduwuit::{ - Err, Event, PduEvent, Result, debug, debug_error, debug_info, debug_warn, implement, trace, - warn, + Err, Error, Event, PduEvent, Result, debug, debug_error, debug_info, error, implement, info, + trace, warn, }; +use ed25519_dalek::{Signature, Verifier, VerifyingKey}; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, KeyId, RoomId, ServerName, SigningKeyId, - api::federation::room::{ - policy_check::unstable::Request as PolicyCheckRequest, - policy_sign::unstable::Request as PolicySignRequest, - }, + CanonicalJsonObject, CanonicalJsonValue, KeyId, RoomId, ServerName, + api::federation::room::policy_sign::unstable::Request as PolicySignRequest, events::{StateEventType, room::policy::RoomPolicyEventContent}, + serde::{Base64, base64::UrlSafe}, + signatures::canonical_json, }; use serde_json::value::RawValue; +use tokio::time::sleep; + +pub(super) fn verify_policy_signature( + via: &ServerName, + ps_key: &Base64>, + pdu_json: &CanonicalJsonObject, +) -> bool { + let signature = pdu_json + .get("signatures") + .and_then(|sigs| sigs.as_object()) + .and_then(|sigs_map| sigs_map.get(via.as_str())) + .and_then(|sigs_for_server| sigs_for_server.as_object()) + .and_then(|sigs_for_server_map| sigs_for_server_map.get("ed25519:policy_server")) + .and_then(|sig| sig.as_str()) + .and_then(|sig_str| Base64::>::parse(sig_str).ok()) + .and_then(|sig_b64| { + Signature::from_slice(sig_b64.as_bytes()) + .map(Some) + .unwrap_or(None) + }); + let vk = match VerifyingKey::try_from(ps_key.as_bytes()) { + | Ok(vk) => vk, + | Err(e) => { + debug!( + error=%e, + "Failed to parse policy server public key; cannot verify signature" + ); + return false; + }, + }; + let cj = match canonical_json(pdu_json.clone()) { + | Ok(cj) => cj, + | Err(e) => { + debug!( + error=%e, + "Failed to convert event JSON to canonical form; cannot verify policy server signature" + ); + return false; + }, + }; + match signature { + | Some(ref sig) => vk.verify(cj.as_bytes(), sig).is_ok(), + | None => false, + } +} /// Asks a remote policy server if the event is allowed. /// @@ -31,29 +76,24 @@ use serde_json::value::RawValue; /// contacted for whatever reason, Err(e) is returned, which generally is a /// fail-open operation. #[implement(super::Service)] -#[tracing::instrument(skip(self, pdu, pdu_json, room_id), level = "info")] -pub async fn ask_policy_server( +#[tracing::instrument(skip(self, pdu, pdu_json), level = "info")] +pub async fn policy_server_allows_event( &self, pdu: &PduEvent, pdu_json: &mut CanonicalJsonObject, room_id: &RoomId, incoming: bool, -) -> Result { - if !self.services.server.config.enable_msc4284_policy_servers { - trace!("policy server checking is disabled"); - return Ok(true); // don't ever contact policy servers - } - +) -> Result<()> { if *pdu.event_type() == StateEventType::RoomPolicy.into() { debug!( room_id = %room_id, event_type = ?pdu.event_type(), "Skipping spam check for policy server meta-event" ); - return Ok(true); + return Ok(()); } - let Ok(policyserver) = self + let Ok(ps) = self .services .state_accessor .room_state_get_content(room_id, &StateEventType::RoomPolicy, "") @@ -65,128 +105,144 @@ pub async fn ask_policy_server( }) .map(|c: RoomPolicyEventContent| c) else { - debug!("room has no policy server configured"); - return Ok(true); + debug!("room has no policy server configured, skipping spam check"); + return Ok(()); }; - if self.services.server.config.policy_server_check_own_events - && !incoming - && policyserver.public_key.is_none() - { - // don't contact policy servers for locally generated events, but only when the - // policy server does not require signatures - trace!("won't contact policy server for locally generated event"); - return Ok(true); - } - - let via = match policyserver.via { - | Some(ref via) => ServerName::parse(via)?, - | None => { - trace!("No policy server configured for room {room_id}"); - return Ok(true); + let ps_key = match ps.effective_key() { + | Ok(key) => key, + | Err(e) => { + debug!( + error=%e, + "room has a policy server configured, but no valid public keys; skipping spam check" + ); + return Ok(()); }, }; + + let Some(via) = ps + .via + .as_ref() + .and_then(|via| ServerName::parse(via).map(Some).unwrap_or(None)) + else { + trace!("No via configured for room policy server, skipping spam check"); + return Ok(()); + }; + if via.is_empty() { trace!("Policy server is empty for room {room_id}, skipping spam check"); - return Ok(true); + return Ok(()); } + if !self.services.state_cache.server_in_room(via, room_id).await { debug!( via = %via, "Policy server is not in the room, skipping spam check" ); - return Ok(true); + return Ok(()); } + + if incoming { + // Verify the signature instead of calling a check + if verify_policy_signature(via, &ps_key, pdu_json) { + debug!( + via = %via, + "Event is incoming and has a valid policy server signature" + ); + return Ok(()); + } + debug_info!( + via = %via, + "Event is incoming but does not have a valid policy server signature; asking policy \ + server to sign it now" + ); + } + let outgoing = self .services .sending .convert_to_outgoing_federation_event(pdu_json.clone()) .await; - if policyserver.public_key.is_some() { - if !incoming { - debug_info!( - via = %via, - outgoing = ?pdu_json, - "Getting policy server signature on event" - ); - return self - .fetch_policy_server_signature(pdu, pdu_json, via, outgoing, room_id) - .await; - } - // for incoming events, is it signed by with the key - // "ed25519:policy_server"? - if let Some(CanonicalJsonValue::Object(sigs)) = pdu_json.get("signatures") { - if let Some(CanonicalJsonValue::Object(server_sigs)) = sigs.get(via.as_str()) { - let wanted_key_id: &KeyId = - SigningKeyId::parse("ed25519:policy_server")?; - if let Some(CanonicalJsonValue::String(_sig_value)) = - server_sigs.get(wanted_key_id.as_str()) - { - // TODO: verify signature - } - } - } - debug!( - "Event is not local and has no policy server signature, performing legacy spam check" - ); - } - debug_info!( + + info!( via = %via, - "Checking event for spam with policy server via legacy check" + "Asking policy server to sign event" ); - let response = tokio::time::timeout( - Duration::from_secs(self.services.server.config.policy_server_request_timeout), - self.services - .sending - .send_federation_request(via, PolicyCheckRequest { - event_id: pdu.event_id().to_owned(), - pdu: Some(outgoing), - }), - ) - .await; - let response = match response { - | Ok(Ok(response)) => { - debug!("Response from policy server: {:?}", response); - response - }, - | Ok(Err(e)) => { + self.fetch_policy_server_signature(pdu, pdu_json, via, outgoing, room_id, ps_key, 0) + .await +} +#[allow(clippy::too_many_arguments)] +#[implement(super::Service)] +async fn handle_policy_server_error( + &self, + error: Error, + pdu: &PduEvent, + pdu_json: &mut CanonicalJsonObject, + via: &ServerName, + outgoing: Box, + room_id: &RoomId, + policy_server_key: Base64>, + retries: u8, + timeout: Duration, +) -> Result<()> { + if let Some(retry_after) = error.retry_after() { + if retries >= 3 { warn!( via = %via, event_id = %pdu.event_id(), room_id = %room_id, - "Failed to contact policy server: {e}" + retries, + "Policy server rate-limited us too many times; giving up" ); - // Network or policy server errors are treated as non-fatal: event is allowed by - // default. - return Err(e); - }, - | Err(elapsed) => { - warn!( - %via, - event_id = %pdu.event_id(), - %room_id, - %elapsed, - "Policy server request timed out after 10 seconds" - ); - return Err!("Request to policy server timed out"); - }, - }; - trace!("Recommendation from policy server was {}", response.recommendation); - if response.recommendation == "spam" { + return Err(error); // Error should be passed to c2s + } + let saturated = retry_after.min(timeout); + // ^ don't wait more than 60 seconds warn!( via = %via, event_id = %pdu.event_id(), room_id = %room_id, - "Event was marked as spam by policy server", + retry_after = %saturated.as_secs(), + retries, + "Policy server rate-limited us; retrying after {retry_after:?}" + ); + // TODO: select between this sleep and shutdown signal + sleep(saturated).await; + return Box::pin(self.fetch_policy_server_signature( + pdu, + pdu_json, + via, + outgoing, + room_id, + policy_server_key, + retries.saturating_add(1), + )) + .await; + } + if error.status_code().is_client_error() { + warn!( + via = %via, + event_id = %pdu.event_id(), + room_id = %room_id, + error = ?error, + "Policy server marked the event as spam" + ); + } else { + info!( + via = %via, + event_id = %pdu.event_id(), + room_id = %room_id, + error = ?error, + "Failed to contact policy server" ); - return Ok(false); } - Ok(true) + Err(error) } /// Asks a remote policy server for a signature on this event. /// If the policy server signs this event, the original data is mutated. +#[allow(clippy::too_many_arguments)] #[implement(super::Service)] #[tracing::instrument(skip_all, fields(event_id=%pdu.event_id(), via=%via), level = "info")] pub async fn fetch_policy_server_signature( @@ -196,13 +252,16 @@ pub async fn fetch_policy_server_signature( via: &ServerName, outgoing: Box, room_id: &RoomId, -) -> Result { + policy_server_key: Base64>, + retries: u8, +) -> Result<()> { + let timeout = Duration::from_secs(self.services.server.config.policy_server_request_timeout); debug!("Requesting policy server signature"); let response = tokio::time::timeout( - Duration::from_secs(self.services.server.config.policy_server_request_timeout), + timeout, self.services .sending - .send_federation_request(via, PolicySignRequest { pdu: outgoing }), + .send_federation_request(via, PolicySignRequest { pdu: outgoing.clone() }), ) .await; @@ -212,15 +271,19 @@ pub async fn fetch_policy_server_signature( response }, | Ok(Err(e)) => { - warn!( - via = %via, - event_id = %pdu.event_id(), - room_id = %room_id, - "Failed to contact policy server: {e}" - ); - // Network or policy server errors are treated as non-fatal: event is allowed by - // default. - return Err(e); + return self + .handle_policy_server_error( + e, + pdu, + pdu_json, + via, + outgoing, + room_id, + policy_server_key, + retries, + timeout, + ) + .await; }, | Err(elapsed) => { warn!( @@ -228,34 +291,34 @@ pub async fn fetch_policy_server_signature( event_id = %pdu.event_id(), %room_id, %elapsed, - "Policy server request timed out after 10 seconds" + "Policy server signature request timed out" ); - return Err!("Request to policy server timed out"); + return Err!(Request(Forbidden("Policy server did not respond in time"))); }, }; - if response.signatures.is_none() { - debug!("Policy server refused to sign event"); - return Ok(false); - } - let sigs: ruma::Signatures = - response.signatures.unwrap(); - if !sigs.contains_key(via) { - debug_warn!( + + if !response.signatures.contains_key(via) { + error!( "Policy server returned signatures, but did not include the expected server name \ '{}': {:?}", - via, - sigs + via, response.signatures ); - return Ok(false); + return Err!(BadServerResponse( + "Policy server did not include expected server name in signatures" + )); } - let keypairs = sigs.get(via).unwrap(); + let keypairs = response.signatures.get(via).unwrap(); + // TODO: need to be able to verify other algorithms let wanted_key_id = KeyId::parse("ed25519:policy_server")?; if !keypairs.contains_key(wanted_key_id) { - debug_warn!( + error!( + signatures = ?response.signatures, "Policy server returned signature, but did not use the key ID \ 'ed25519:policy_server'." ); - return Ok(false); + return Err!(BadServerResponse( + "Policy server signed the event, but did not use the expected key ID" + )); } let signatures_entry = pdu_json .entry("signatures".to_owned()) @@ -273,12 +336,12 @@ pub async fn fetch_policy_server_signature( ); }, | Some(_) => { - debug_warn!( + // This should never happen + unreachable!( "Existing `signatures[{}]` field is not an object; cannot insert policy \ signature", via ); - return Ok(false); }, | None => { let mut inner = BTreeMap::new(); @@ -293,11 +356,12 @@ pub async fn fetch_policy_server_signature( signatures_map.insert(via.as_str().to_owned(), CanonicalJsonValue::Object(inner)); }, } + // TODO: verify signature value was made with the policy_server_key + // rather than the expected key. } else { - debug_warn!( + unreachable!( "Existing `signatures` field is not an object; cannot insert policy signature" ); - return Ok(false); } - Ok(true) + Ok(()) } diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index a60c2b66..bad275b9 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -256,7 +256,7 @@ where if incoming_pdu.state_key.is_none() { debug!(event_id = %incoming_pdu.event_id, "Checking policy server for event"); match self - .ask_policy_server( + .policy_server_allows_event( &incoming_pdu, &mut incoming_pdu.to_canonical_object(), room_id, @@ -264,9 +264,10 @@ where ) .await { - | Ok(false) => { + | Err(e) => { warn!( event_id = %incoming_pdu.event_id, + error = %e, "Event has been marked as spam by policy server" ); soft_fail = true; diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs index 40e41b08..3eb822e7 100644 --- a/src/service/rooms/timeline/create.rs +++ b/src/service/rooms/timeline/create.rs @@ -9,7 +9,6 @@ use conduwuit_core::{ state_res::{self, RoomVersion}, }, utils::{self, IterStream, ReadyExt, stream::TryIgnore}, - warn, }; use futures::{StreamExt, TryStreamExt, future, future::ready}; use ruma::{ @@ -298,23 +297,15 @@ pub async fn create_hash_and_sign_event( "Checking event in room {} with policy server", pdu.room_id.as_ref().map_or("None", |id| id.as_str()) ); - match self - .services + self.services .event_handler - .ask_policy_server(&pdu, &mut pdu_json, pdu.room_id().expect("has room ID"), false) - .await - { - | Ok(true) => {}, - | Ok(false) => { - return Err!(Request(Forbidden(debug_warn!( - "Policy server marked this event as spam" - )))); - }, - | Err(e) => { - // fail open - warn!("Failed to check event with policy server: {e}"); - }, - } + .policy_server_allows_event( + &pdu, + &mut pdu_json, + pdu.room_id().expect("has room ID"), + false, + ) + .await?; } // Generate short event id