diff --git a/conduwuit-example.toml b/conduwuit-example.toml index bd0fe69a..d3afc870 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -389,7 +389,15 @@ # #appservice_idle_timeout = 300 -# Notification gateway pusher idle connection pool timeout. +# Notification gateway pusher request connection timeout (seconds). +# +#pusher_conn_timeout = 15 + +# Notification gateway pusher total request timeout (seconds). +# +#pusher_timeout = 60 + +# Notification gateway pusher idle connection pool timeout (seconds). # #pusher_idle_timeout = 15 @@ -1446,6 +1454,11 @@ # #url_preview_max_spider_size = 256000 +# Total request timeout for URL previews (seconds). This includes +# connection, request, and response body reading time. +# +#url_preview_timeout = 120 + # Option to decide whether you would like to run the domain allowlist # checks (contains and explicit) on the root domain or not. Does not apply # to URL contains allowlist. Defaults to false. diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index c0875b31..7efd89f6 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -501,7 +501,19 @@ pub struct Config { #[serde(default = "default_appservice_idle_timeout")] pub appservice_idle_timeout: u64, - /// Notification gateway pusher idle connection pool timeout. + /// Notification gateway pusher request connection timeout (seconds). + /// + /// default: 15 + #[serde(default = "default_pusher_conn_timeout")] + pub pusher_conn_timeout: u64, + + /// Notification gateway pusher total request timeout (seconds). + /// + /// default: 60 + #[serde(default = "default_pusher_timeout")] + pub pusher_timeout: u64, + + /// Notification gateway pusher idle connection pool timeout (seconds). /// /// default: 15 #[serde(default = "default_pusher_idle_timeout")] @@ -1663,6 +1675,13 @@ pub struct Config { #[serde(default = "default_url_preview_max_spider_size")] pub url_preview_max_spider_size: usize, + /// Total request timeout for URL previews (seconds). This includes + /// connection, request, and response body reading time. + /// + /// default: 120 + #[serde(default = "default_url_preview_timeout")] + pub url_preview_timeout: u64, + /// Option to decide whether you would like to run the domain allowlist /// checks (contains and explicit) on the root domain or not. Does not apply /// to URL contains allowlist. Defaults to false. @@ -2473,6 +2492,10 @@ fn default_appservice_timeout() -> u64 { 35 } fn default_appservice_idle_timeout() -> u64 { 300 } +fn default_pusher_conn_timeout() -> u64 { 15 } + +fn default_pusher_timeout() -> u64 { 60 } + fn default_pusher_idle_timeout() -> u64 { 15 } fn default_max_fetch_prev_events() -> u16 { 192_u16 } @@ -2600,6 +2623,8 @@ fn default_url_preview_max_spider_size() -> usize { 256_000 // 256KB } +fn default_url_preview_timeout() -> u64 { 120 } + fn default_new_user_displayname_suffix() -> String { "🏳️‍⚧️".to_owned() } fn default_sentry_endpoint() -> Option { None } diff --git a/src/service/client/mod.rs b/src/service/client/mod.rs index 239340ba..85e6845f 100644 --- a/src/service/client/mod.rs +++ b/src/service/client/mod.rs @@ -47,6 +47,7 @@ impl crate::Service for Service { })? .local_address(url_preview_bind_addr) .dns_resolver(resolver.resolver.clone()) + .timeout(Duration::from_secs(config.url_preview_timeout)) .redirect(redirect::Policy::limited(3)) .build()?, @@ -68,6 +69,11 @@ impl crate::Service for Service { .dns_resolver(resolver.resolver.hooked.clone()) .connect_timeout(Duration::from_secs(config.federation_conn_timeout)) .read_timeout(Duration::from_secs(config.federation_timeout)) + .timeout(Duration::from_secs( + config + .federation_timeout + .saturating_add(config.federation_conn_timeout), + )) .pool_max_idle_per_host(config.federation_idle_per_host.into()) .pool_idle_timeout(Duration::from_secs(config.federation_idle_timeout)) .redirect(redirect::Policy::limited(3)) @@ -77,6 +83,7 @@ impl crate::Service for Service { .dns_resolver(resolver.resolver.hooked.clone()) .connect_timeout(Duration::from_secs(config.federation_conn_timeout)) .read_timeout(Duration::from_secs(305)) + .timeout(Duration::from_secs(120)) .pool_max_idle_per_host(0) .redirect(redirect::Policy::limited(3)) .build()?, @@ -103,6 +110,8 @@ impl crate::Service for Service { pusher: base(config)? .dns_resolver(resolver.resolver.clone()) + .connect_timeout(Duration::from_secs(config.pusher_conn_timeout)) + .timeout(Duration::from_secs(config.pusher_timeout)) .pool_max_idle_per_host(1) .pool_idle_timeout(Duration::from_secs(config.pusher_idle_timeout)) .redirect(redirect::Policy::limited(2)) diff --git a/src/service/federation/execute.rs b/src/service/federation/execute.rs index 1126fc56..33c6d292 100644 --- a/src/service/federation/execute.rs +++ b/src/service/federation/execute.rs @@ -90,7 +90,9 @@ where debug!(%method, %url, "Sending request"); match client.execute(request).await { - | Ok(response) => handle_response::(dest, actual, &method, &url, response).await, + | Ok(response) => + self.handle_response::(dest, actual, &method, &url, response) + .await, | Err(error) => Err(handle_error(actual, &method, &url, error).expect_err("always returns error")), } @@ -119,7 +121,9 @@ fn validate_url(&self, url: &Url) -> Result<()> { Ok(()) } +#[implement(super::Service)] async fn handle_response( + &self, dest: &ServerName, actual: &ActualDest, method: &Method, @@ -162,7 +166,6 @@ async fn into_http_response( .expect("http::response::Builder is usable"), ); - // TODO: handle timeout trace!("Waiting for response body..."); let body = response .bytes() diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 5b315bd5..10090f8d 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -245,7 +245,7 @@ impl Service { .expect("http::response::Builder is usable"), ); - let body = response.bytes().await?; // TODO: handle timeout + let body = response.bytes().await?; if !status.is_success() { debug_warn!("Push gateway response body: {:?}", string_from_bytes(&body)); @@ -288,7 +288,7 @@ impl Service { let mut notify = None; let mut tweaks = Vec::new(); if event.room_id().is_none() { - // TODO(hydra): does this matter? + // This only affects v12+ create events return Ok(()); } diff --git a/src/service/sending/appservice.rs b/src/service/sending/appservice.rs index c7fae11f..dcaf33d5 100644 --- a/src/service/sending/appservice.rs +++ b/src/service/sending/appservice.rs @@ -1,8 +1,7 @@ use std::{fmt::Debug, mem}; use bytes::BytesMut; -use conduwuit::{Err, Result, debug_error, err, trace, utils, warn}; -use reqwest::Client; +use conduwuit::{Err, Result, debug_error, err, implement, trace, utils, warn}; use ruma::api::{ IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, appservice::Registration, }; @@ -11,8 +10,9 @@ use ruma::api::{ /// /// Only returns Ok(None) if there is no url specified in the appservice /// registration file -pub(crate) async fn send_request( - client: &Client, +#[implement(super::Service)] +pub async fn send_appservice_request( + &self, registration: Registration, request: T, ) -> Result> @@ -58,6 +58,8 @@ where let reqwest_request = reqwest::Request::try_from(http_request)?; + let client = &self.services.client.appservice; + let mut response = client.execute(reqwest_request).await.map_err(|e| { warn!("Could not send request to appservice \"{}\" at {dest}: {e:?}", registration.id); e @@ -75,7 +77,7 @@ where .expect("http::response::Builder is usable"), ); - let body = response.bytes().await?; // TODO: handle timeout + let body = response.bytes().await?; if !status.is_success() { debug_error!("Appservice response bytes: {:?}", utils::string_from_bytes(&body)); diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index a0ec50ed..27f3fccc 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -19,10 +19,7 @@ use conduwuit::{ warn, }; use futures::{FutureExt, Stream, StreamExt}; -use ruma::{ - RoomId, ServerName, UserId, - api::{OutgoingRequest, appservice::Registration}, -}; +use ruma::{RoomId, ServerName, UserId, api::OutgoingRequest}; use tokio::{task, task::JoinSet}; use self::data::Data; @@ -319,22 +316,6 @@ impl Service { .await } - /// Sends a request to an appservice - /// - /// Only returns None if there is no url specified in the appservice - /// registration file - pub async fn send_appservice_request( - &self, - registration: Registration, - request: T, - ) -> Result> - where - T: OutgoingRequest + Debug + Send, - { - let client = &self.services.client.appservice; - appservice::send_request(client, registration, request).await - } - /// Clean up queued sending event data /// /// Used after we remove an appservice registration or a user deletes a push diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 86e0cd7d..2044dc51 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -50,9 +50,7 @@ use ruma::{ }; use serde_json::value::{RawValue as RawJsonValue, to_raw_value}; -use super::{ - Destination, EduBuf, EduVec, Msg, SendingEvent, Service, appservice, data::QueueItem, -}; +use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem}; #[derive(Debug)] enum TransactionStatus { @@ -720,18 +718,18 @@ impl Service { //debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty // transaction"); - let client = &self.services.client.appservice; - match appservice::send_request( - client, - appservice, - ruma::api::appservice::event::push_events::v1::Request { - events: pdu_jsons, - txn_id: txn_id.into(), - ephemeral: edu_jsons, - to_device: Vec::new(), // TODO - }, - ) - .await + + match self + .send_appservice_request( + appservice, + ruma::api::appservice::event::push_events::v1::Request { + events: pdu_jsons, + txn_id: txn_id.into(), + ephemeral: edu_jsons, + to_device: Vec::new(), // TODO + }, + ) + .await { | Ok(_) => Ok(Destination::Appservice(id)), | Err(e) => Err((Destination::Appservice(id), e)),