feat: Implement Dehydrated Devices MSC3814

Co-authored-by: Jade Ellis <jade@ellis.link>
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-10-27 07:20:23 +00:00 committed by Jade Ellis
parent 7e1950b3d2
commit 1a7bda209b
No known key found for this signature in database
GPG key ID: 8705A2A3EBF77BD2
11 changed files with 299 additions and 6 deletions

View file

@ -363,6 +363,7 @@ features = [
"unstable-msc2870", "unstable-msc2870",
"unstable-msc3026", "unstable-msc3026",
"unstable-msc3061", "unstable-msc3061",
"unstable-msc3814",
"unstable-msc3245", "unstable-msc3245",
"unstable-msc3266", "unstable-msc3266",
"unstable-msc3381", # polls "unstable-msc3381", # polls

View file

@ -0,0 +1,121 @@
use axum::extract::State;
use axum_client_ip::InsecureClientIp;
use conduwuit::{Err, Result, at};
use futures::StreamExt;
use ruma::api::client::dehydrated_device::{
delete_dehydrated_device::unstable as delete_dehydrated_device,
get_dehydrated_device::unstable as get_dehydrated_device, get_events::unstable as get_events,
put_dehydrated_device::unstable as put_dehydrated_device,
};
use crate::Ruma;
const MAX_BATCH_EVENTS: usize = 50;
/// # `PUT /_matrix/client/../dehydrated_device`
///
/// Creates or overwrites the user's dehydrated device.
#[tracing::instrument(skip_all, fields(%client))]
pub(crate) async fn put_dehydrated_device_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<put_dehydrated_device::Request>,
) -> Result<put_dehydrated_device::Response> {
let sender_user = body
.sender_user
.as_deref()
.expect("AccessToken authentication required");
let device_id = body.body.device_id.clone();
services
.users
.set_dehydrated_device(sender_user, body.body)
.await?;
Ok(put_dehydrated_device::Response { device_id })
}
/// # `DELETE /_matrix/client/../dehydrated_device`
///
/// Deletes the user's dehydrated device without replacement.
#[tracing::instrument(skip_all, fields(%client))]
pub(crate) async fn delete_dehydrated_device_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<delete_dehydrated_device::Request>,
) -> Result<delete_dehydrated_device::Response> {
let sender_user = body.sender_user();
let device_id = services.users.get_dehydrated_device_id(sender_user).await?;
services.users.remove_device(sender_user, &device_id).await;
Ok(delete_dehydrated_device::Response { device_id })
}
/// # `GET /_matrix/client/../dehydrated_device`
///
/// Gets the user's dehydrated device
#[tracing::instrument(skip_all, fields(%client))]
pub(crate) async fn get_dehydrated_device_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<get_dehydrated_device::Request>,
) -> Result<get_dehydrated_device::Response> {
let sender_user = body.sender_user();
let device = services.users.get_dehydrated_device(sender_user).await?;
Ok(get_dehydrated_device::Response {
device_id: device.device_id,
device_data: device.device_data,
})
}
/// # `GET /_matrix/client/../dehydrated_device/{device_id}/events`
///
/// Paginates the events of the dehydrated device.
#[tracing::instrument(skip_all, fields(%client))]
pub(crate) async fn get_dehydrated_events_route(
State(services): State<crate::State>,
InsecureClientIp(client): InsecureClientIp,
body: Ruma<get_events::Request>,
) -> Result<get_events::Response> {
let sender_user = body.sender_user();
let device_id = &body.body.device_id;
let existing_id = services.users.get_dehydrated_device_id(sender_user).await;
if existing_id.as_ref().is_err()
|| existing_id
.as_ref()
.is_ok_and(|existing_id| existing_id != device_id)
{
return Err!(Request(Forbidden("Not the dehydrated device_id.")));
}
let since: Option<u64> = body
.body
.next_batch
.as_deref()
.map(str::parse)
.transpose()?;
let mut next_batch: Option<u64> = None;
let events = services
.users
.get_to_device_events(sender_user, device_id, since, None)
.take(MAX_BATCH_EVENTS)
.inspect(|&(count, _)| {
next_batch.replace(count);
})
.map(at!(1))
.collect()
.await;
Ok(get_events::Response {
events,
next_batch: next_batch.as_ref().map(ToString::to_string),
})
}

View file

@ -6,6 +6,7 @@ pub(super) mod appservice;
pub(super) mod backup; pub(super) mod backup;
pub(super) mod capabilities; pub(super) mod capabilities;
pub(super) mod context; pub(super) mod context;
pub(super) mod dehydrated_device;
pub(super) mod device; pub(super) mod device;
pub(super) mod directory; pub(super) mod directory;
pub(super) mod filter; pub(super) mod filter;
@ -49,6 +50,7 @@ pub(super) use appservice::*;
pub(super) use backup::*; pub(super) use backup::*;
pub(super) use capabilities::*; pub(super) use capabilities::*;
pub(super) use context::*; pub(super) use context::*;
pub(super) use dehydrated_device::*;
pub(super) use device::*; pub(super) use device::*;
pub(super) use directory::*; pub(super) use directory::*;
pub(super) use filter::*; pub(super) use filter::*;

View file

@ -11,7 +11,7 @@ use std::{
use axum::extract::State; use axum::extract::State;
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use conduwuit::{ use conduwuit::{
Result, extract_variant, Result, at, extract_variant,
utils::{ utils::{
ReadyExt, TryFutureExtExt, ReadyExt, TryFutureExtExt,
stream::{BroadbandExt, Tools, WidebandExt}, stream::{BroadbandExt, Tools, WidebandExt},
@ -385,6 +385,7 @@ pub(crate) async fn build_sync_events(
last_sync_end_count, last_sync_end_count,
Some(current_count), Some(current_count),
) )
.map(at!(1))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let device_one_time_keys_count = services let device_one_time_keys_count = services

View file

@ -1029,6 +1029,7 @@ async fn collect_to_device(
events: services events: services
.users .users
.get_to_device_events(sender_user, sender_device, None, Some(next_batch)) .get_to_device_events(sender_user, sender_device, None, Some(next_batch))
.map(at!(1))
.collect() .collect()
.await, .await,
}) })

View file

@ -50,6 +50,7 @@ pub(crate) async fn get_supported_versions_route(
("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */ ("org.matrix.msc2836".to_owned(), true), /* threading/threads (https://github.com/matrix-org/matrix-spec-proposals/pull/2836) */
("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */ ("org.matrix.msc2946".to_owned(), true), /* spaces/hierarchy summaries (https://github.com/matrix-org/matrix-spec-proposals/pull/2946) */
("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */ ("org.matrix.msc3026.busy_presence".to_owned(), true), /* busy presence status (https://github.com/matrix-org/matrix-spec-proposals/pull/3026) */
("org.matrix.msc3814".to_owned(), true), /* dehydrated devices */
("org.matrix.msc3827".to_owned(), true), /* filtering of /publicRooms by room type (https://github.com/matrix-org/matrix-spec-proposals/pull/3827) */ ("org.matrix.msc3827".to_owned(), true), /* filtering of /publicRooms by room type (https://github.com/matrix-org/matrix-spec-proposals/pull/3827) */
("org.matrix.msc3952_intentional_mentions".to_owned(), true), /* intentional mentions (https://github.com/matrix-org/matrix-spec-proposals/pull/3952) */ ("org.matrix.msc3952_intentional_mentions".to_owned(), true), /* intentional mentions (https://github.com/matrix-org/matrix-spec-proposals/pull/3952) */
("org.matrix.msc3916.stable".to_owned(), true), /* authenticated media (https://github.com/matrix-org/matrix-spec-proposals/pull/3916) */ ("org.matrix.msc3916.stable".to_owned(), true), /* authenticated media (https://github.com/matrix-org/matrix-spec-proposals/pull/3916) */

View file

@ -160,6 +160,10 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
.ruma_route(&client::update_device_route) .ruma_route(&client::update_device_route)
.ruma_route(&client::delete_device_route) .ruma_route(&client::delete_device_route)
.ruma_route(&client::delete_devices_route) .ruma_route(&client::delete_devices_route)
.ruma_route(&client::put_dehydrated_device_route)
.ruma_route(&client::delete_dehydrated_device_route)
.ruma_route(&client::get_dehydrated_device_route)
.ruma_route(&client::get_dehydrated_events_route)
.ruma_route(&client::get_tags_route) .ruma_route(&client::get_tags_route)
.ruma_route(&client::update_tag_route) .ruma_route(&client::update_tag_route)
.ruma_route(&client::delete_tag_route) .ruma_route(&client::delete_tag_route)

View file

@ -362,6 +362,10 @@ pub(super) static MAPS: &[Descriptor] = &[
name: "userid_blurhash", name: "userid_blurhash",
..descriptor::RANDOM_SMALL ..descriptor::RANDOM_SMALL
}, },
Descriptor {
name: "userid_dehydrateddevice",
..descriptor::RANDOM_SMALL
},
Descriptor { Descriptor {
name: "userid_devicelistversion", name: "userid_devicelistversion",
..descriptor::RANDOM_SMALL ..descriptor::RANDOM_SMALL

View file

@ -10,7 +10,7 @@ use std::{
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use conduwuit_core::{ use conduwuit_core::{
Error, Event, Result, debug, err, error, Error, Event, Result, at, debug, err, error,
result::LogErr, result::LogErr,
trace, trace,
utils::{ utils::{
@ -175,7 +175,7 @@ impl Service {
if !new_events.is_empty() { if !new_events.is_empty() {
self.db.mark_as_active(new_events.iter()); self.db.mark_as_active(new_events.iter());
let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect(); let new_events_vec = new_events.into_iter().map(at!(1)).collect();
futures.push(self.send_events(dest.clone(), new_events_vec)); futures.push(self.send_events(dest.clone(), new_events_vec));
} else { } else {
statuses.remove(dest); statuses.remove(dest);

View file

@ -0,0 +1,149 @@
use conduwuit::{Err, Result, implement, trace};
use conduwuit_database::{Deserialized, Json};
use ruma::{
DeviceId, OwnedDeviceId, UserId,
api::client::dehydrated_device::{
DehydratedDeviceData, put_dehydrated_device::unstable::Request,
},
serde::Raw,
};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DehydratedDevice {
/// Unique ID of the device.
pub device_id: OwnedDeviceId,
/// Contains serialized and encrypted private data.
pub device_data: Raw<DehydratedDeviceData>,
}
/// Creates or recreates the user's dehydrated device.
#[implement(super::Service)]
#[tracing::instrument(
level = "info",
skip_all,
fields(
%user_id,
device_id = %request.device_id,
display_name = ?request.initial_device_display_name,
)
)]
pub async fn set_dehydrated_device(&self, user_id: &UserId, request: Request) -> Result {
assert!(
self.exists(user_id).await,
"Tried to create dehydrated device for non-existent user"
);
let existing_id = self.get_dehydrated_device_id(user_id).await;
if existing_id.is_err()
&& self
.get_device_metadata(user_id, &request.device_id)
.await
.is_ok()
{
return Err!("A hydrated device already exists with that ID.");
}
if let Ok(existing_id) = existing_id {
self.remove_device(user_id, &existing_id).await;
}
self.create_device(
user_id,
&request.device_id,
"",
request.initial_device_display_name.clone(),
None,
)
.await?;
trace!(device_data = ?request.device_data);
self.db.userid_dehydrateddevice.raw_put(
user_id,
Json(&DehydratedDevice {
device_id: request.device_id.clone(),
device_data: request.device_data,
}),
);
trace!(device_keys = ?request.device_keys);
self.add_device_keys(user_id, &request.device_id, &request.device_keys)
.await;
trace!(one_time_keys = ?request.one_time_keys);
for (one_time_key_key, one_time_key_value) in &request.one_time_keys {
self.add_one_time_key(user_id, &request.device_id, one_time_key_key, one_time_key_value)
.await?;
}
Ok(())
}
/// Removes a user's dehydrated device.
///
/// Calling this directly will remove the dehydrated data but leak the frontage
/// device. Thus this is called by the regular device interface such that the
/// dehydrated data will not leak instead.
///
/// If device_id is given, the user's dehydrated device must match or this is a
/// no-op, but an Err is still returned to indicate that. Otherwise returns the
/// removed dehydrated device_id.
#[implement(super::Service)]
#[tracing::instrument(
level = "debug",
skip_all,
fields(
%user_id,
device_id = ?maybe_device_id,
)
)]
pub(super) async fn remove_dehydrated_device(
&self,
user_id: &UserId,
maybe_device_id: Option<&DeviceId>,
) -> Result<OwnedDeviceId> {
let Ok(device_id) = self.get_dehydrated_device_id(user_id).await else {
return Err!(Request(NotFound("No dehydrated device for this user.")));
};
if let Some(maybe_device_id) = maybe_device_id {
if maybe_device_id != device_id {
return Err!(Request(NotFound("Not the user's dehydrated device.")));
}
}
self.db.userid_dehydrateddevice.remove(user_id);
Ok(device_id)
}
/// Get the device_id of the user's dehydrated device.
#[implement(super::Service)]
#[tracing::instrument(
level = "debug",
skip_all,
fields(%user_id)
)]
pub async fn get_dehydrated_device_id(&self, user_id: &UserId) -> Result<OwnedDeviceId> {
self.get_dehydrated_device(user_id)
.await
.map(|device| device.device_id)
}
/// Get the dehydrated device private data
#[implement(super::Service)]
#[tracing::instrument(
level = "debug",
skip_all,
fields(%user_id),
ret,
)]
pub async fn get_dehydrated_device(&self, user_id: &UserId) -> Result<DehydratedDevice> {
self.db
.userid_dehydrateddevice
.get(user_id)
.await
.deserialized()
}

View file

@ -1,3 +1,5 @@
pub(super) mod dehydrated_device;
#[cfg(feature = "ldap")] #[cfg(feature = "ldap")]
use std::collections::HashMap; use std::collections::HashMap;
use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc}; use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
@ -5,7 +7,7 @@ use std::{collections::BTreeMap, mem, net::IpAddr, sync::Arc};
#[cfg(feature = "ldap")] #[cfg(feature = "ldap")]
use conduwuit::result::LogErr; use conduwuit::result::LogErr;
use conduwuit::{ use conduwuit::{
Err, Error, Result, Server, at, debug_warn, err, is_equal_to, trace, Err, Error, Result, Server, debug_warn, err, is_equal_to, trace,
utils::{self, ReadyExt, stream::TryIgnore, string::Unquoted}, utils::{self, ReadyExt, stream::TryIgnore, string::Unquoted},
}; };
#[cfg(feature = "ldap")] #[cfg(feature = "ldap")]
@ -70,6 +72,7 @@ struct Data {
userfilterid_filter: Arc<Map>, userfilterid_filter: Arc<Map>,
userid_avatarurl: Arc<Map>, userid_avatarurl: Arc<Map>,
userid_blurhash: Arc<Map>, userid_blurhash: Arc<Map>,
userid_dehydrateddevice: Arc<Map>,
userid_devicelistversion: Arc<Map>, userid_devicelistversion: Arc<Map>,
userid_displayname: Arc<Map>, userid_displayname: Arc<Map>,
userid_lastonetimekeyupdate: Arc<Map>, userid_lastonetimekeyupdate: Arc<Map>,
@ -110,6 +113,7 @@ impl crate::Service for Service {
userfilterid_filter: args.db["userfilterid_filter"].clone(), userfilterid_filter: args.db["userfilterid_filter"].clone(),
userid_avatarurl: args.db["userid_avatarurl"].clone(), userid_avatarurl: args.db["userid_avatarurl"].clone(),
userid_blurhash: args.db["userid_blurhash"].clone(), userid_blurhash: args.db["userid_blurhash"].clone(),
userid_dehydrateddevice: args.db["userid_dehydrateddevice"].clone(),
userid_devicelistversion: args.db["userid_devicelistversion"].clone(), userid_devicelistversion: args.db["userid_devicelistversion"].clone(),
userid_displayname: args.db["userid_displayname"].clone(), userid_displayname: args.db["userid_displayname"].clone(),
userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(), userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(),
@ -480,6 +484,11 @@ impl Service {
/// Removes a device from a user. /// Removes a device from a user.
pub async fn remove_device(&self, user_id: &UserId, device_id: &DeviceId) { pub async fn remove_device(&self, user_id: &UserId, device_id: &DeviceId) {
// Remove dehydrated device if this is the dehydrated device
let _: Result<_> = self
.remove_dehydrated_device(user_id, Some(device_id))
.await;
let userdeviceid = (user_id, device_id); let userdeviceid = (user_id, device_id);
// Remove tokens // Remove tokens
@ -1003,7 +1012,7 @@ impl Service {
device_id: &'a DeviceId, device_id: &'a DeviceId,
since: Option<u64>, since: Option<u64>,
to: Option<u64>, to: Option<u64>,
) -> impl Stream<Item = Raw<AnyToDeviceEvent>> + Send + 'a { ) -> impl Stream<Item = (u64, Raw<AnyToDeviceEvent>)> + Send + 'a {
type Key<'a> = (&'a UserId, &'a DeviceId, u64); type Key<'a> = (&'a UserId, &'a DeviceId, u64);
let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1))); let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1)));
@ -1017,7 +1026,7 @@ impl Service {
&& device_id == *device_id_ && device_id == *device_id_
&& to.is_none_or(|to| *count <= to) && to.is_none_or(|to| *count <= to)
}) })
.map(at!(1)) .map(|((_, _, count), event)| (count, event))
} }
pub async fn remove_to_device_events<Until>( pub async fn remove_to_device_events<Until>(