diff --git a/core/lib/src/data/data.rs b/core/lib/src/data/data.rs index 00a5ff78..da0f9be6 100644 --- a/core/lib/src/data/data.rs +++ b/core/lib/src/data/data.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::io; use std::path::Path; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::AsyncWrite; use super::data_stream::DataStream; @@ -47,7 +47,7 @@ const PEEK_BYTES: usize = 512; pub struct Data { buffer: Vec, is_complete: bool, - stream: Box, + stream: AsyncReadBody, } impl Data { @@ -69,7 +69,7 @@ impl Data { /// ``` pub fn open(mut self) -> DataStream { let buffer = std::mem::replace(&mut self.buffer, vec![]); - let stream = std::mem::replace(&mut self.stream, Box::new(&[][..])); + let stream = std::mem::replace(&mut self.stream, AsyncReadBody::empty()); DataStream(buffer, stream) } @@ -210,7 +210,7 @@ impl Data { }; trace_!("Peek bytes: {}/{} bytes.", peek_buf.len(), PEEK_BYTES); - Data { buffer: peek_buf, stream: Box::new(stream), is_complete: eof } + Data { buffer: peek_buf, stream, is_complete: eof } } /// This creates a `data` object from a local data source `data`. @@ -218,7 +218,7 @@ impl Data { pub(crate) fn local(data: Vec) -> Data { Data { buffer: data, - stream: Box::new(&[][..]), + stream: AsyncReadBody::empty(), is_complete: true, } } diff --git a/core/lib/src/data/data_stream.rs b/core/lib/src/data/data_stream.rs index 9e17da45..db4e782f 100644 --- a/core/lib/src/data/data_stream.rs +++ b/core/lib/src/data/data_stream.rs @@ -3,20 +3,18 @@ use std::task::{Context, Poll}; use tokio::io::AsyncRead; -// TODO.async: Consider storing the real type here instead of a Box to avoid -// the dynamic dispatch +use crate::ext::AsyncReadBody; + /// Raw data stream of a request body. /// /// This stream can only be obtained by calling /// [`Data::open()`](crate::data::Data::open()). The stream contains all of the data /// in the body of the request. It exposes no methods directly. Instead, it must /// be used as an opaque [`Read`] structure. -pub struct DataStream(pub(crate) Vec, pub(crate) Box); +pub struct DataStream(pub(crate) Vec, pub(crate) AsyncReadBody); // TODO.async: Consider implementing `AsyncBufRead` -// TODO: Have a `BufRead` impl for `DataStream`. At the moment, this isn't -// possible since Hyper's `HttpReader` doesn't implement `BufRead`. impl AsyncRead for DataStream { #[inline(always)] fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { diff --git a/core/lib/src/ext.rs b/core/lib/src/ext.rs index bf5a6264..9e9368ef 100644 --- a/core/lib/src/ext.rs +++ b/core/lib/src/ext.rs @@ -14,14 +14,13 @@ pub struct IntoBytesStream { buffer: Vec, } -// TODO.async: Verify correctness of this implementation. impl Stream for IntoBytesStream where R: AsyncRead + Unpin { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>{ - assert!(self.buffer.len() == self.buf_size); + debug_assert!(self.buffer.len() == self.buf_size); let Self { ref mut inner, ref mut buffer, buf_size } = *self; @@ -43,7 +42,6 @@ pub trait AsyncReadExt: AsyncRead { IntoBytesStream { inner: self, buf_size, buffer: vec![0; buf_size] } } - // TODO.async: Verify correctness of this implementation. fn read_max<'a>(&'a mut self, mut buf: &'a mut [u8]) -> BoxFuture<'_, io::Result> where Self: Send + Unpin { @@ -52,7 +50,7 @@ pub trait AsyncReadExt: AsyncRead { while !buf.is_empty() { match self.read(buf).await { Ok(0) => break, - Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; } + Ok(n) => buf = &mut buf[n..], Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} Err(e) => return Err(e), } @@ -76,6 +74,12 @@ enum AsyncReadBodyState { Done, } +impl AsyncReadBody { + pub fn empty() -> Self { + Self { inner: hyper::Body::empty(), state: AsyncReadBodyState::Done } + } +} + impl From for AsyncReadBody { fn from(body: hyper::Body) -> Self { Self { inner: body, state: AsyncReadBodyState::Pending }