feat: Support federating out media redactions
This commit is contained in:
parent
d90d22c917
commit
3825bff733
4 changed files with 132 additions and 6 deletions
|
|
@ -3,8 +3,11 @@ use std::time::Duration;
|
|||
use axum::extract::State;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use conduwuit::{
|
||||
Err, Result, err,
|
||||
Err, Result,
|
||||
debug::DebugInspect,
|
||||
debug_info, err, info,
|
||||
utils::{self, content_disposition::make_content_disposition, math::ruma_from_usize},
|
||||
warn,
|
||||
};
|
||||
use conduwuit_service::{
|
||||
Services,
|
||||
|
|
@ -12,7 +15,7 @@ use conduwuit_service::{
|
|||
};
|
||||
use reqwest::Url;
|
||||
use ruma::{
|
||||
Mxc, UserId,
|
||||
Mxc, OwnedServerName, UserId,
|
||||
api::client::{
|
||||
authenticated_media,
|
||||
authenticated_media::{
|
||||
|
|
@ -245,6 +248,37 @@ pub(crate) async fn get_media_preview_route(
|
|||
})
|
||||
}
|
||||
|
||||
async fn dispatch_redaction(
|
||||
server_name: OwnedServerName,
|
||||
media_id: String,
|
||||
servers: Vec<OwnedServerName>,
|
||||
services: crate::State,
|
||||
) {
|
||||
for server in servers {
|
||||
if services.globals.server_is_ours(&server) {
|
||||
continue;
|
||||
}
|
||||
|
||||
debug_info!("Asking {server} to redact media mxc://{server_name}/{media_id}");
|
||||
let _ = services
|
||||
.federation
|
||||
.execute(&server, authenticated_media::redact::unstable::Request {
|
||||
server_name: server_name.clone(),
|
||||
media_id: media_id.clone(),
|
||||
})
|
||||
.await
|
||||
.debug_inspect(|_| {
|
||||
debug_info!("Asked {server} to redact media mxc://{server_name}/{media_id}");
|
||||
})
|
||||
.inspect_err(|e| {
|
||||
warn!(
|
||||
"Failed to ask {server} to redact media mxc://{server_name}/{media_id}: {e}"
|
||||
);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "media_redact",
|
||||
level = "debug",
|
||||
|
|
@ -267,7 +301,16 @@ pub(crate) async fn redact_media_route(
|
|||
return Err!(Request(Forbidden("You do not have permission to redact this attachment.")));
|
||||
}
|
||||
|
||||
services.media.delete(&mxc).await?; // TODO: Only delete file and mark as redacted
|
||||
services.media.redact(&mxc).await?;
|
||||
|
||||
// TODO: This should be a persistent background task
|
||||
let servers = services.media.get_interested_servers(&mxc).await;
|
||||
tokio::spawn(dispatch_redaction(
|
||||
mxc.server_name.to_owned(),
|
||||
mxc.media_id.to_owned(),
|
||||
servers,
|
||||
services,
|
||||
));
|
||||
|
||||
Ok(authenticated_media::redact::unstable::Response {})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -108,6 +108,14 @@ pub(super) static MAPS: &[Descriptor] = &[
|
|||
name: "mediaid_user",
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "mediaid_redacted",
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "mediaid_interestedservername",
|
||||
..descriptor::RANDOM_SMALL
|
||||
},
|
||||
Descriptor {
|
||||
name: "onetimekeyid_onetimekeys",
|
||||
..descriptor::RANDOM_SMALL
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use std::{sync::Arc, time::Duration};
|
||||
use std::{str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
use conduwuit::{
|
||||
Err, Result, debug, debug_info, err,
|
||||
|
|
@ -6,13 +6,15 @@ use conduwuit::{
|
|||
};
|
||||
use database::{Database, Interfix, Map};
|
||||
use futures::StreamExt;
|
||||
use ruma::{Mxc, OwnedMxcUri, UserId, http_headers::ContentDisposition};
|
||||
use ruma::{Mxc, OwnedMxcUri, OwnedServerName, UserId, http_headers::ContentDisposition};
|
||||
|
||||
use super::{preview::UrlPreviewData, thumbnail::Dim};
|
||||
|
||||
pub(crate) struct Data {
|
||||
mediaid_file: Arc<Map>,
|
||||
mediaid_user: Arc<Map>,
|
||||
mediaid_redacted: Arc<Map>,
|
||||
mediaid_interestedservername: Arc<Map>,
|
||||
url_previews: Arc<Map>,
|
||||
}
|
||||
|
||||
|
|
@ -28,6 +30,8 @@ impl Data {
|
|||
Self {
|
||||
mediaid_file: db["mediaid_file"].clone(),
|
||||
mediaid_user: db["mediaid_user"].clone(),
|
||||
mediaid_redacted: db["mediaid_redacted"].clone(),
|
||||
mediaid_interestedservername: db["mediaid_interestedservername"].clone(),
|
||||
url_previews: db["url_previews"].clone(),
|
||||
}
|
||||
}
|
||||
|
|
@ -77,6 +81,22 @@ impl Data {
|
|||
self.mediaid_user.remove(key);
|
||||
})
|
||||
.await;
|
||||
|
||||
self.mediaid_interestedservername
|
||||
.stream_prefix_raw(&prefix)
|
||||
.ignore_err()
|
||||
.ready_for_each(|(key, _)| {
|
||||
debug_assert!(
|
||||
key.starts_with(mxc.to_string().as_bytes()),
|
||||
"key should start with the mxc"
|
||||
);
|
||||
|
||||
debug_info!("Deleting interested server name key {key:?}");
|
||||
|
||||
self.mediaid_interestedservername.remove(key);
|
||||
})
|
||||
.await;
|
||||
// NOTE: Redaction status is kept even after deletion
|
||||
}
|
||||
|
||||
/// Searches for all files with the given MXC
|
||||
|
|
@ -275,4 +295,35 @@ impl Data {
|
|||
image_height,
|
||||
})
|
||||
}
|
||||
|
||||
/// Marks a media item as redacted, preventing it from being served or
|
||||
/// re-used.
|
||||
pub(super) fn mark_redacted(&self, media_id: &str) {
|
||||
self.mediaid_redacted.insert(media_id, []);
|
||||
}
|
||||
|
||||
/// Checks if a media item is redacted.
|
||||
pub(super) async fn is_redacted(&self, media_id: &str) -> bool {
|
||||
self.mediaid_redacted.contains(media_id).await
|
||||
}
|
||||
|
||||
pub(super) fn add_interested_server_name(&self, media_id: &str, server_name: &str) {
|
||||
let key = (media_id, server_name);
|
||||
self.mediaid_interestedservername
|
||||
.insert(&database::serialize_key(&key).expect("key must be serializable"), []);
|
||||
}
|
||||
|
||||
pub(super) async fn interested_server_names(&self, media_id: &str) -> Vec<OwnedServerName> {
|
||||
let prefix = (media_id, Interfix);
|
||||
self.mediaid_interestedservername
|
||||
.stream_prefix_raw(&prefix)
|
||||
.ignore_err()
|
||||
.map(|(key, _)| {
|
||||
let parts: Vec<&[u8]> = key.rsplit(|&b| b == 0xFF).collect();
|
||||
OwnedServerName::parse(string_from_bytes(parts[0]).unwrap_or_default())
|
||||
.unwrap_or_else(|_| OwnedServerName::from_str("invalid.server").unwrap())
|
||||
})
|
||||
.collect()
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,9 @@ use conduwuit::{
|
|||
},
|
||||
warn,
|
||||
};
|
||||
use ruma::{Mxc, OwnedMxcUri, UserId, http_headers::ContentDisposition};
|
||||
use ruma::{
|
||||
Mxc, OwnedMxcUri, OwnedServerName, ServerName, UserId, http_headers::ContentDisposition,
|
||||
};
|
||||
use tokio::{
|
||||
fs,
|
||||
io::{AsyncReadExt, AsyncWriteExt, BufReader},
|
||||
|
|
@ -139,6 +141,28 @@ impl Service {
|
|||
}
|
||||
}
|
||||
|
||||
/// Marks a media ID as redacted, and deletes the associated file.
|
||||
pub async fn redact(&self, mxc: &Mxc<'_>) -> Result<()> {
|
||||
self.db.mark_redacted(mxc.media_id);
|
||||
self.delete(mxc).await
|
||||
}
|
||||
|
||||
/// Checks if a media ID is redacted.
|
||||
pub async fn is_redacted(&self, mxc: &Mxc<'_>) -> bool {
|
||||
self.db.is_redacted(mxc.media_id).await
|
||||
}
|
||||
|
||||
/// Marks a server as "interested" (i.e. has downloaded this media from us).
|
||||
pub fn mark_server_interested(&self, mxc: &Mxc<'_>, server_name: &ServerName) {
|
||||
self.db
|
||||
.add_interested_server_name(mxc.media_id, server_name.as_str());
|
||||
}
|
||||
|
||||
/// Gets all servers interested in this media ID.
|
||||
pub async fn get_interested_servers(&self, mxc: &Mxc<'_>) -> Vec<OwnedServerName> {
|
||||
self.db.interested_server_names(mxc.media_id).await
|
||||
}
|
||||
|
||||
/// Deletes all media by the specified user
|
||||
///
|
||||
/// currently, this is only practical for local users
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue