Clean up handling of body data:

* Minor code and comment tweaks
* Remove dynamic dispatch inside Data and DataStream
This commit is contained in:
Jeb Rosen 2019-12-15 11:10:11 -08:00 committed by Sergio Benitez
parent 571e2ac845
commit 49f4641871
3 changed files with 16 additions and 14 deletions

View File

@ -2,7 +2,7 @@ use std::future::Future;
use std::io; use std::io;
use std::path::Path; use std::path::Path;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::AsyncWrite;
use super::data_stream::DataStream; use super::data_stream::DataStream;
@ -47,7 +47,7 @@ const PEEK_BYTES: usize = 512;
pub struct Data { pub struct Data {
buffer: Vec<u8>, buffer: Vec<u8>,
is_complete: bool, is_complete: bool,
stream: Box<dyn AsyncRead + Unpin + Send + Sync>, stream: AsyncReadBody,
} }
impl Data { impl Data {
@ -69,7 +69,7 @@ impl Data {
/// ``` /// ```
pub fn open(mut self) -> DataStream { pub fn open(mut self) -> DataStream {
let buffer = std::mem::replace(&mut self.buffer, vec![]); let buffer = std::mem::replace(&mut self.buffer, vec![]);
let stream = std::mem::replace(&mut self.stream, Box::new(&[][..])); let stream = std::mem::replace(&mut self.stream, AsyncReadBody::empty());
DataStream(buffer, stream) DataStream(buffer, stream)
} }
@ -210,7 +210,7 @@ impl Data {
}; };
trace_!("Peek bytes: {}/{} bytes.", peek_buf.len(), PEEK_BYTES); trace_!("Peek bytes: {}/{} bytes.", peek_buf.len(), PEEK_BYTES);
Data { buffer: peek_buf, stream: Box::new(stream), is_complete: eof } Data { buffer: peek_buf, stream, is_complete: eof }
} }
/// This creates a `data` object from a local data source `data`. /// This creates a `data` object from a local data source `data`.
@ -218,7 +218,7 @@ impl Data {
pub(crate) fn local(data: Vec<u8>) -> Data { pub(crate) fn local(data: Vec<u8>) -> Data {
Data { Data {
buffer: data, buffer: data,
stream: Box::new(&[][..]), stream: AsyncReadBody::empty(),
is_complete: true, is_complete: true,
} }
} }

View File

@ -3,20 +3,18 @@ use std::task::{Context, Poll};
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
// TODO.async: Consider storing the real type here instead of a Box to avoid use crate::ext::AsyncReadBody;
// the dynamic dispatch
/// Raw data stream of a request body. /// Raw data stream of a request body.
/// ///
/// This stream can only be obtained by calling /// This stream can only be obtained by calling
/// [`Data::open()`](crate::data::Data::open()). The stream contains all of the data /// [`Data::open()`](crate::data::Data::open()). The stream contains all of the data
/// in the body of the request. It exposes no methods directly. Instead, it must /// in the body of the request. It exposes no methods directly. Instead, it must
/// be used as an opaque [`Read`] structure. /// be used as an opaque [`Read`] structure.
pub struct DataStream(pub(crate) Vec<u8>, pub(crate) Box<dyn AsyncRead + Unpin + Send>); pub struct DataStream(pub(crate) Vec<u8>, pub(crate) AsyncReadBody);
// TODO.async: Consider implementing `AsyncBufRead` // TODO.async: Consider implementing `AsyncBufRead`
// TODO: Have a `BufRead` impl for `DataStream`. At the moment, this isn't
// possible since Hyper's `HttpReader` doesn't implement `BufRead`.
impl AsyncRead for DataStream { impl AsyncRead for DataStream {
#[inline(always)] #[inline(always)]
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, std::io::Error>> { fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, std::io::Error>> {

View File

@ -14,14 +14,13 @@ pub struct IntoBytesStream<R> {
buffer: Vec<u8>, buffer: Vec<u8>,
} }
// TODO.async: Verify correctness of this implementation.
impl<R> Stream for IntoBytesStream<R> impl<R> Stream for IntoBytesStream<R>
where R: AsyncRead + Unpin where R: AsyncRead + Unpin
{ {
type Item = Result<Bytes, io::Error>; type Item = Result<Bytes, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>{ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>{
assert!(self.buffer.len() == self.buf_size); debug_assert!(self.buffer.len() == self.buf_size);
let Self { ref mut inner, ref mut buffer, buf_size } = *self; let Self { ref mut inner, ref mut buffer, buf_size } = *self;
@ -43,7 +42,6 @@ pub trait AsyncReadExt: AsyncRead {
IntoBytesStream { inner: self, buf_size, buffer: vec![0; buf_size] } IntoBytesStream { inner: self, buf_size, buffer: vec![0; buf_size] }
} }
// TODO.async: Verify correctness of this implementation.
fn read_max<'a>(&'a mut self, mut buf: &'a mut [u8]) -> BoxFuture<'_, io::Result<usize>> fn read_max<'a>(&'a mut self, mut buf: &'a mut [u8]) -> BoxFuture<'_, io::Result<usize>>
where Self: Send + Unpin where Self: Send + Unpin
{ {
@ -52,7 +50,7 @@ pub trait AsyncReadExt: AsyncRead {
while !buf.is_empty() { while !buf.is_empty() {
match self.read(buf).await { match self.read(buf).await {
Ok(0) => break, Ok(0) => break,
Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; } Ok(n) => buf = &mut buf[n..],
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e), Err(e) => return Err(e),
} }
@ -76,6 +74,12 @@ enum AsyncReadBodyState {
Done, Done,
} }
impl AsyncReadBody {
pub fn empty() -> Self {
Self { inner: hyper::Body::empty(), state: AsyncReadBodyState::Done }
}
}
impl From<hyper::Body> for AsyncReadBody { impl From<hyper::Body> for AsyncReadBody {
fn from(body: hyper::Body) -> Self { fn from(body: hyper::Body) -> Self {
Self { inner: body, state: AsyncReadBodyState::Pending } Self { inner: body, state: AsyncReadBodyState::Pending }