From 1a7bda209b657764a876ba078f698629b5292f76 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 27 Oct 2025 07:20:23 +0000 Subject: [PATCH] feat: Implement Dehydrated Devices MSC3814 Co-authored-by: Jade Ellis Signed-off-by: Jason Volk --- Cargo.toml | 1 + src/api/client/dehydrated_device.rs | 121 ++++++++++++++++++++ src/api/client/mod.rs | 2 + src/api/client/sync/v3/mod.rs | 3 +- src/api/client/sync/v5.rs | 1 + src/api/client/unversioned.rs | 1 + src/api/router.rs | 4 + src/database/maps.rs | 4 + src/service/sending/sender.rs | 4 +- src/service/users/dehydrated_device.rs | 149 +++++++++++++++++++++++++ src/service/users/mod.rs | 15 ++- 11 files changed, 299 insertions(+), 6 deletions(-) create mode 100644 src/api/client/dehydrated_device.rs create mode 100644 src/service/users/dehydrated_device.rs diff --git a/Cargo.toml b/Cargo.toml index eda0628b..2a9ab25e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -363,6 +363,7 @@ features = [ "unstable-msc2870", "unstable-msc3026", "unstable-msc3061", + "unstable-msc3814", "unstable-msc3245", "unstable-msc3266", "unstable-msc3381", # polls diff --git a/src/api/client/dehydrated_device.rs b/src/api/client/dehydrated_device.rs new file mode 100644 index 00000000..e665ec41 --- /dev/null +++ b/src/api/client/dehydrated_device.rs @@ -0,0 +1,121 @@ +use axum::extract::State; +use axum_client_ip::InsecureClientIp; +use conduwuit::{Err, Result, at}; +use futures::StreamExt; +use ruma::api::client::dehydrated_device::{ + delete_dehydrated_device::unstable as delete_dehydrated_device, + get_dehydrated_device::unstable as get_dehydrated_device, get_events::unstable as get_events, + put_dehydrated_device::unstable as put_dehydrated_device, +}; + +use crate::Ruma; + +const MAX_BATCH_EVENTS: usize = 50; + +/// # `PUT /_matrix/client/../dehydrated_device` +/// +/// Creates or overwrites the user's dehydrated device. +#[tracing::instrument(skip_all, fields(%client))] +pub(crate) async fn put_dehydrated_device_route( + State(services): State, + InsecureClientIp(client): InsecureClientIp, + body: Ruma, +) -> Result { + let sender_user = body + .sender_user + .as_deref() + .expect("AccessToken authentication required"); + + let device_id = body.body.device_id.clone(); + + services + .users + .set_dehydrated_device(sender_user, body.body) + .await?; + + Ok(put_dehydrated_device::Response { device_id }) +} + +/// # `DELETE /_matrix/client/../dehydrated_device` +/// +/// Deletes the user's dehydrated device without replacement. +#[tracing::instrument(skip_all, fields(%client))] +pub(crate) async fn delete_dehydrated_device_route( + State(services): State, + InsecureClientIp(client): InsecureClientIp, + body: Ruma, +) -> Result { + let sender_user = body.sender_user(); + + let device_id = services.users.get_dehydrated_device_id(sender_user).await?; + + services.users.remove_device(sender_user, &device_id).await; + + Ok(delete_dehydrated_device::Response { device_id }) +} + +/// # `GET /_matrix/client/../dehydrated_device` +/// +/// Gets the user's dehydrated device +#[tracing::instrument(skip_all, fields(%client))] +pub(crate) async fn get_dehydrated_device_route( + State(services): State, + InsecureClientIp(client): InsecureClientIp, + body: Ruma, +) -> Result { + let sender_user = body.sender_user(); + + let device = services.users.get_dehydrated_device(sender_user).await?; + + Ok(get_dehydrated_device::Response { + device_id: device.device_id, + device_data: device.device_data, + }) +} + +/// # `GET /_matrix/client/../dehydrated_device/{device_id}/events` +/// +/// Paginates the events of the dehydrated device. +#[tracing::instrument(skip_all, fields(%client))] +pub(crate) async fn get_dehydrated_events_route( + State(services): State, + InsecureClientIp(client): InsecureClientIp, + body: Ruma, +) -> Result { + let sender_user = body.sender_user(); + + let device_id = &body.body.device_id; + let existing_id = services.users.get_dehydrated_device_id(sender_user).await; + + if existing_id.as_ref().is_err() + || existing_id + .as_ref() + .is_ok_and(|existing_id| existing_id != device_id) + { + return Err!(Request(Forbidden("Not the dehydrated device_id."))); + } + + let since: Option = body + .body + .next_batch + .as_deref() + .map(str::parse) + .transpose()?; + + let mut next_batch: Option = None; + let events = services + .users + .get_to_device_events(sender_user, device_id, since, None) + .take(MAX_BATCH_EVENTS) + .inspect(|&(count, _)| { + next_batch.replace(count); + }) + .map(at!(1)) + .collect() + .await; + + Ok(get_events::Response { + events, + next_batch: next_batch.as_ref().map(ToString::to_string), + }) +} diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index 0014282c..ec64d812 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -6,6 +6,7 @@ pub(super) mod appservice; pub(super) mod backup; pub(super) mod capabilities; pub(super) mod context; +pub(super) mod dehydrated_device; pub(super) mod device; pub(super) mod directory; pub(super) mod filter; @@ -49,6 +50,7 @@ pub(super) use appservice::*; pub(super) use backup::*; pub(super) use capabilities::*; pub(super) use context::*; +pub(super) use dehydrated_device::*; pub(super) use device::*; pub(super) use directory::*; pub(super) use filter::*; diff --git a/src/api/client/sync/v3/mod.rs b/src/api/client/sync/v3/mod.rs index e371d275..247b5c9b 100644 --- a/src/api/client/sync/v3/mod.rs +++ b/src/api/client/sync/v3/mod.rs @@ -11,7 +11,7 @@ use std::{ use axum::extract::State; use axum_client_ip::InsecureClientIp; use conduwuit::{ - Result, extract_variant, + Result, at, extract_variant, utils::{ ReadyExt, TryFutureExtExt, stream::{BroadbandExt, Tools, WidebandExt}, @@ -385,6 +385,7 @@ pub(crate) async fn build_sync_events( last_sync_end_count, Some(current_count), ) + .map(at!(1)) .collect::>(); let device_one_time_keys_count = services diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index ee098296..49bdf1a8 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -1029,6 +1029,7 @@ async fn collect_to_device( events: services .users .get_to_device_events(sender_user, sender_device, None, Some(next_batch)) + .map(at!(1)) .collect() .await, }) diff --git a/src/api/client/unversioned.rs b/src/api/client/unversioned.rs index f377ed54..96ee2d51 100644 --- a/src/api/client/unversioned.rs +++ b/src/api/client/unversioned.rs @@ -50,6 +50,7 @@ pub(crate) async fn get_supported_versions_route( ("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */ ("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */ ("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */ + ("org.matrix.msc3814".to_owned(), true), /* dehydrated devices */ ("org.matrix.msc3827".to_owned(), true), /* filtering of /publicRooms by room type (https://github.com/matrix-org/matrix-spec-proposals/pull/3827) */ ("org.matrix.msc3952_intentional_mentions".to_owned(), true), /* intentional mentions (https://github.com/matrix-org/matrix-spec-proposals/pull/3952) */ ("org.matrix.msc3916.stable".to_owned(), true), /* authenticated media (https://github.com/matrix-org/matrix-spec-proposals/pull/3916) */ diff --git a/src/api/router.rs b/src/api/router.rs index 3166c2af..7b6558a7 100644 --- a/src/api/router.rs +++ b/src/api/router.rs @@ -160,6 +160,10 @@ pub fn build(router: Router, server: &Server) -> Router { .ruma_route(&client::update_device_route) .ruma_route(&client::delete_device_route) .ruma_route(&client::delete_devices_route) + .ruma_route(&client::put_dehydrated_device_route) + .ruma_route(&client::delete_dehydrated_device_route) + .ruma_route(&client::get_dehydrated_device_route) + .ruma_route(&client::get_dehydrated_events_route) .ruma_route(&client::get_tags_route) .ruma_route(&client::update_tag_route) .ruma_route(&client::delete_tag_route) diff --git a/src/database/maps.rs b/src/database/maps.rs index fc6ed8ab..20286b33 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -362,6 +362,10 @@ pub(super) static MAPS: &[Descriptor] = &[ name: "userid_blurhash", ..descriptor::RANDOM_SMALL }, + Descriptor { + name: "userid_dehydrateddevice", + ..descriptor::RANDOM_SMALL + }, Descriptor { name: "userid_devicelistversion", ..descriptor::RANDOM_SMALL diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 2044dc51..e7f9544c 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -10,7 +10,7 @@ use std::{ use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use conduwuit_core::{ - Error, Event, Result, debug, err, error, + Error, Event, Result, at, debug, err, error, result::LogErr, trace, utils::{ @@ -175,7 +175,7 @@ impl Service { if !new_events.is_empty() { self.db.mark_as_active(new_events.iter()); - let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect(); + let new_events_vec = new_events.into_iter().map(at!(1)).collect(); futures.push(self.send_events(dest.clone(), new_events_vec)); } else { statuses.remove(dest); diff --git a/src/service/users/dehydrated_device.rs b/src/service/users/dehydrated_device.rs new file mode 100644 index 00000000..cacce5f6 --- /dev/null +++ b/src/service/users/dehydrated_device.rs @@ -0,0 +1,149 @@ +use conduwuit::{Err, Result, implement, trace}; +use conduwuit_database::{Deserialized, Json}; +use ruma::{ + DeviceId, OwnedDeviceId, UserId, + api::client::dehydrated_device::{ + DehydratedDeviceData, put_dehydrated_device::unstable::Request, + }, + serde::Raw, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DehydratedDevice { + /// Unique ID of the device. + pub device_id: OwnedDeviceId, + + /// Contains serialized and encrypted private data. + pub device_data: Raw, +} + +/// Creates or recreates the user's dehydrated device. +#[implement(super::Service)] +#[tracing::instrument( + level = "info", + skip_all, + fields( + %user_id, + device_id = %request.device_id, + display_name = ?request.initial_device_display_name, + ) +)] +pub async fn set_dehydrated_device(&self, user_id: &UserId, request: Request) -> Result { + assert!( + self.exists(user_id).await, + "Tried to create dehydrated device for non-existent user" + ); + + let existing_id = self.get_dehydrated_device_id(user_id).await; + + if existing_id.is_err() + && self + .get_device_metadata(user_id, &request.device_id) + .await + .is_ok() + { + return Err!("A hydrated device already exists with that ID."); + } + + if let Ok(existing_id) = existing_id { + self.remove_device(user_id, &existing_id).await; + } + + self.create_device( + user_id, + &request.device_id, + "", + request.initial_device_display_name.clone(), + None, + ) + .await?; + + trace!(device_data = ?request.device_data); + self.db.userid_dehydrateddevice.raw_put( + user_id, + Json(&DehydratedDevice { + device_id: request.device_id.clone(), + device_data: request.device_data, + }), + ); + + trace!(device_keys = ?request.device_keys); + self.add_device_keys(user_id, &request.device_id, &request.device_keys) + .await; + + trace!(one_time_keys = ?request.one_time_keys); + for (one_time_key_key, one_time_key_value) in &request.one_time_keys { + self.add_one_time_key(user_id, &request.device_id, one_time_key_key, one_time_key_value) + .await?; + } + + Ok(()) +} + +/// Removes a user's dehydrated device. +/// +/// Calling this directly will remove the dehydrated data but leak the frontage +/// device. Thus this is called by the regular device interface such that the +/// dehydrated data will not leak instead. +/// +/// If device_id is given, the user's dehydrated device must match or this is a +/// no-op, but an Err is still returned to indicate that. Otherwise returns the +/// removed dehydrated device_id. +#[implement(super::Service)] +#[tracing::instrument( + level = "debug", + skip_all, + fields( + %user_id, + device_id = ?maybe_device_id, + ) +)] +pub(super) async fn remove_dehydrated_device( + &self, + user_id: &UserId, + maybe_device_id: Option<&DeviceId>, +) -> Result { + let Ok(device_id) = self.get_dehydrated_device_id(user_id).await else { + return Err!(Request(NotFound("No dehydrated device for this user."))); + }; + + if let Some(maybe_device_id) = maybe_device_id { + if maybe_device_id != device_id { + return Err!(Request(NotFound("Not the user's dehydrated device."))); + } + } + + self.db.userid_dehydrateddevice.remove(user_id); + + Ok(device_id) +} + +/// Get the device_id of the user's dehydrated device. +#[implement(super::Service)] +#[tracing::instrument( + level = "debug", + skip_all, + fields(%user_id) +)] +pub async fn get_dehydrated_device_id(&self, user_id: &UserId) -> Result { + self.get_dehydrated_device(user_id) + .await + .map(|device| device.device_id) +} + +/// Get the dehydrated device private data +#[implement(super::Service)] +#[tracing::instrument( + level = "debug", + skip_all, + fields(%user_id), + ret, +)] +pub async fn get_dehydrated_device(&self, user_id: &UserId) -> Result { + self.db + .userid_dehydrateddevice + .get(user_id) + .await + .deserialized() +} diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 1b96203c..5f278785 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -1,3 +1,5 @@ +pub(super) mod dehydrated_device; + #[cfg(feature = "ldap")] use std::collections::HashMap; use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc}; @@ -5,7 +7,7 @@ use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc}; #[cfg(feature = "ldap")] use conduwuit::result::LogErr; use conduwuit::{ - Err, Error, Result, Server, at, debug_warn, err, is_equal_to, trace, + Err, Error, Result, Server, debug_warn, err, is_equal_to, trace, utils::{self, ReadyExt, stream::TryIgnore, string::Unquoted}, }; #[cfg(feature = "ldap")] @@ -70,6 +72,7 @@ struct Data { userfilterid_filter: Arc, userid_avatarurl: Arc, userid_blurhash: Arc, + userid_dehydrateddevice: Arc, userid_devicelistversion: Arc, userid_displayname: Arc, userid_lastonetimekeyupdate: Arc, @@ -110,6 +113,7 @@ impl crate::Service for Service { userfilterid_filter: args.db["userfilterid_filter"].clone(), userid_avatarurl: args.db["userid_avatarurl"].clone(), userid_blurhash: args.db["userid_blurhash"].clone(), + userid_dehydrateddevice: args.db["userid_dehydrateddevice"].clone(), userid_devicelistversion: args.db["userid_devicelistversion"].clone(), userid_displayname: args.db["userid_displayname"].clone(), userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(), @@ -480,6 +484,11 @@ impl Service { /// Removes a device from a user. pub async fn remove_device(&self, user_id: &UserId, device_id: &DeviceId) { + // Remove dehydrated device if this is the dehydrated device + let _: Result<_> = self + .remove_dehydrated_device(user_id, Some(device_id)) + .await; + let userdeviceid = (user_id, device_id); // Remove tokens @@ -1003,7 +1012,7 @@ impl Service { device_id: &'a DeviceId, since: Option, to: Option, - ) -> impl Stream> + Send + 'a { + ) -> impl Stream)> + Send + 'a { type Key<'a> = (&'a UserId, &'a DeviceId, u64); let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1))); @@ -1017,7 +1026,7 @@ impl Service { && device_id == *device_id_ && to.is_none_or(|to| *count <= to) }) - .map(at!(1)) + .map(|((_, _, count), event)| (count, event)) } pub async fn remove_to_device_events(