From 3825bff73316b29ee0701897ff331392e9c29774 Mon Sep 17 00:00:00 2001 From: timedout Date: Sun, 25 Jan 2026 03:56:40 +0000 Subject: [PATCH] feat: Support federating out media redactions --- src/api/client/media.rs | 49 +++++++++++++++++++++++++++++++--- src/database/maps.rs | 8 ++++++ src/service/media/data.rs | 55 +++++++++++++++++++++++++++++++++++++-- src/service/media/mod.rs | 26 +++++++++++++++++- 4 files changed, 132 insertions(+), 6 deletions(-) diff --git a/src/api/client/media.rs b/src/api/client/media.rs index 9f70f924..468291bb 100644 --- a/src/api/client/media.rs +++ b/src/api/client/media.rs @@ -3,8 +3,11 @@ use std::time::Duration; use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Err, Result, err, + Err, Result, + debug::DebugInspect, + debug_info, err, info, utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize}, + warn, }; use conduwuit_service::{ Services, @@ -12,7 +15,7 @@ use conduwuit_service::{ }; use reqwest::Url; use ruma::{ - Mxc, UserId, + Mxc, OwnedServerName, UserId, api::client::{ authenticated_media, authenticated_media::{ @@ -245,6 +248,37 @@ pub(crate) async fn get_media_preview_route( }) } +async fn dispatch_redaction( + server_name: OwnedServerName, + media_id: String, + servers: Vec, + services: crate::State, +) { + for server in servers { + if services.globals.server_is_ours(&server) { + continue; + } + + debug_info!("Asking {server} to redact media mxc://{server_name}/{media_id}"); + let _ = services + .federation + .execute(&server, authenticated_media::redact::unstable::Request { + server_name: server_name.clone(), + media_id: media_id.clone(), + }) + .await + .debug_inspect(|_| { + debug_info!("Asked {server} to redact media mxc://{server_name}/{media_id}"); + }) + .inspect_err(|e| { + warn!( + "Failed to ask {server} to redact media mxc://{server_name}/{media_id}: {e}" + ); + }) + .ok(); + } +} + #[tracing::instrument( name = "media_redact", level = "debug", @@ -267,7 +301,16 @@ pub(crate) async fn redact_media_route( return Err!(Request(Forbidden("You do not have permission to redact this attachment."))); } - services.media.delete(&mxc).await?; // TODO: Only delete file and mark as redacted + services.media.redact(&mxc).await?; + + // TODO: This should be a persistent background task + let servers = services.media.get_interested_servers(&mxc).await; + tokio::spawn(dispatch_redaction( + mxc.server_name.to_owned(), + mxc.media_id.to_owned(), + servers, + services, + )); Ok(authenticated_media::redact::unstable::Response {}) } diff --git a/src/database/maps.rs b/src/database/maps.rs index fc6ed8ab..cf98a303 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -108,6 +108,14 @@ pub(super) static MAPS: &[Descriptor] = &[ name: "mediaid_user", ..descriptor::RANDOM_SMALL }, + Descriptor { + name: "mediaid_redacted", + ..descriptor::RANDOM_SMALL + }, + Descriptor { + name: "mediaid_interestedservername", + ..descriptor::RANDOM_SMALL + }, Descriptor { name: "onetimekeyid_onetimekeys", ..descriptor::RANDOM_SMALL diff --git a/src/service/media/data.rs b/src/service/media/data.rs index a5d667ee..fb2b6c46 100644 --- a/src/service/media/data.rs +++ b/src/service/media/data.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; use conduwuit::{ Err, Result, debug, debug_info, err, @@ -6,13 +6,15 @@ use conduwuit::{ }; use database::{Database, Interfix, Map}; use futures::StreamExt; -use ruma::{Mxc, OwnedMxcUri, UserId, http_headers::ContentDisposition}; +use ruma::{Mxc, OwnedMxcUri, OwnedServerName, UserId, http_headers::ContentDisposition}; use super::{preview::UrlPreviewData, thumbnail::Dim}; pub(crate) struct Data { mediaid_file: Arc, mediaid_user: Arc, + mediaid_redacted: Arc, + mediaid_interestedservername: Arc, url_previews: Arc, } @@ -28,6 +30,8 @@ impl Data { Self { mediaid_file: db["mediaid_file"].clone(), mediaid_user: db["mediaid_user"].clone(), + mediaid_redacted: db["mediaid_redacted"].clone(), + mediaid_interestedservername: db["mediaid_interestedservername"].clone(), url_previews: db["url_previews"].clone(), } } @@ -77,6 +81,22 @@ impl Data { self.mediaid_user.remove(key); }) .await; + + self.mediaid_interestedservername + .stream_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|(key, _)| { + debug_assert!( + key.starts_with(mxc.to_string().as_bytes()), + "key should start with the mxc" + ); + + debug_info!("Deleting interested server name key {key:?}"); + + self.mediaid_interestedservername.remove(key); + }) + .await; + // NOTE: Redaction status is kept even after deletion } /// Searches for all files with the given MXC @@ -275,4 +295,35 @@ impl Data { image_height, }) } + + /// Marks a media item as redacted, preventing it from being served or + /// re-used. + pub(super) fn mark_redacted(&self, media_id: &str) { + self.mediaid_redacted.insert(media_id, []); + } + + /// Checks if a media item is redacted. + pub(super) async fn is_redacted(&self, media_id: &str) -> bool { + self.mediaid_redacted.contains(media_id).await + } + + pub(super) fn add_interested_server_name(&self, media_id: &str, server_name: &str) { + let key = (media_id, server_name); + self.mediaid_interestedservername + .insert(&database::serialize_key(&key).expect("key must be serializable"), []); + } + + pub(super) async fn interested_server_names(&self, media_id: &str) -> Vec { + let prefix = (media_id, Interfix); + self.mediaid_interestedservername + .stream_prefix_raw(&prefix) + .ignore_err() + .map(|(key, _)| { + let parts: Vec<&[u8]> = key.rsplit(|&b| b == 0xFF).collect(); + OwnedServerName::parse(string_from_bytes(parts[0]).unwrap_or_default()) + .unwrap_or_else(|_| OwnedServerName::from_str("invalid.server").unwrap()) + }) + .collect() + .await + } } diff --git a/src/service/media/mod.rs b/src/service/media/mod.rs index 958e6b68..b1af9a8e 100644 --- a/src/service/media/mod.rs +++ b/src/service/media/mod.rs @@ -17,7 +17,9 @@ use conduwuit::{ }, warn, }; -use ruma::{Mxc, OwnedMxcUri, UserId, http_headers::ContentDisposition}; +use ruma::{ + Mxc, OwnedMxcUri, OwnedServerName, ServerName, UserId, http_headers::ContentDisposition, +}; use tokio::{ fs, io::{AsyncReadExt, AsyncWriteExt, BufReader}, @@ -139,6 +141,28 @@ impl Service { } } + /// Marks a media ID as redacted, and deletes the associated file. + pub async fn redact(&self, mxc: &Mxc<'_>) -> Result<()> { + self.db.mark_redacted(mxc.media_id); + self.delete(mxc).await + } + + /// Checks if a media ID is redacted. + pub async fn is_redacted(&self, mxc: &Mxc<'_>) -> bool { + self.db.is_redacted(mxc.media_id).await + } + + /// Marks a server as "interested" (i.e. has downloaded this media from us). + pub fn mark_server_interested(&self, mxc: &Mxc<'_>, server_name: &ServerName) { + self.db + .add_interested_server_name(mxc.media_id, server_name.as_str()); + } + + /// Gets all servers interested in this media ID. + pub async fn get_interested_servers(&self, mxc: &Mxc<'_>) -> Vec { + self.db.interested_server_names(mxc.media_id).await + } + /// Deletes all media by the specified user /// /// currently, this is only practical for local users