refactor: Make stream utils generic over the error type

This commit is contained in:
Jade Ellis 2026-02-22 01:10:11 +00:00 committed by timedout
parent 6637e4c6a7
commit 47e2733ea1
No known key found for this signature in database
GPG key ID: 0FA334385D0B689F
4 changed files with 11 additions and 12 deletions

View file

@ -209,7 +209,7 @@ pub(super) async fn compact(
let parallelism = parallelism.unwrap_or(1);
let results = maps
.into_iter()
.try_stream()
.try_stream::<conduwuit::Error>()
.paralleln_and_then(runtime, parallelism, move |map| {
map.compact_blocking(options.clone())?;
Ok(map.name().to_owned())

View file

@ -3,19 +3,17 @@ use futures::{
stream::{Stream, TryStream},
};
use crate::{Error, Result};
pub trait IterStream<I: IntoIterator + Send> {
/// Convert an Iterator into a Stream
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send;
/// Convert an Iterator into a TryStream
fn try_stream(
/// Convert an Iterator into a TryStream with a generic error type
fn try_stream<E>(
self,
) -> impl TryStream<
Ok = <I as IntoIterator>::Item,
Error = Error,
Item = Result<<I as IntoIterator>::Item, Error>,
Error = E,
Item = Result<<I as IntoIterator>::Item, E>,
> + Send;
}
@ -28,12 +26,12 @@ where
fn stream(self) -> impl Stream<Item = <I as IntoIterator>::Item> + Send { stream::iter(self) }
#[inline]
fn try_stream(
fn try_stream<E>(
self,
) -> impl TryStream<
Ok = <I as IntoIterator>::Item,
Error = Error,
Item = Result<<I as IntoIterator>::Item, Error>,
Error = E,
Item = Result<<I as IntoIterator>::Item, E>,
> + Send {
self.stream().map(Ok)
}

View file

@ -1,9 +1,10 @@
//! Synchronous combinator extensions to futures::TryStream
use std::result::Result;
use futures::{TryFuture, TryStream, TryStreamExt};
use super::automatic_width;
use crate::Result;
/// Concurrency extensions to augment futures::TryStreamExt. broad_ combinators
/// produce out-of-order

View file

@ -142,7 +142,7 @@ async fn get_auth_chain_outer(
let chunk_cache: Vec<_> = chunk
.into_iter()
.try_stream()
.try_stream::<conduwuit::Error>()
.broad_and_then(|(shortid, event_id)| async move {
if let Ok(cached) = self.get_cached_eventid_authchain(&[shortid]).await {
return Ok(cached.to_vec());