diff --git a/core/lib/Cargo.toml b/core/lib/Cargo.toml index 2c7ed2f6..74c2de1e 100644 --- a/core/lib/Cargo.toml +++ b/core/lib/Cargo.toml @@ -47,7 +47,7 @@ async-trait = "0.1.43" [dependencies.state] git = "https://github.com/SergioBenitez/state.git" -rev = "504ef71a" +rev = "8f94dc" [dependencies.rocket_codegen] version = "0.5.0-dev" @@ -67,6 +67,14 @@ features = ["tokio-io"] version = "1.0" features = ["fs", "io-std", "io-util", "rt-multi-thread", "sync", "signal", "macros"] +[dependencies.tokio-util] +version = "0.6" +default-features = false +features = ["io"] + +[dependencies.bytes] +version = "1.0" + [build-dependencies] yansi = "0.5" version_check = "0.9.1" diff --git a/core/lib/src/ext.rs b/core/lib/src/ext.rs index 111deb95..65d297be 100644 --- a/core/lib/src/ext.rs +++ b/core/lib/src/ext.rs @@ -2,47 +2,60 @@ use std::io; use std::pin::Pin; use std::task::{Poll, Context}; -use futures::{ready, stream::Stream}; +use bytes::BytesMut; use tokio::io::{AsyncRead, ReadBuf}; use pin_project_lite::pin_project; +use futures::{ready, stream::Stream}; use crate::http::hyper::Bytes; -pub struct IntoBytesStream { - inner: R, - buf_size: usize, - buffer: Vec, +pin_project! { + pub struct ReaderStream { + #[pin] + reader: Option, + buf: BytesMut, + cap: usize, + } } -impl Stream for IntoBytesStream - where R: AsyncRead + Unpin -{ - type Item = Result; +impl Stream for ReaderStream { + type Item = std::io::Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>{ - debug_assert!(self.buffer.len() == self.buf_size); + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use tokio_util::io::poll_read_buf; - let Self { ref mut inner, ref mut buffer, buf_size } = *self; + let mut this = self.as_mut().project(); - let mut buf = ReadBuf::new(&mut buffer[..]); - match Pin::new(inner).poll_read(cx, &mut buf) { + let reader = match this.reader.as_pin_mut() { + Some(r) => r, + None => return Poll::Ready(None), + }; + + if this.buf.capacity() == 0 { + this.buf.reserve(*this.cap); + } + + match poll_read_buf(reader, cx, &mut this.buf) { Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), - Poll::Ready(Ok(())) if buf.filled().is_empty() => Poll::Ready(None), - Poll::Ready(Ok(())) => { - let n = buf.filled().len(); - // FIXME(perf). - let mut next = std::mem::replace(buffer, vec![0; buf_size]); - next.truncate(n); - Poll::Ready(Some(Ok(Bytes::from(next)))) + Poll::Ready(Err(err)) => { + self.project().reader.set(None); + Poll::Ready(Some(Err(err))) + } + Poll::Ready(Ok(0)) => { + self.project().reader.set(None); + Poll::Ready(None) + } + Poll::Ready(Ok(_)) => { + let chunk = this.buf.split(); + Poll::Ready(Some(Ok(chunk.freeze()))) } } } } pub trait AsyncReadExt: AsyncRead + Sized { - fn into_bytes_stream(self, buf_size: usize) -> IntoBytesStream { - IntoBytesStream { inner: self, buf_size, buffer: vec![0; buf_size] } + fn into_bytes_stream(self, cap: usize) -> ReaderStream { + ReaderStream { reader: Some(self), cap, buf: BytesMut::with_capacity(cap) } } }