Add 'Join' stream combinator extension.

This commit is contained in:
Sergio Benitez 2021-06-01 11:46:17 -07:00
parent ed3cc13b84
commit bcd62e5373
1 changed files with 85 additions and 2 deletions

View File

@ -8,7 +8,7 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::time::{sleep, Sleep}; use tokio::time::{sleep, Sleep};
use futures::stream::Stream; use futures::stream::Stream;
use futures::future::{Future, Fuse, FutureExt}; use futures::future::{self, Future, FutureExt};
use crate::http::hyper::Bytes; use crate::http::hyper::Bytes;
@ -150,7 +150,7 @@ pin_project! {
#[pin] #[pin]
io: I, io: I,
#[pin] #[pin]
trigger: Fuse<F>, trigger: future::Fuse<F>,
state: State, state: State,
grace: Duration, grace: Duration,
mercy: Duration, mercy: Duration,
@ -336,3 +336,86 @@ impl<L: Listener, F: Future + Clone> Listener for CancellableListener<F, L> {
})) }))
} }
} }
pub trait StreamExt: Sized + Stream {
fn join<U>(self, other: U) -> Join<Self, U>
where U: Stream<Item = Self::Item>;
}
impl<S: Stream> StreamExt for S {
fn join<U>(self, other: U) -> Join<Self, U>
where U: Stream<Item = Self::Item>
{
Join::new(self, other)
}
}
pin_project! {
/// Stream returned by the [`join`](super::StreamExt::join) method.
pub struct Join<T, U> {
#[pin]
a: T,
#[pin]
b: U,
// When `true`, poll `a` first, otherwise, `poll` b`.
toggle: bool,
// Set when either `a` or `b` return `None`.
done: bool,
}
}
impl<T, U> Join<T, U> {
pub(super) fn new(a: T, b: U) -> Join<T, U>
where T: Stream, U: Stream,
{
Join { a, b, toggle: false, done: false, }
}
fn poll_next<A: Stream, B: Stream<Item = A::Item>>(
first: Pin<&mut A>,
second: Pin<&mut B>,
done: &mut bool,
cx: &mut Context<'_>,
) -> Poll<Option<A::Item>> {
match first.poll_next(cx) {
Poll::Ready(opt) => { *done = opt.is_none(); Poll::Ready(opt) }
Poll::Pending => match second.poll_next(cx) {
Poll::Ready(opt) => { *done = opt.is_none(); Poll::Ready(opt) }
Poll::Pending => Poll::Pending
}
}
}
}
impl<T, U> Stream for Join<T, U>
where T: Stream,
U: Stream<Item = T::Item>,
{
type Item = T::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
if self.done {
return Poll::Ready(None);
}
let me = self.project();
*me.toggle = !*me.toggle;
match *me.toggle {
true => Self::poll_next(me.a, me.b, me.done, cx),
false => Self::poll_next(me.b, me.a, me.done, cx),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let (left_low, left_high) = self.a.size_hint();
let (right_low, right_high) = self.b.size_hint();
let low = left_low.saturating_add(right_low);
let high = match (left_high, right_high) {
(Some(h1), Some(h2)) => h1.checked_add(h2),
_ => None,
};
(low, high)
}
}