Upgrade to hyper 0.12:

- Use hyper's MakeService implementation with futures API
- Use tokio runtime to serve HTTP backend
This commit is contained in:
Marc Schreiber 2019-05-19 19:58:19 +02:00 committed by Jeb Rosen
parent 4e6a7ddd5f
commit c067fd396f
15 changed files with 422 additions and 410 deletions

View File

@ -22,7 +22,9 @@ private-cookies = ["cookie/secure"]
[dependencies]
smallvec = "0.6"
percent-encoding = "1"
hyper = { version = "0.10.13", default-features = false }
hyper = { version = "0.12.31", default-features = false, features = ["tokio"] }
http = "0.1.17"
mime = "0.3.13"
time = "0.1"
indexmap = "1.0"
rustls = { version = "0.15", optional = true }

View File

@ -4,72 +4,66 @@
//! These types will, with certainty, be removed with time, but they reside here
//! while necessary.
#[doc(hidden)] pub use hyper::server::Request as Request;
#[doc(hidden)] pub use hyper::server::Response as Response;
#[doc(hidden)] pub use hyper::server::Server as Server;
#[doc(hidden)] pub use hyper::server::Handler as Handler;
#[doc(hidden)] pub use hyper::net;
#[doc(hidden)] pub use hyper::method::Method;
#[doc(hidden)] pub use hyper::status::StatusCode;
#[doc(hidden)] pub use hyper::{Body, Request, Response};
#[doc(hidden)] pub use hyper::body::Payload as Payload;
#[doc(hidden)] pub use hyper::error::Error;
#[doc(hidden)] pub use hyper::uri::RequestUri;
#[doc(hidden)] pub use hyper::http::h1;
#[doc(hidden)] pub use hyper::buffer;
#[doc(hidden)] pub use hyper::server::Server;
#[doc(hidden)] pub use hyper::service::{MakeService, Service};
#[doc(hidden)] pub use hyper::Chunk;
#[doc(hidden)] pub use http::header::HeaderName as HeaderName;
#[doc(hidden)] pub use http::header::HeaderValue as HeaderValue;
#[doc(hidden)] pub use http::method::Method;
#[doc(hidden)] pub use http::request::Parts;
#[doc(hidden)] pub use http::status::StatusCode;
#[doc(hidden)] pub use http::uri::Uri;
/// Type alias to `hyper::Response<'a, hyper::net::Fresh>`.
#[doc(hidden)] pub type FreshResponse<'a> = self::Response<'a, self::net::Fresh>;
// TODO #[doc(hidden)] pub type FreshResponse<'a> = self::Response<'a, self::net::Fresh>;
/// Reexported Hyper header types.
pub mod header {
use crate::Header;
use hyper::header::Header as HyperHeaderTrait;
macro_rules! import_hyper_items {
($($item:ident),*) => ($(pub use hyper::header::$item;)*)
}
macro_rules! import_hyper_headers {
($($name:ident),*) => ($(
impl std::convert::From<self::$name> for Header<'static> {
fn from(header: self::$name) -> Header<'static> {
Header::new($name::header_name(), header.to_string())
}
}
pub use http::header::$name as $name;
)*)
}
import_hyper_items! {
Accept, AcceptCharset, AcceptEncoding, AcceptLanguage, AcceptRanges,
AccessControlAllowCredentials, AccessControlAllowHeaders,
AccessControlAllowMethods, AccessControlExposeHeaders,
AccessControlMaxAge, AccessControlRequestHeaders,
AccessControlRequestMethod, Allow, Authorization, Basic, Bearer,
CacheControl, Connection, ContentDisposition, ContentEncoding,
ContentLanguage, ContentLength, ContentRange, ContentType, Date, ETag,
EntityTag, Expires, From, Headers, Host, HttpDate, IfModifiedSince,
IfUnmodifiedSince, LastModified, Location, Origin, Prefer,
PreferenceApplied, Protocol, Quality, QualityItem, Referer,
StrictTransportSecurity, TransferEncoding, Upgrade, UserAgent,
AccessControlAllowOrigin, ByteRangeSpec, CacheDirective, Charset,
ConnectionOption, ContentRangeSpec, DispositionParam, DispositionType,
Encoding, Expect, IfMatch, IfNoneMatch, IfRange, Pragma, Preference,
ProtocolName, Range, RangeUnit, ReferrerPolicy, Vary, Scheme, q, qitem
}
// import_hyper_items! {
// Accept, AcceptCharset, AcceptEncoding, AcceptLanguage, AcceptRanges,
// AccessControlAllowCredentials, AccessControlAllowHeaders,
// AccessControlAllowMethods, AccessControlExposeHeaders,
// AccessControlMaxAge, AccessControlRequestHeaders,
// AccessControlRequestMethod, Allow, Authorization, Basic, Bearer,
// CacheControl, Connection, ContentDisposition, ContentEncoding,
// ContentLanguage, ContentLength, ContentRange, ContentType, Date, ETag,
// EntityTag, Expires, From, Headers, Host, HttpDate, IfModifiedSince,
// IfUnmodifiedSince, LastModified, Location, Origin, Prefer,
// PreferenceApplied, Protocol, Quality, QualityItem, Referer,
// StrictTransportSecurity, TransferEncoding, Upgrade, UserAgent,
// AccessControlAllowOrigin, ByteRangeSpec, CacheDirective, Charset,
// ConnectionOption, ContentRangeSpec, DispositionParam, DispositionType,
// Encoding, Expect, IfMatch, IfNoneMatch, IfRange, Pragma, Preference,
// ProtocolName, Range, RangeUnit, ReferrerPolicy, Vary, Scheme, q, qitem
// }
//
import_hyper_headers! {
Accept, AccessControlAllowCredentials, AccessControlAllowHeaders,
AccessControlAllowMethods, AccessControlAllowOrigin,
AccessControlExposeHeaders, AccessControlMaxAge,
AccessControlRequestHeaders, AccessControlRequestMethod, AcceptCharset,
AcceptEncoding, AcceptLanguage, AcceptRanges, Allow, CacheControl,
Connection, ContentDisposition, ContentEncoding, ContentLanguage,
ContentLength, ContentRange, Date, ETag, Expect, Expires, Host, IfMatch,
IfModifiedSince, IfNoneMatch, IfRange, IfUnmodifiedSince, LastModified,
Location, Origin, Pragma, Prefer, PreferenceApplied, Range, Referer,
ReferrerPolicy, StrictTransportSecurity, TransferEncoding, Upgrade,
UserAgent, Vary
ACCEPT, ACCESS_CONTROL_ALLOW_CREDENTIALS, ACCESS_CONTROL_ALLOW_HEADERS,
ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_MAX_AGE,
ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, ACCEPT_CHARSET,
ACCEPT_ENCODING, ACCEPT_LANGUAGE, ACCEPT_RANGES, ALLOW, CACHE_CONTROL,
CONNECTION, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_LENGTH, CONTENT_RANGE, DATE, ETAG, EXPECT, EXPIRES, HOST, IF_MATCH,
IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_RANGE, IF_UNMODIFIED_SINCE, LAST_MODIFIED,
LOCATION, ORIGIN, PRAGMA, RANGE, REFERER,
REFERRER_POLICY, STRICT_TRANSPORT_SECURITY, TRANSFER_ENCODING, UPGRADE,
USER_AGENT, VARY
}
}

View File

@ -1,3 +1,5 @@
extern crate http;
use std::fmt;
use std::str::FromStr;
@ -24,18 +26,18 @@ pub enum Method {
impl Method {
/// WARNING: This is unstable! Do not use this method outside of Rocket!
#[doc(hidden)]
pub fn from_hyp(method: &hyper::Method) -> Option<Method> {
pub fn from_hyp(method: &http::method::Method) -> Option<Method> {
match *method {
hyper::Method::Get => Some(Get),
hyper::Method::Put => Some(Put),
hyper::Method::Post => Some(Post),
hyper::Method::Delete => Some(Delete),
hyper::Method::Options => Some(Options),
hyper::Method::Head => Some(Head),
hyper::Method::Trace => Some(Trace),
hyper::Method::Connect => Some(Connect),
hyper::Method::Patch => Some(Patch),
hyper::Method::Extension(_) => None,
http::method::Method::GET => Some(Get),
http::method::Method::PUT => Some(Put),
http::method::Method::POST => Some(Post),
http::method::Method::DELETE => Some(Delete),
http::method::Method::OPTIONS => Some(Options),
http::method::Method::HEAD => Some(Head),
http::method::Method::TRACE => Some(Trace),
http::method::Method::CONNECT => Some(Connect),
http::method::Method::PATCH => Some(Patch),
_ => None,
}
}

View File

@ -94,6 +94,20 @@ impl<'a> Uri<'a> {
crate::parse::uri::from_str(string)
}
// pub fn from_hyp(uri: &'a hyper::Uri) -> Uri<'a> {
// match uri.is_absolute() {
// true => Uri::Absolute(Absolute::new(
// uri.scheme().unwrap(),
// match uri.host() {
// Some(host) => Some(Authority::new(None, Host::Raw(host), uri.port())),
// None => None
// },
// None
// )),
// false => Uri::Asterisk
// }
// }
/// Returns the internal instance of `Origin` if `self` is a `Uri::Origin`.
/// Otherwise, returns `None`.
///

View File

@ -24,8 +24,10 @@ tls = ["rocket_http/tls"]
private-cookies = ["rocket_http/private-cookies"]
[dependencies]
futures = "0.1"
rocket_codegen = { version = "0.5.0-dev", path = "../codegen" }
rocket_http = { version = "0.5.0-dev", path = "../http" }
tokio = "0.1.16"
yansi = "0.5"
log = "0.4"
toml = "0.4.7"

View File

@ -59,6 +59,7 @@ use yansi::Color::*;
///
/// A function decorated with `catch` must take exactly zero or one arguments.
/// If the catcher takes an argument, it must be of type [`&Request`](Request).
#[derive(Clone)]
pub struct Catcher {
/// The HTTP status code to match against.
pub code: u16,

View File

@ -5,20 +5,12 @@ use std::time::Duration;
#[cfg(feature = "tls")] use super::net_stream::HttpsStream;
use super::data_stream::{DataStream, kill_stream};
use super::data_stream::{DataStream, /* TODO kill_stream */};
use super::net_stream::NetStream;
use crate::ext::ReadExt;
use crate::http::hyper;
use crate::http::hyper::h1::HttpReader;
use crate::http::hyper::h1::HttpReader::*;
use crate::http::hyper::net::{HttpStream, NetworkStream};
pub type HyperBodyReader<'a, 'b> =
self::HttpReader<&'a mut hyper::buffer::BufReader<&'b mut dyn NetworkStream>>;
// |---- from hyper ----|
pub type BodyReader = HttpReader<Chain<Cursor<Vec<u8>>, NetStream>>;
use crate::http::hyper::{self, Payload};
use futures::{Async, Future};
/// The number of bytes to read into the "peek" buffer.
const PEEK_BYTES: usize = 512;
@ -56,9 +48,7 @@ const PEEK_BYTES: usize = 512;
/// body data. This enables partially or fully reading from a `Data` object
/// without consuming the `Data` object.
pub struct Data {
buffer: Vec<u8>,
is_complete: bool,
stream: BodyReader,
body: Vec<u8>,
}
impl Data {
@ -79,62 +69,11 @@ impl Data {
/// }
/// ```
pub fn open(mut self) -> DataStream {
let buffer = std::mem::replace(&mut self.buffer, vec![]);
let empty_stream = Cursor::new(vec![]).chain(NetStream::Empty);
// FIXME: Insert a `BufReader` in front of the `NetStream` with capacity
// 4096. We need the new `Chain` methods to get the inner reader to
// actually do this, however.
let empty_http_stream = HttpReader::SizedReader(empty_stream, 0);
let stream = std::mem::replace(&mut self.stream, empty_http_stream);
DataStream(Cursor::new(buffer).chain(stream))
}
// FIXME: This is absolutely terrible (downcasting!), thanks to Hyper.
pub(crate) fn from_hyp(mut body: HyperBodyReader<'_, '_>) -> Result<Data, &'static str> {
#[inline(always)]
#[cfg(feature = "tls")]
fn concrete_stream(stream: &mut dyn NetworkStream) -> Option<NetStream> {
stream.downcast_ref::<HttpsStream>()
.map(|s| NetStream::Https(s.clone()))
.or_else(|| {
stream.downcast_ref::<HttpStream>()
.map(|s| NetStream::Http(s.clone()))
})
}
#[inline(always)]
#[cfg(not(feature = "tls"))]
fn concrete_stream(stream: &mut dyn NetworkStream) -> Option<NetStream> {
stream.downcast_ref::<HttpStream>()
.map(|s| NetStream::Http(s.clone()))
}
// Retrieve the underlying Http(s)Stream from Hyper.
let net_stream = match concrete_stream(*body.get_mut().get_mut()) {
Some(net_stream) => net_stream,
None => return Err("Stream is not an HTTP(s) stream!")
};
// Set the read timeout to 5 seconds.
let _ = net_stream.set_read_timeout(Some(Duration::from_secs(5)));
// Steal the internal, undecoded data buffer from Hyper.
let (mut hyper_buf, pos, cap) = body.get_mut().take_buf();
hyper_buf.truncate(cap); // slow, but safe
let mut cursor = Cursor::new(hyper_buf);
cursor.set_position(pos as u64);
// Create an HTTP reader from the buffer + stream.
let inner_data = cursor.chain(net_stream);
let http_stream = match body {
SizedReader(_, n) => SizedReader(inner_data, n),
EofReader(_) => EofReader(inner_data),
EmptyReader(_) => EmptyReader(inner_data),
ChunkedReader(_, n) => ChunkedReader(inner_data, n)
};
Ok(Data::new(http_stream))
let stream = ::std::mem::replace(&mut self.body, vec![]);
DataStream(Cursor::new(stream))
}
/// Retrieve the `peek` buffer.
@ -155,10 +94,10 @@ impl Data {
/// ```
#[inline(always)]
pub fn peek(&self) -> &[u8] {
if self.buffer.len() > PEEK_BYTES {
&self.buffer[..PEEK_BYTES]
if self.body.len() > PEEK_BYTES {
&self.body[..PEEK_BYTES]
} else {
&self.buffer
&self.body
}
}
@ -179,7 +118,8 @@ impl Data {
/// ```
#[inline(always)]
pub fn peek_complete(&self) -> bool {
self.is_complete
// TODO self.is_complete
true
}
/// A helper method to write the body of the request to any `Write` type.
@ -230,49 +170,8 @@ impl Data {
// bytes `vec[pos..cap]` are buffered and unread. The remainder of the data
// bytes can be read from `stream`.
#[inline(always)]
pub(crate) fn new(mut stream: BodyReader) -> Data {
trace_!("Data::new({:?})", stream);
let mut peek_buf: Vec<u8> = vec![0; PEEK_BYTES];
// Fill the buffer with as many bytes as possible. If we read less than
// that buffer's length, we know we reached the EOF. Otherwise, it's
// unclear, so we just say we didn't reach EOF.
let eof = match stream.read_max(&mut peek_buf[..]) {
Ok(n) => {
trace_!("Filled peek buf with {} bytes.", n);
// We can use `set_len` here instead of `truncate`, but we'll
// take the performance hit to avoid `unsafe`. All of this code
// should go away when we migrate away from hyper 0.10.x.
peek_buf.truncate(n);
n < PEEK_BYTES
}
Err(e) => {
error_!("Failed to read into peek buffer: {:?}.", e);
// Likewise here as above.
peek_buf.truncate(0);
false
},
};
trace_!("Peek bytes: {}/{} bytes.", peek_buf.len(), PEEK_BYTES);
Data { buffer: peek_buf, stream, is_complete: eof }
pub(crate) fn new(body: Vec<u8>) -> Data {
Data { body }
}
/// This creates a `data` object from a local data source `data`.
#[inline]
pub(crate) fn local(data: Vec<u8>) -> Data {
let empty_stream = Cursor::new(vec![]).chain(NetStream::Empty);
Data {
buffer: data,
stream: HttpReader::SizedReader(empty_stream, 0),
is_complete: true,
}
}
}
impl Drop for Data {
fn drop(&mut self) {
kill_stream(&mut self.stream);
}
}

View File

@ -1,12 +1,7 @@
use std::io::{self, Read, Cursor, Chain};
use std::io::{self, Chain, Cursor, Read, Write};
use std::net::Shutdown;
use super::data::BodyReader;
use crate::http::hyper::net::NetworkStream;
use crate::http::hyper::h1::HttpReader;
// |-- peek buf --|
pub type InnerStream = Chain<Cursor<Vec<u8>>, BodyReader>;
pub type InnerStream = Cursor<Vec<u8>>;
/// Raw data stream of a request body.
///
@ -26,9 +21,9 @@ impl Read for DataStream {
}
}
pub fn kill_stream(stream: &mut BodyReader) {
/* pub fn kill_stream(stream: &mut BodyReader) {
// Only do the expensive reading if we're not sure we're done.
use self::HttpReader::*;
// TODO use self::HttpReader::*;
match *stream {
SizedReader(_, n) | ChunkedReader(_, Some(n)) if n > 0 => { /* continue */ },
_ => return
@ -46,10 +41,10 @@ pub fn kill_stream(stream: &mut BodyReader) {
}
Ok(n) => debug!("flushed {} unread bytes", n)
}
}
}*/
impl Drop for DataStream {
fn drop(&mut self) {
kill_stream(&mut self.0.get_mut().1);
// TODO kill_stream(&mut self.0.get_mut().1);
}
}

View File

@ -3,7 +3,7 @@ use std::net::{SocketAddr, Shutdown};
use std::time::Duration;
#[cfg(feature = "tls")] use crate::http::tls::{WrappedStream, ServerSession};
use crate::http::hyper::net::{HttpStream, NetworkStream};
// TODO use http::hyper::net::{HttpStream, NetworkStream};
use self::NetStream::*;
@ -13,7 +13,7 @@ use self::NetStream::*;
// This really shouldn't be necessary, but, you know, Hyper.
#[derive(Clone)]
pub enum NetStream {
Http(HttpStream),
Http/* TODO (HttpStream) */,
#[cfg(feature = "tls")]
Https(HttpsStream),
Empty,
@ -24,7 +24,7 @@ impl io::Read for NetStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
trace_!("NetStream::read()");
let res = match *self {
Http(ref mut stream) => stream.read(buf),
Http/*(ref mut stream)*/ => Ok(0) /* TODO stream.read(buf)*/,
#[cfg(feature = "tls")] Https(ref mut stream) => stream.read(buf),
Empty => Ok(0),
};
@ -39,7 +39,7 @@ impl io::Write for NetStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
trace_!("NetStream::write()");
match *self {
Http(ref mut stream) => stream.write(buf),
Http/* TODO (ref mut stream) => stream.write(buf)*/ => Ok(0),
#[cfg(feature = "tls")] Https(ref mut stream) => stream.write(buf),
Empty => Ok(0),
}
@ -48,47 +48,47 @@ impl io::Write for NetStream {
#[inline(always)]
fn flush(&mut self) -> io::Result<()> {
match *self {
Http(ref mut stream) => stream.flush(),
Http/* TODO (ref mut stream) => stream.flush()*/ => Ok(()),
#[cfg(feature = "tls")] Https(ref mut stream) => stream.flush(),
Empty => Ok(()),
}
}
}
impl NetworkStream for NetStream {
#[inline(always)]
fn peer_addr(&mut self) -> io::Result<SocketAddr> {
match *self {
Http(ref mut stream) => stream.peer_addr(),
#[cfg(feature = "tls")] Https(ref mut stream) => stream.peer_addr(),
Empty => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)),
}
}
#[inline(always)]
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
match *self {
Http(ref stream) => stream.set_read_timeout(dur),
#[cfg(feature = "tls")] Https(ref stream) => stream.set_read_timeout(dur),
Empty => Ok(()),
}
}
#[inline(always)]
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
match *self {
Http(ref stream) => stream.set_write_timeout(dur),
#[cfg(feature = "tls")] Https(ref stream) => stream.set_write_timeout(dur),
Empty => Ok(()),
}
}
#[inline(always)]
fn close(&mut self, how: Shutdown) -> io::Result<()> {
match *self {
Http(ref mut stream) => stream.close(how),
#[cfg(feature = "tls")] Https(ref mut stream) => stream.close(how),
Empty => Ok(()),
}
}
}
//impl NetworkStream for NetStream {
// #[inline(always)]
// fn peer_addr(&mut self) -> io::Result<SocketAddr> {
// match *self {
// Http/* TODO (ref mut stream) => stream.peer_addr()*/ => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)),
// #[cfg(feature = "tls")] Https(ref mut stream) => stream.peer_addr(),
// Empty => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)),
// }
// }
//
// #[inline(always)]
// fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
// match *self {
// Http/* TODO (ref stream) => stream.set_read_timeout(dur)*/ => Ok(()),
// #[cfg(feature = "tls")] Https(ref stream) => stream.set_read_timeout(dur),
// Empty => Ok(()),
// }
// }
//
// #[inline(always)]
// fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
// match *self {
// Http/* TODO (ref stream) => stream.set_write_timeout(dur)*/ => Ok(()),
// #[cfg(feature = "tls")] Https(ref stream) => stream.set_write_timeout(dur),
// Empty => Ok(()),
// }
// }
//
// #[inline(always)]
// fn close(&mut self, how: Shutdown) -> io::Result<()> {
// match *self {
// Http/* TODO (ref mut stream) => stream.close(how)*/ => Ok(()),
// #[cfg(feature = "tls")] Https(ref mut stream) => stream.close(how),
// Empty => Ok(()),
// }
// }
//}

View File

@ -19,7 +19,7 @@ use crate::router::Route;
#[derive(Debug)]
pub enum LaunchErrorKind {
/// Binding to the provided address/port failed.
Bind(hyper::Error),
Bind(std::io::Error),
/// An I/O error occurred during launch.
Io(io::Error),
/// Route collisions were detected.
@ -124,7 +124,7 @@ impl From<hyper::Error> for LaunchError {
#[inline]
fn from(error: hyper::Error) -> LaunchError {
match error {
hyper::Error::Io(e) => LaunchError::new(LaunchErrorKind::Io(e)),
// TODO hyper::Error::Io(e) => LaunchError::new(LaunchErrorKind::Io(e)),
e => LaunchError::new(LaunchErrorKind::Unknown(Box::new(e)))
}
}

View File

@ -107,7 +107,9 @@ impl<'c> LocalRequest<'c> {
uri: Cow<'c, str>
) -> LocalRequest<'c> {
// We set a dummy string for now and check the user's URI on dispatch.
let request = Request::new(client.rocket(), method, Origin::dummy());
let config = &client.rocket().config;
let state = &client.rocket().state;
let request = Request::new(config, state, method, Origin::dummy());
// Set up any cookies we know about.
if let Some(ref jar) = client.cookies {
@ -408,7 +410,7 @@ impl<'c> LocalRequest<'c> {
}
// Actually dispatch the request.
let response = client.rocket().dispatch(request, Data::local(data));
let response = client.rocket().dispatch(request, Data::new(data));
// If the client is tracking cookies, updates the internal cookie jar
// with the changes reflected by `response`.

View File

@ -158,16 +158,16 @@ pub(crate) fn try_init(level: LoggingLevel, verbose: bool) -> bool {
}
push_max_level(level);
if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) {
/* if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) {
if verbose {
eprintln!("Logger failed to initialize: {}", e);
}
pop_max_level();
return false;
}
}*/
true
false
}
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};

View File

@ -3,6 +3,8 @@ use std::cell::{Cell, RefCell};
use std::net::{IpAddr, SocketAddr};
use std::fmt;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use yansi::Paint;
use state::{Container, Storage};
@ -13,7 +15,7 @@ use crate::request::{FromFormValue, FormItems, FormItem};
use crate::rocket::Rocket;
use crate::router::Route;
use crate::config::{Config, Limits};
use crate::http::{hyper, uri::{Origin, Segments}};
use crate::http::{hyper, uri::{Origin, Segments, Uri}};
use crate::http::{Method, Header, HeaderMap, Cookies};
use crate::http::{RawStr, ContentType, Accept, MediaType};
use crate::http::private::{Indexed, SmallVec, CookieJar};
@ -59,7 +61,8 @@ impl<'r> Request<'r> {
/// Create a new `Request` with the given `method` and `uri`.
#[inline(always)]
pub(crate) fn new<'s: 'r>(
rocket: &'r Rocket,
config: &'r Config,
managed: &'r Container,
method: Method,
uri: Origin<'s>
) -> Request<'r> {
@ -71,8 +74,8 @@ impl<'r> Request<'r> {
state: RequestState {
path_segments: SmallVec::new(),
query_items: None,
config: &rocket.config,
managed: &rocket.state,
config,
managed,
route: Cell::new(None),
cookies: RefCell::new(CookieJar::new()),
accept: Storage::new(),
@ -699,7 +702,7 @@ impl<'r> Request<'r> {
pub fn example<F: Fn(&mut Request<'_>)>(method: Method, uri: &str, f: F) {
let rocket = Rocket::custom(Config::development());
let uri = Origin::parse(uri).expect("invalid URI in example");
let mut request = Request::new(&rocket, method, uri);
let mut request = Request::new(&rocket.config, &rocket.state, method, uri);
f(&mut request);
}
@ -782,36 +785,42 @@ impl<'r> Request<'r> {
/// Convert from Hyper types into a Rocket Request.
pub(crate) fn from_hyp(
rocket: &'r Rocket,
h_method: hyper::Method,
h_headers: hyper::header::Headers,
h_uri: hyper::RequestUri,
h_addr: SocketAddr,
config: &'r Config,
managed: &'r Container,
request_parts: &hyper::Parts,
) -> Result<Request<'r>, String> {
// Get a copy of the URI for later use.
let uri = match h_uri {
hyper::RequestUri::AbsolutePath(s) => s,
_ => return Err(format!("Bad URI: {}", h_uri)),
};
let h_uri = &request_parts.uri;
let h_headers = &request_parts.headers;
let h_version = &request_parts.version;
let h_method = &request_parts.method;;
// if !h_uri.is_absolute() {
// return Err(format!("Bad URI: {}", h_uri));
// };
// Ensure that the method is known. TODO: Allow made-up methods?
let method = match Method::from_hyp(&h_method) {
let method = match Method::from_hyp(h_method) {
Some(method) => method,
None => return Err(format!("Invalid method: {}", h_method))
None => return Err(format!("Unknown method: {}", h_method))
};
// We need to re-parse the URI since we don't trust Hyper... :(
let uri = Origin::parse_owned(uri).map_err(|e| e.to_string())?;
let uri = Origin::parse_owned(format!("{}", h_uri)).map_err(|e| e.to_string())?;
// Construct the request object.
let mut request = Request::new(rocket, method, uri);
request.set_remote(h_addr);
let mut request = Request::new(config, managed, method, uri);
// request.set_remote(match hyp_req.remote_addr() {
// Some(remote) => remote,
// None => return Err(String::from("Missing remote address"))
// });
// Set the request cookies, if they exist.
if let Some(cookie_headers) = h_headers.get_raw("Cookie") {
let cookie_headers = h_headers.get_all("Cookie").iter();
// TODO if cookie_headers.peek().is_some() {
let mut cookie_jar = CookieJar::new();
for header in cookie_headers {
let raw_str = match std::str::from_utf8(header) {
let raw_str = match ::std::str::from_utf8(header.as_bytes()) {
Ok(string) => string,
Err(_) => continue
};
@ -824,18 +833,19 @@ impl<'r> Request<'r> {
}
request.state.cookies = RefCell::new(cookie_jar);
}
// TODO }
// Set the rest of the headers.
for hyp in h_headers.iter() {
if let Some(header_values) = h_headers.get_raw(hyp.name()) {
for value in header_values {
for (name, value) in h_headers.iter() {
// TODO if let Some(header_values) = h_headers.get_all(hyp.name()) {
// This is not totally correct since values needn't be UTF8.
let value_str = String::from_utf8_lossy(value).into_owned();
let header = Header::new(hyp.name().to_string(), value_str);
let value_str = String::from_utf8_lossy(value.as_bytes()).into_owned();
let header = Header::new(name.to_string(), value_str);
request.add_header(header);
}
}
// TODO }
}
Ok(request)

View File

@ -1,14 +1,22 @@
use std::collections::HashMap;
use std::str::from_utf8;
use std::convert::From;
use std::str::{from_utf8, FromStr};
use std::cmp::min;
use std::io::{self, Write};
use std::time::Duration;
use std::mem;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use futures::{Future, Stream};
use futures::future::{self, FutureResult};
use yansi::Paint;
use state::Container;
use tokio::net::TcpListener;
use tokio::prelude::{Future as _, Stream as _};
#[cfg(feature = "tls")] use crate::http::tls::TlsServer;
#[cfg(feature = "tls")] use crate::http::tls::TlsAcceptor;
use crate::{logger, handler};
use crate::ext::ReadExt;
@ -37,23 +45,59 @@ pub struct Rocket {
fairings: Fairings,
}
struct RocketArcs {
config: Arc<Config>,
router: Arc<Router>,
default_catchers: Arc<HashMap<u16, Catcher>>,
catchers: Arc<HashMap<u16, Catcher>>,
state: Arc<Container>,
fairings: Arc<Fairings>,
}
impl<Ctx> hyper::MakeService<Ctx> for RocketArcs {
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = hyper::Error;
type Service = RocketHyperService;
type Future = FutureResult<Self::Service, Self::MakeError>;
type MakeError = Self::Error;
fn make_service(&mut self, _: Ctx) -> Self::Future {
future::ok(RocketHyperService::new(self))
}
}
#[derive(Clone)]
pub struct RocketHyperService {
config: Arc<Config>,
router: Arc<Router>,
default_catchers: Arc<HashMap<u16, Catcher>>,
catchers: Arc<HashMap<u16, Catcher>>,
state: Arc<Container>,
fairings: Arc<Fairings>,
}
#[doc(hidden)]
impl hyper::Handler for Rocket {
impl hyper::Service for RocketHyperService {
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = hyper::Error;
//type Future = FutureResult<hyper::Response<Self::ResBody>, Self::Error>;
type Future = Box<future::Future<Item = hyper::Response<Self::ResBody>, Error = Self::Error> + Send>;
// This function tries to hide all of the Hyper-ness from Rocket. It
// essentially converts Hyper types into Rocket types, then calls the
// `dispatch` function, which knows nothing about Hyper. Because responding
// depends on the `HyperResponse` type, this function does the actual
// response processing.
fn handle<'h, 'k>(
&self,
hyp_req: hyper::Request<'h, 'k>,
res: hyper::FreshResponse<'h>,
) {
// Get all of the information from Hyper.
let (h_addr, h_method, h_headers, h_uri, _, h_body) = hyp_req.deconstruct();
fn call<'h>(
&mut self,
hyp_req: hyper::Request<Self::ReqBody>,
) -> Self::Future {
let (parts, body) = hyp_req.into_parts();
// Convert the Hyper request into a Rocket request.
let req_res = Request::from_hyp(self, h_method, h_headers, h_uri, h_addr);
let req_res = Request::from_hyp(&self.config, &self.state, &parts);
let mut req = match req_res {
Ok(req) => req,
Err(e) => {
@ -62,111 +106,42 @@ impl hyper::Handler for Rocket {
// fabricate one. This is weird. We should let the user know
// that we failed to parse a request (by invoking some special
// handler) instead of doing this.
let dummy = Request::new(self, Method::Get, Origin::dummy());
let dummy = Request::new(&self.config, &self.state, Method::Get, Origin::dummy());
let r = self.handle_error(Status::BadRequest, &dummy);
return self.issue_response(r, res);
return Box::new(future::ok(hyper::Response::from(r)));
}
};
// Retrieve the data from the hyper body.
let data = match Data::from_hyp(h_body) {
Ok(data) => data,
Err(reason) => {
error_!("Bad data in request: {}", reason);
let r = self.handle_error(Status::InternalServerError, &req);
return self.issue_response(r, res);
}
};
let this = self.clone();
// Dispatch the request to get a response, then write that response out.
let response = self.dispatch(&mut req, data);
self.issue_response(response, res)
let response = body.concat2()
.map(move |chunk| {
let body = chunk.iter().rev().cloned().collect::<Vec<u8>>();
let data = Data::new(body);
// TODO: Due to life time constraints the clone of the service has been made.
// TODO: It should not be necessary but it is required to find a better solution
let mut req = Request::from_hyp(&this.config, &this.state, &parts).unwrap();
// Dispatch the request to get a response, then write that response out.
let response = this.dispatch(&mut req, data);
hyper::Response::from(response)
});
Box::new(response)
}
}
// This macro is a terrible hack to get around Hyper's Server<L> type. What we
// want is to use almost exactly the same launch code when we're serving over
// HTTPS as over HTTP. But Hyper forces two different types, so we can't use the
// same code, at least not trivially. These macros get around that by passing in
// the same code as a continuation in `$continue`. This wouldn't work as a
// regular function taking in a closure because the types of the inputs to the
// closure would be different depending on whether TLS was enabled or not.
#[cfg(not(feature = "tls"))]
macro_rules! serve {
($rocket:expr, $addr:expr, |$server:ident, $proto:ident| $continue:expr) => ({
let ($proto, $server) = ("http://", hyper::Server::http($addr));
$continue
})
}
#[cfg(feature = "tls")]
macro_rules! serve {
($rocket:expr, $addr:expr, |$server:ident, $proto:ident| $continue:expr) => ({
if let Some(tls) = $rocket.config.tls.clone() {
let tls = TlsServer::new(tls.certs, tls.key);
let ($proto, $server) = ("https://", hyper::Server::https($addr, tls));
$continue
} else {
let ($proto, $server) = ("http://", hyper::Server::http($addr));
$continue
}
})
}
impl Rocket {
#[inline]
fn issue_response(&self, response: Response<'_>, hyp_res: hyper::FreshResponse<'_>) {
match self.write_response(response, hyp_res) {
Ok(_) => info_!("{}", Paint::green("Response succeeded.")),
Err(e) => error_!("Failed to write response: {:?}.", e),
}
}
impl RocketHyperService {
#[inline]
fn write_response(
&self,
mut response: Response<'_>,
mut hyp_res: hyper::FreshResponse<'_>,
) -> io::Result<()> {
*hyp_res.status_mut() = hyper::StatusCode::from_u16(response.status().code);
for header in response.headers().iter() {
// FIXME: Using hyper here requires two allocations.
let name = header.name.into_string();
let value = Vec::from(header.value.as_bytes());
hyp_res.headers_mut().append_raw(name, value);
}
match response.body() {
None => {
hyp_res.headers_mut().set(header::ContentLength(0));
hyp_res.start()?.end()
}
Some(Body::Sized(body, size)) => {
hyp_res.headers_mut().set(header::ContentLength(size));
let mut stream = hyp_res.start()?;
io::copy(body, &mut stream)?;
stream.end()
}
Some(Body::Chunked(mut body, chunk_size)) => {
// This _might_ happen on a 32-bit machine!
if chunk_size > (usize::max_value() as u64) {
let msg = "chunk size exceeds limits of usize type";
return Err(io::Error::new(io::ErrorKind::Other, msg));
}
// The buffer stores the current chunk being written out.
let mut buffer = vec![0; chunk_size as usize];
let mut stream = hyp_res.start()?;
loop {
match body.read_max(&mut buffer)? {
0 => break,
n => stream.write_all(&buffer[..n])?,
}
}
stream.end()
}
fn new(rocket: &RocketArcs) -> RocketHyperService {
RocketHyperService {
config: rocket.config.clone(),
router: rocket.router.clone(),
default_catchers: rocket.default_catchers.clone(),
catchers: rocket.catchers.clone(),
state: rocket.state.clone(),
fairings: rocket.fairings.clone(),
}
}
@ -331,6 +306,26 @@ impl Rocket {
default.handle(req).expect("Default 500 response.")
})
}
}
impl Rocket {
#[inline]
pub(crate) fn dispatch<'s, 'r>(
&'s self,
request: &'r mut Request<'s>,
data: Data
) -> Response<'r> {
unimplemented!("TODO")
}
pub(crate) fn handle_error<'r>(
&self,
status: Status,
req: &'r Request
) -> Response<'r> {
unimplemented!("TODO")
}
/// Create a new `Rocket` application using the configuration information in
/// `Rocket.toml`. If the file does not exist or if there is an I/O error
@ -510,6 +505,7 @@ impl Rocket {
panic!("Invalid mount point.");
}
let mut router = self.router.clone();
for mut route in routes.into() {
let path = route.uri.clone();
if let Err(e) = route.set_uri(base_uri.clone(), path) {
@ -518,9 +514,11 @@ impl Rocket {
}
info_!("{}", route);
self.router.add(route);
router.add(route);
}
self.router = router;
self
}
@ -554,6 +552,9 @@ impl Rocket {
#[inline]
pub fn register(mut self, catchers: Vec<Catcher>) -> Self {
info!("{}{}", Paint::masked("👾 "), Paint::magenta("Catchers:"));
let mut current_catchers = self.catchers.clone();
for c in catchers {
if self.catchers.get(&c.code).map_or(false, |e| !e.is_default) {
info_!("{} {}", c, Paint::yellow("(warning: duplicate catcher!)"));
@ -561,9 +562,11 @@ impl Rocket {
info_!("{}", c);
}
self.catchers.insert(c.code, c);
current_catchers.insert(c.code, c);
}
self.catchers = current_catchers;
self
}
@ -687,46 +690,90 @@ impl Rocket {
self.fairings.pretty_print_counts();
let full_addr = format!("{}:{}", self.config.address, self.config.port);
serve!(self, &full_addr, |server, proto| {
let mut server = match server {
Ok(server) => server,
Err(e) => return LaunchError::new(LaunchErrorKind::Bind(e)),
};
// TODO.async What meaning should config.workers have now?
// Initialize the tokio runtime
let mut runtime = tokio::runtime::Builder::new()
.core_threads(self.config.workers as usize)
.build()
.expect("Cannot build runtime!");
// Determine the address and port we actually binded to.
match server.local_addr() {
Ok(server_addr) => self.config.port = server_addr.port(),
Err(e) => return LaunchError::from(e),
let threads = self.config.workers as usize;
let full_addr = format!("{}:{}", self.config.address, self.config.port)
.to_socket_addrs()
.expect("A valid socket address")
.next()
.unwrap();
let listener = match TcpListener::bind(&full_addr) {
Ok(listener) => listener,
Err(e) => return LaunchError::new(LaunchErrorKind::Bind(e)),
};
// Determine the address and port we actually binded to.
match listener.local_addr() {
Ok(server_addr) => /* TODO self.config.port = */ server_addr.port(),
Err(e) => return LaunchError::from(e),
};
let proto;
let incoming;
#[cfg(feature = "tls")]
{
// TODO.async: Can/should we make the clone unnecessary (by reference, or by moving out?)
if let Some(tls) = self.config.tls.clone() {
proto = "https://";
let mut config = tls::rustls::ServerConfig::new(tls::rustls::NoClientAuth::new());
config.set_single_cert(tls.certs, tls.key).expect("invalid key or certificate");
// TODO.async: I once observed an unhandled AlertReceived(UnknownCA) but
// have no idea what happened and cannot reproduce.
let config = TlsAcceptor::from(Arc::new(config));
incoming = Box::new(listener.incoming().and_then(move |stream| {
config.accept(stream)
.map(|stream| Box::new(stream))
}));
}
// Set the keep-alive.
let timeout = self.config.keep_alive.map(|s| Duration::from_secs(s as u64));
server.keep_alive(timeout);
// Freeze managed state for synchronization-free accesses later.
self.state.freeze();
// Run the launch fairings.
self.fairings.handle_launch(&self);
let full_addr = format!("{}:{}", self.config.address, self.config.port);
launch_info!("{}{} {}{}",
Paint::masked("🚀 "),
Paint::default("Rocket has launched from").bold(),
Paint::default(proto).bold().underline(),
Paint::default(&full_addr).bold().underline());
// Restore the log level back to what it originally was.
logger::pop_max_level();
let threads = self.config.workers as usize;
if let Err(e) = server.handle_threads(self, threads) {
return LaunchError::from(e);
else {
proto = "http://";
incoming = Box::new(listener.incoming().map(|stream| Box::new(stream)));
}
}
unreachable!("the call to `handle_threads` should block on success")
})
#[cfg(not(feature = "tls"))]
{
proto = "http://";
incoming = Box::new(listener.incoming().map(|stream| Box::new(stream)));
}
// Freeze managed state for synchronization-free accesses later.
self.state.freeze();
// Run the launch fairings.
self.fairings.handle_launch(&self);
launch_info!("{}{} {}{}",
Paint::masked("🚀 "),
Paint::default("Rocket has launched from").bold(),
Paint::default(proto).bold().underline(),
Paint::default(&full_addr).bold().underline());
// Restore the log level back to what it originally was.
logger::pop_max_level();
let arcs = RocketArcs::from(self);
// NB: executor must be passed manually here, see hyperium/hyper#1537
let server = hyper::Server::builder(incoming)
.executor(runtime.executor())
.serve(arcs);
// TODO.async: Use with_graceful_shutdown, and let launch() return a Result<(), Error>
runtime.block_on(server).expect("TODO.async handle error");
unreachable!("the call to `handle_threads` should block on success")
}
/// Returns an iterator over all of the routes mounted on this instance of
@ -811,3 +858,47 @@ impl Rocket {
&self.config
}
}
impl From<Rocket> for RocketArcs {
fn from(mut rocket: Rocket) -> Self {
RocketArcs {
config: Arc::new(rocket.config),
router: Arc::new(rocket.router),
default_catchers: Arc::new(rocket.default_catchers),
catchers: Arc::new(rocket.catchers),
state: Arc::new(rocket.state),
fairings: Arc::new(rocket.fairings),
}
}
}
// TODO: consider try_from here?
impl<'a> From<Response<'a>> for hyper::Response<hyper::Body> {
fn from(mut response: Response) -> Self {
let mut builder = hyper::Response::builder();
builder.status(hyper::StatusCode::from_u16(response.status().code).expect(""));
for header in response.headers().iter() {
// FIXME: Using hyper here requires two allocations.
let name = hyper::HeaderName::from_str(&header.name.into_string()).unwrap();
let value = hyper::HeaderValue::from_bytes(header.value.as_bytes()).unwrap();
builder.header(name, value);
}
match response.body() {
None => {
builder.body(hyper::Body::empty())
},
Some(Body::Sized(body, size)) => {
let mut buffer = Vec::with_capacity(size as usize);
body.read_to_end(&mut buffer);
builder.header(header::CONTENT_LENGTH, hyper::HeaderValue::from(size));
builder.body(hyper::Body::from(buffer))
},
Some(Body::Chunked(mut body, chunk_size)) => {
unimplemented!()
}
}.unwrap()
}
}

View File

@ -16,7 +16,7 @@ pub(crate) fn dummy_handler<'r>(r: &'r crate::Request<'_>, _: crate::Data) -> cr
crate::Outcome::from(r, ())
}
#[derive(Default)]
#[derive(Default, Clone)]
pub struct Router {
routes: HashMap<Selector, Vec<Route>>,
}