diff --git a/src/admin/query/raw.rs b/src/admin/query/raw.rs index c88cba0c..97579d84 100644 --- a/src/admin/query/raw.rs +++ b/src/admin/query/raw.rs @@ -209,7 +209,7 @@ pub(super) async fn compact( let parallelism = parallelism.unwrap_or(1); let results = maps .into_iter() - .try_stream() + .try_stream::() .paralleln_and_then(runtime, parallelism, move |map| { map.compact_blocking(options.clone())?; Ok(map.name().to_owned()) diff --git a/src/core/utils/stream/iter_stream.rs b/src/core/utils/stream/iter_stream.rs index e9a91b1c..0bcc8ae1 100644 --- a/src/core/utils/stream/iter_stream.rs +++ b/src/core/utils/stream/iter_stream.rs @@ -3,19 +3,17 @@ use futures::{ stream::{Stream, TryStream}, }; -use crate::{Error, Result}; - pub trait IterStream { /// Convert an Iterator into a Stream fn stream(self) -> impl Stream::Item> + Send; - /// Convert an Iterator into a TryStream - fn try_stream( + /// Convert an Iterator into a TryStream with a generic error type + fn try_stream( self, ) -> impl TryStream< Ok = ::Item, - Error = Error, - Item = Result<::Item, Error>, + Error = E, + Item = Result<::Item, E>, > + Send; } @@ -28,12 +26,12 @@ where fn stream(self) -> impl Stream::Item> + Send { stream::iter(self) } #[inline] - fn try_stream( + fn try_stream( self, ) -> impl TryStream< Ok = ::Item, - Error = Error, - Item = Result<::Item, Error>, + Error = E, + Item = Result<::Item, E>, > + Send { self.stream().map(Ok) } diff --git a/src/core/utils/stream/try_broadband.rs b/src/core/utils/stream/try_broadband.rs index 361b4a92..c5c8e687 100644 --- a/src/core/utils/stream/try_broadband.rs +++ b/src/core/utils/stream/try_broadband.rs @@ -1,9 +1,10 @@ //! Synchronous combinator extensions to futures::TryStream +use std::result::Result; + use futures::{TryFuture, TryStream, TryStreamExt}; use super::automatic_width; -use crate::Result; /// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators /// produce out-of-order diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs index dfcb1362..936d1193 100644 --- a/src/service/rooms/auth_chain/mod.rs +++ b/src/service/rooms/auth_chain/mod.rs @@ -142,7 +142,7 @@ async fn get_auth_chain_outer( let chunk_cache: Vec<_> = chunk .into_iter() - .try_stream() + .try_stream::() .broad_and_then(|(shortid, event_id)| async move { if let Ok(cached) = self.get_cached_eventid_authchain(&[shortid]).await { return Ok(cached.to_vec());