//! Broadband stream combinator extensions to futures::Stream use std::convert::identity; use futures::{ stream::{Stream, StreamExt}, Future, }; use super::{automatic_width, ReadyExt}; /// Concurrency extensions to augment futures::StreamExt. broad_ combinators /// produce out-of-order pub trait BroadbandExt where Self: Stream + Send + Sized, { fn broadn_all(self, n: N, f: F) -> impl Future + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future + Send; fn broadn_any(self, n: N, f: F) -> impl Future + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future + Send; /// Concurrent filter_map(); unordered results fn broadn_filter_map(self, n: N, f: F) -> impl Stream + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future> + Send, U: Send; fn broadn_then(self, n: N, f: F) -> impl Stream + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future + Send, U: Send; #[inline] fn broad_all(self, f: F) -> impl Future + Send where F: Fn(Item) -> Fut + Send, Fut: Future + Send, { self.broadn_all(None, f) } #[inline] fn broad_any(self, f: F) -> impl Future + Send where F: Fn(Item) -> Fut + Send, Fut: Future + Send, { self.broadn_any(None, f) } #[inline] fn broad_filter_map(self, f: F) -> impl Stream + Send where F: Fn(Item) -> Fut + Send, Fut: Future> + Send, U: Send, { self.broadn_filter_map(None, f) } #[inline] fn broad_then(self, f: F) -> impl Stream + Send where F: Fn(Item) -> Fut + Send, Fut: Future + Send, U: Send, { self.broadn_then(None, f) } } impl BroadbandExt for S where S: Stream + Send + Sized, { #[inline] fn broadn_all(self, n: N, f: F) -> impl Future + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future + Send, { self.map(f) .buffer_unordered(n.into().unwrap_or_else(automatic_width)) .ready_all(identity) } #[inline] fn broadn_any(self, n: N, f: F) -> impl Future + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future + Send, { self.map(f) .buffer_unordered(n.into().unwrap_or_else(automatic_width)) .ready_any(identity) } #[inline] fn broadn_filter_map(self, n: N, f: F) -> impl Stream + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future> + Send, U: Send, { self.map(f) .buffer_unordered(n.into().unwrap_or_else(automatic_width)) .ready_filter_map(identity) } #[inline] fn broadn_then(self, n: N, f: F) -> impl Stream + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future + Send, U: Send, { self.map(f) .buffer_unordered(n.into().unwrap_or_else(automatic_width)) } }