mirror of
https://github.com/rwf2/Rocket.git
synced 2025-01-17 23:19:06 +00:00
Ensure that all raw HTTP data is transfer-decoded.
This commit is contained in:
parent
1e5a1b8940
commit
0a8de2f0a6
@ -1,22 +1,24 @@
|
|||||||
use std::io::{self, Read, Write, Cursor, BufReader};
|
use std::io::{self, Read, Write, Cursor, BufReader, Chain, Take};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::mem::transmute;
|
|
||||||
|
|
||||||
#[cfg(feature = "tls")] use hyper_rustls::WrappedStream;
|
#[cfg(feature = "tls")] use hyper_rustls::WrappedStream;
|
||||||
|
|
||||||
use super::data_stream::{DataStream, StreamReader, kill_stream};
|
use super::data_stream::DataStream;
|
||||||
use super::net_stream::NetStream;
|
use super::net_stream::NetStream;
|
||||||
use ext::ReadExt;
|
use ext::ReadExt;
|
||||||
|
|
||||||
|
use http::hyper;
|
||||||
use http::hyper::h1::HttpReader;
|
use http::hyper::h1::HttpReader;
|
||||||
use http::hyper::buffer;
|
|
||||||
use http::hyper::h1::HttpReader::*;
|
use http::hyper::h1::HttpReader::*;
|
||||||
use http::hyper::net::{HttpStream, NetworkStream};
|
use http::hyper::net::{HttpStream, NetworkStream};
|
||||||
|
|
||||||
pub type BodyReader<'a, 'b> =
|
pub type HyperBodyReader<'a, 'b> =
|
||||||
self::HttpReader<&'a mut self::buffer::BufReader<&'b mut NetworkStream>>;
|
self::HttpReader<&'a mut hyper::buffer::BufReader<&'b mut NetworkStream>>;
|
||||||
|
|
||||||
|
// |---- from hyper ----|
|
||||||
|
pub type BodyReader = HttpReader<Chain<Take<Cursor<Vec<u8>>>, BufReader<NetStream>>>;
|
||||||
|
|
||||||
/// The number of bytes to read into the "peek" buffer.
|
/// The number of bytes to read into the "peek" buffer.
|
||||||
const PEEK_BYTES: usize = 4096;
|
const PEEK_BYTES: usize = 4096;
|
||||||
@ -51,12 +53,8 @@ const PEEK_BYTES: usize = 4096;
|
|||||||
/// without consuming the `Data` object.
|
/// without consuming the `Data` object.
|
||||||
pub struct Data {
|
pub struct Data {
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
is_done: bool,
|
is_complete: bool,
|
||||||
// TODO: This sucks as it depends on a TCPStream. Oh, hyper.
|
stream: BodyReader,
|
||||||
stream: StreamReader,
|
|
||||||
// Ideally we wouldn't have these, but Hyper forces us to.
|
|
||||||
position: usize,
|
|
||||||
capacity: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Data {
|
impl Data {
|
||||||
@ -67,39 +65,28 @@ impl Data {
|
|||||||
/// instance. This ensures that a `Data` type _always_ represents _all_ of
|
/// instance. This ensures that a `Data` type _always_ represents _all_ of
|
||||||
/// the data in a request.
|
/// the data in a request.
|
||||||
pub fn open(mut self) -> DataStream {
|
pub fn open(mut self) -> DataStream {
|
||||||
// Swap out the buffer and stream for empty ones so we can move.
|
let buffer = ::std::mem::replace(&mut self.buffer, vec![]);
|
||||||
let mut buffer = vec![];
|
let empty_stream = Cursor::new(vec![]).take(0)
|
||||||
let mut stream = EmptyReader(self.stream.get_ref().clone());
|
.chain(BufReader::new(NetStream::Local(Cursor::new(vec![]))));
|
||||||
::std::mem::swap(&mut buffer, &mut self.buffer);
|
|
||||||
::std::mem::swap(&mut stream, &mut self.stream);
|
|
||||||
|
|
||||||
// Setup the underlying reader at the correct pointers.
|
let empty_http_stream = HttpReader::SizedReader(empty_stream, 0);
|
||||||
let mut cursor = Cursor::new(buffer);
|
let stream = ::std::mem::replace(&mut self.stream, empty_http_stream);
|
||||||
cursor.set_position(self.position as u64);
|
DataStream(Cursor::new(buffer).chain(stream))
|
||||||
|
|
||||||
// Get a reference to the underlying network stream.
|
|
||||||
let network = stream.get_ref().clone();
|
|
||||||
|
|
||||||
// The first part of the stream is the buffer. Then the real steam.
|
|
||||||
let buf = cursor.take((self.capacity - self.position) as u64);
|
|
||||||
let stream = buf.chain(BufReader::new(stream));
|
|
||||||
|
|
||||||
DataStream::new(stream, network)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: This is absolutely terrible (downcasting!), thanks to Hyper.
|
// FIXME: This is absolutely terrible (downcasting!), thanks to Hyper.
|
||||||
pub(crate) fn from_hyp(mut h_body: BodyReader) -> Result<Data, &'static str> {
|
pub(crate) fn from_hyp(mut body: HyperBodyReader) -> Result<Data, &'static str> {
|
||||||
// Create the Data object from hyper's buffer.
|
// Steal the internal, undecoded data buffer and net stream from Hyper.
|
||||||
let (vec, pos, cap) = h_body.get_mut().take_buf();
|
let (hyper_buf, pos, cap) = body.get_mut().take_buf();
|
||||||
let net_stream = h_body.get_ref().get_ref();
|
let hyper_net_stream = body.get_ref().get_ref();
|
||||||
|
|
||||||
#[cfg(feature = "tls")]
|
#[cfg(feature = "tls")]
|
||||||
fn concrete_stream(stream: &&mut NetworkStream) -> Option<NetStream> {
|
fn concrete_stream(stream: &&mut NetworkStream) -> Option<NetStream> {
|
||||||
stream.downcast_ref::<HttpStream>()
|
stream.downcast_ref::<WrappedStream>()
|
||||||
.map(|s| NetStream::Http(s.clone()))
|
.map(|s| NetStream::Https(s.clone()))
|
||||||
.or_else(|| {
|
.or_else(|| {
|
||||||
stream.downcast_ref::<WrappedStream>()
|
stream.downcast_ref::<HttpStream>()
|
||||||
.map(|s| NetStream::Https(s.clone()))
|
.map(|s| NetStream::Http(s.clone()))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,25 +96,32 @@ impl Data {
|
|||||||
.map(|s| NetStream::Http(s.clone()))
|
.map(|s| NetStream::Http(s.clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieve the underlying HTTPStream from Hyper.
|
// Retrieve the underlying Http(s)Stream from Hyper.
|
||||||
let stream = match concrete_stream(net_stream) {
|
let net_stream = match concrete_stream(hyper_net_stream) {
|
||||||
Some(stream) => stream,
|
Some(net_stream) => net_stream,
|
||||||
None => return Err("Stream is not an HTTP(s) stream!")
|
None => return Err("Stream is not an HTTP(s) stream!")
|
||||||
};
|
};
|
||||||
|
|
||||||
// Set the read timeout to 5 seconds.
|
// Set the read timeout to 5 seconds.
|
||||||
stream.set_read_timeout(Some(Duration::from_secs(5))).expect("timeout set");
|
net_stream.set_read_timeout(Some(Duration::from_secs(5))).expect("timeout set");
|
||||||
|
|
||||||
// Create a reader from the stream. Don't read what's already buffered.
|
// TODO: Explain this.
|
||||||
let buffered = (cap - pos) as u64;
|
trace_!("Hyper buffer: [{}..{}] ({} bytes).", pos, cap, cap - pos);
|
||||||
let reader = match h_body {
|
let (start, remaining) = (pos as u64, (cap - pos) as u64);
|
||||||
SizedReader(_, n) => SizedReader(stream, n - buffered),
|
let mut cursor = Cursor::new(hyper_buf);
|
||||||
EofReader(_) => EofReader(stream),
|
cursor.set_position(start);
|
||||||
EmptyReader(_) => EmptyReader(stream),
|
let inner_data = cursor.take(remaining)
|
||||||
ChunkedReader(_, n) => ChunkedReader(stream, n.map(|k| k - buffered)),
|
.chain(BufReader::new(net_stream.clone()));
|
||||||
|
|
||||||
|
// Create an HTTP reader from the stream.
|
||||||
|
let http_stream = match body {
|
||||||
|
SizedReader(_, n) => SizedReader(inner_data, n),
|
||||||
|
EofReader(_) => EofReader(inner_data),
|
||||||
|
EmptyReader(_) => EmptyReader(inner_data),
|
||||||
|
ChunkedReader(_, n) => ChunkedReader(inner_data, n)
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Data::new(vec, pos, cap, reader))
|
Ok(Data::new(http_stream))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Retrieve the `peek` buffer.
|
/// Retrieve the `peek` buffer.
|
||||||
@ -138,7 +132,7 @@ impl Data {
|
|||||||
/// buffer contains _all_ of the data in the body of the request.
|
/// buffer contains _all_ of the data in the body of the request.
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn peek(&self) -> &[u8] {
|
pub fn peek(&self) -> &[u8] {
|
||||||
&self.buffer[self.position..self.capacity]
|
&self.buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns true if the `peek` buffer contains all of the data in the body
|
/// Returns true if the `peek` buffer contains all of the data in the body
|
||||||
@ -146,7 +140,7 @@ impl Data {
|
|||||||
/// it does.
|
/// it does.
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn peek_complete(&self) -> bool {
|
pub fn peek_complete(&self) -> bool {
|
||||||
self.is_done
|
self.is_complete
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A helper method to write the body of the request to any `Write` type.
|
/// A helper method to write the body of the request to any `Write` type.
|
||||||
@ -171,40 +165,32 @@ impl Data {
|
|||||||
// in the buffer is at `pos` and the buffer has `cap` valid bytes. Thus, the
|
// in the buffer is at `pos` and the buffer has `cap` valid bytes. Thus, the
|
||||||
// bytes `vec[pos..cap]` are buffered and unread. The remainder of the data
|
// bytes `vec[pos..cap]` are buffered and unread. The remainder of the data
|
||||||
// bytes can be read from `stream`.
|
// bytes can be read from `stream`.
|
||||||
pub(crate) fn new(mut buf: Vec<u8>,
|
pub(crate) fn new(mut stream: BodyReader) -> Data {
|
||||||
pos: usize,
|
trace_!("Date::new({:?})", stream);
|
||||||
mut cap: usize,
|
let mut peek_buf = vec![0; PEEK_BYTES];
|
||||||
mut stream: StreamReader
|
|
||||||
) -> Data {
|
|
||||||
// Make sure the buffer is large enough for the bytes we want to peek.
|
|
||||||
if buf.len() < PEEK_BYTES {
|
|
||||||
trace_!("Resizing peek buffer from {} to {}.", buf.len(), PEEK_BYTES);
|
|
||||||
buf.resize(PEEK_BYTES, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fill the buffer with as many bytes as possible. If we read less than
|
// Fill the buffer with as many bytes as possible. If we read less than
|
||||||
// that buffer's length, we know we reached the EOF. Otherwise, it's
|
// that buffer's length, we know we reached the EOF. Otherwise, it's
|
||||||
// unclear, so we just say we didn't reach EOF.
|
// unclear, so we just say we didn't reach EOF.
|
||||||
trace!("Init buffer cap: {}", cap);
|
let eof = match stream.read_max(&mut peek_buf[..]) {
|
||||||
let eof = match stream.read_max(&mut buf[cap..]) {
|
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
trace_!("Filled peek buf with {} bytes.", n);
|
trace_!("Filled peek buf with {} bytes.", n);
|
||||||
cap += n;
|
// TODO: Explain this.
|
||||||
cap < buf.len()
|
unsafe { peek_buf.set_len(n); }
|
||||||
|
n < PEEK_BYTES
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error_!("Failed to read into peek buffer: {:?}.", e);
|
error_!("Failed to read into peek buffer: {:?}.", e);
|
||||||
|
unsafe { peek_buf.set_len(0); }
|
||||||
false
|
false
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
trace_!("Peek buffer size: {}, remaining: {}", buf.len(), buf.len() - cap);
|
trace_!("Peek bytes: {}/{} bytes.", peek_buf.len(), PEEK_BYTES);
|
||||||
Data {
|
Data {
|
||||||
buffer: buf,
|
buffer: peek_buf,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
is_done: eof,
|
is_complete: eof,
|
||||||
position: pos,
|
|
||||||
capacity: cap,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,24 +204,23 @@ impl Data {
|
|||||||
(data, rest)
|
(data, rest)
|
||||||
};
|
};
|
||||||
|
|
||||||
let (buf_len, stream_len) = (buf.len(), rest.len() as u64);
|
let stream_len = rest.len() as u64;
|
||||||
let stream = NetStream::Local(Cursor::new(rest));
|
let stream = Cursor::new(vec![]).take(0)
|
||||||
|
.chain(BufReader::new(NetStream::Local(Cursor::new(rest))));
|
||||||
|
|
||||||
Data {
|
Data {
|
||||||
buffer: buf,
|
buffer: buf,
|
||||||
stream: HttpReader::SizedReader(stream, stream_len),
|
stream: HttpReader::SizedReader(stream, stream_len),
|
||||||
is_done: stream_len == 0,
|
is_complete: stream_len == 0,
|
||||||
position: 0,
|
|
||||||
capacity: buf_len,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Data {
|
// impl Drop for Data {
|
||||||
fn drop(&mut self) {
|
// fn drop(&mut self) {
|
||||||
// This is okay since the network stream expects to be shared mutably.
|
// // FIXME: Do a read; if > 1024, kill the stream. Need access to the
|
||||||
unsafe {
|
// // internals of `Chain` to do this efficiently/without crazy baggage.
|
||||||
let stream: &mut StreamReader = transmute(self.stream.by_ref());
|
// // https://github.com/rust-lang/rust/pull/41463
|
||||||
kill_stream(stream, self.stream.get_mut());
|
// let _ = io::copy(&mut self.stream, &mut io::sink());
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
|
||||||
|
@ -1,13 +1,14 @@
|
|||||||
use std::io::{self, BufRead, Read, Cursor, BufReader, Chain, Take};
|
use std::io::{self, BufRead, Read, Cursor, BufReader, Chain};
|
||||||
use std::net::Shutdown;
|
|
||||||
|
|
||||||
use super::net_stream::NetStream;
|
use super::data::BodyReader;
|
||||||
|
|
||||||
use http::hyper::net::NetworkStream;
|
// It's very unfortunate that we have to wrap `BodyReader` in a `BufReader`
|
||||||
use http::hyper::h1::HttpReader;
|
// since it already contains another `BufReader`. The issue is that Hyper's
|
||||||
|
// `HttpReader` doesn't implement `BufRead`. Unfortunately, this will likely
|
||||||
pub type StreamReader = HttpReader<NetStream>;
|
// stay "double buffered" until we switch HTTP libraries.
|
||||||
pub type InnerStream = Chain<Take<Cursor<Vec<u8>>>, BufReader<StreamReader>>;
|
// |-- peek buf --|
|
||||||
|
// pub type InnerStream = Chain<Cursor<Vec<u8>>, BufReader<BodyReader>>;
|
||||||
|
pub type InnerStream = Chain<Cursor<Vec<u8>>, BodyReader>;
|
||||||
|
|
||||||
/// Raw data stream of a request body.
|
/// Raw data stream of a request body.
|
||||||
///
|
///
|
||||||
@ -15,55 +16,33 @@ pub type InnerStream = Chain<Take<Cursor<Vec<u8>>>, BufReader<StreamReader>>;
|
|||||||
/// [Data::open](/rocket/data/struct.Data.html#method.open). The stream contains
|
/// [Data::open](/rocket/data/struct.Data.html#method.open). The stream contains
|
||||||
/// all of the data in the body of the request. It exposes no methods directly.
|
/// all of the data in the body of the request. It exposes no methods directly.
|
||||||
/// Instead, it must be used as an opaque `Read` or `BufRead` structure.
|
/// Instead, it must be used as an opaque `Read` or `BufRead` structure.
|
||||||
pub struct DataStream {
|
pub struct DataStream(pub(crate) InnerStream);
|
||||||
stream: InnerStream,
|
|
||||||
network: NetStream,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DataStream {
|
|
||||||
#[inline(always)]
|
|
||||||
pub(crate) fn new(stream: InnerStream, network: NetStream) -> DataStream {
|
|
||||||
DataStream { stream, network }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Read for DataStream {
|
impl Read for DataStream {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
self.stream.read(buf)
|
trace_!("DataStream::read()");
|
||||||
|
self.0.read(buf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BufRead for DataStream {
|
// impl BufRead for DataStream {
|
||||||
#[inline(always)]
|
// #[inline(always)]
|
||||||
fn fill_buf(&mut self) -> io::Result<&[u8]> {
|
// fn fill_buf(&mut self) -> io::Result<&[u8]> {
|
||||||
self.stream.fill_buf()
|
// self.0.fill_buf()
|
||||||
}
|
// }
|
||||||
|
|
||||||
#[inline(always)]
|
// #[inline(always)]
|
||||||
fn consume(&mut self, amt: usize) {
|
// fn consume(&mut self, amt: usize) {
|
||||||
self.stream.consume(amt)
|
// self.0.consume(amt)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
|
// impl Drop for DataStream {
|
||||||
pub fn kill_stream<S: Read, N: NetworkStream>(stream: &mut S, network: &mut N) {
|
// fn drop(&mut self) {
|
||||||
// Take <= 1k from the stream. If there might be more data, force close.
|
// // FIXME: Do a read; if > 1024, kill the stream. Need access to the
|
||||||
const FLUSH_LEN: u64 = 1024;
|
// // internals of `Chain` to do this efficiently/without crazy baggage.
|
||||||
match io::copy(&mut stream.take(FLUSH_LEN), &mut io::sink()) {
|
// // https://github.com/rust-lang/rust/pull/41463
|
||||||
Ok(FLUSH_LEN) | Err(_) => {
|
// let _ = io::copy(&mut self.0, &mut io::sink());
|
||||||
warn_!("Data left unread. Force closing network stream.");
|
// }
|
||||||
if let Err(e) = network.close(Shutdown::Both) {
|
// }
|
||||||
error_!("Failed to close network stream: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(n) => debug!("flushed {} unread bytes", n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for DataStream {
|
|
||||||
// Be a bad citizen and close the TCP stream if there's unread data.
|
|
||||||
fn drop(&mut self) {
|
|
||||||
kill_stream(&mut self.stream, &mut self.network);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -20,17 +20,21 @@ pub enum NetStream {
|
|||||||
impl io::Read for NetStream {
|
impl io::Read for NetStream {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
match *self {
|
trace_!("NetStream::read()");
|
||||||
|
let res = match *self {
|
||||||
Http(ref mut stream) => stream.read(buf),
|
Http(ref mut stream) => stream.read(buf),
|
||||||
Local(ref mut stream) => stream.read(buf),
|
Local(ref mut stream) => stream.read(buf),
|
||||||
#[cfg(feature = "tls")] Https(ref mut stream) => stream.read(buf)
|
#[cfg(feature = "tls")] Https(ref mut stream) => stream.read(buf)
|
||||||
}
|
};
|
||||||
|
trace_!("NetStream::read() -- complete");
|
||||||
|
res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl io::Write for NetStream {
|
impl io::Write for NetStream {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
|
trace_!("NetStream::write()");
|
||||||
match *self {
|
match *self {
|
||||||
Http(ref mut stream) => stream.write(buf),
|
Http(ref mut stream) => stream.write(buf),
|
||||||
Local(ref mut stream) => stream.write(buf),
|
Local(ref mut stream) => stream.write(buf),
|
||||||
@ -85,3 +89,20 @@ impl NetworkStream for NetStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// impl Drop for NetStream {
|
||||||
|
// fn drop(&mut self) {
|
||||||
|
// // Take <= 1k from the stream. If there might be more data, force close.
|
||||||
|
// trace_!("Dropping the network stream...");
|
||||||
|
// // const FLUSH_LEN: u64 = 1024;
|
||||||
|
// // match io::copy(&mut self.take(FLUSH_LEN), &mut io::sink()) {
|
||||||
|
// // Ok(FLUSH_LEN) | Err(_) => {
|
||||||
|
// // warn_!("Data left unread. Force closing network stream.");
|
||||||
|
// // if let Err(e) = self.close(Shutdown::Both) {
|
||||||
|
// // error_!("Failed to close network stream: {:?}", e);
|
||||||
|
// // }
|
||||||
|
// // }
|
||||||
|
// // Ok(n) => debug!("flushed {} unread bytes", n)
|
||||||
|
// // }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
Loading…
Reference in New Issue
Block a user