Implement a more performant 'ReaderStream'.

This commit is contained in:
Sergio Benitez 2021-04-13 17:45:09 -07:00
parent dd1b51f681
commit d7d7bb91ec
2 changed files with 46 additions and 25 deletions

View File

@ -47,7 +47,7 @@ async-trait = "0.1.43"
[dependencies.state]
git = "https://github.com/SergioBenitez/state.git"
rev = "504ef71a"
rev = "8f94dc"
[dependencies.rocket_codegen]
version = "0.5.0-dev"
@ -67,6 +67,14 @@ features = ["tokio-io"]
version = "1.0"
features = ["fs", "io-std", "io-util", "rt-multi-thread", "sync", "signal", "macros"]
[dependencies.tokio-util]
version = "0.6"
default-features = false
features = ["io"]
[dependencies.bytes]
version = "1.0"
[build-dependencies]
yansi = "0.5"
version_check = "0.9.1"

View File

@ -2,47 +2,60 @@ use std::io;
use std::pin::Pin;
use std::task::{Poll, Context};
use futures::{ready, stream::Stream};
use bytes::BytesMut;
use tokio::io::{AsyncRead, ReadBuf};
use pin_project_lite::pin_project;
use futures::{ready, stream::Stream};
use crate::http::hyper::Bytes;
pub struct IntoBytesStream<R> {
inner: R,
buf_size: usize,
buffer: Vec<u8>,
pin_project! {
pub struct ReaderStream<R> {
#[pin]
reader: Option<R>,
buf: BytesMut,
cap: usize,
}
}
impl<R> Stream for IntoBytesStream<R>
where R: AsyncRead + Unpin
{
type Item = Result<Bytes, io::Error>;
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>{
debug_assert!(self.buffer.len() == self.buf_size);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use tokio_util::io::poll_read_buf;
let Self { ref mut inner, ref mut buffer, buf_size } = *self;
let mut this = self.as_mut().project();
let mut buf = ReadBuf::new(&mut buffer[..]);
match Pin::new(inner).poll_read(cx, &mut buf) {
let reader = match this.reader.as_pin_mut() {
Some(r) => r,
None => return Poll::Ready(None),
};
if this.buf.capacity() == 0 {
this.buf.reserve(*this.cap);
}
match poll_read_buf(reader, cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Ready(Ok(())) if buf.filled().is_empty() => Poll::Ready(None),
Poll::Ready(Ok(())) => {
let n = buf.filled().len();
// FIXME(perf).
let mut next = std::mem::replace(buffer, vec![0; buf_size]);
next.truncate(n);
Poll::Ready(Some(Ok(Bytes::from(next))))
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Poll::Ready(Some(Err(err)))
}
Poll::Ready(Ok(0)) => {
self.project().reader.set(None);
Poll::Ready(None)
}
Poll::Ready(Ok(_)) => {
let chunk = this.buf.split();
Poll::Ready(Some(Ok(chunk.freeze())))
}
}
}
}
pub trait AsyncReadExt: AsyncRead + Sized {
fn into_bytes_stream(self, buf_size: usize) -> IntoBytesStream<Self> {
IntoBytesStream { inner: self, buf_size, buffer: vec![0; buf_size] }
fn into_bytes_stream(self, cap: usize) -> ReaderStream<Self> {
ReaderStream { reader: Some(self), cap, buf: BytesMut::with_capacity(cap) }
}
}