From c067fd396f15eb2f6c0f7e0453e452765ab7d06a Mon Sep 17 00:00:00 2001 From: Marc Schreiber Date: Sun, 19 May 2019 19:58:19 +0200 Subject: [PATCH] Upgrade to hyper 0.12: - Use hyper's MakeService implementation with futures API - Use tokio runtime to serve HTTP backend --- core/http/Cargo.toml | 4 +- core/http/src/hyper.rs | 92 ++++---- core/http/src/method.rs | 24 +- core/http/src/uri/uri.rs | 14 ++ core/lib/Cargo.toml | 2 + core/lib/src/catcher.rs | 1 + core/lib/src/data/data.rs | 127 ++--------- core/lib/src/data/data_stream.rs | 17 +- core/lib/src/data/net_stream.rs | 84 +++---- core/lib/src/error.rs | 4 +- core/lib/src/local/request.rs | 6 +- core/lib/src/logger.rs | 6 +- core/lib/src/request/request.rs | 70 +++--- core/lib/src/rocket.rs | 379 +++++++++++++++++++------------ core/lib/src/router/mod.rs | 2 +- 15 files changed, 422 insertions(+), 410 deletions(-) diff --git a/core/http/Cargo.toml b/core/http/Cargo.toml index 9a2241b9..6e550538 100644 --- a/core/http/Cargo.toml +++ b/core/http/Cargo.toml @@ -22,7 +22,9 @@ private-cookies = ["cookie/secure"] [dependencies] smallvec = "0.6" percent-encoding = "1" -hyper = { version = "0.10.13", default-features = false } +hyper = { version = "0.12.31", default-features = false, features = ["tokio"] } +http = "0.1.17" +mime = "0.3.13" time = "0.1" indexmap = "1.0" rustls = { version = "0.15", optional = true } diff --git a/core/http/src/hyper.rs b/core/http/src/hyper.rs index 587387dc..5b5ba964 100644 --- a/core/http/src/hyper.rs +++ b/core/http/src/hyper.rs @@ -4,72 +4,66 @@ //! These types will, with certainty, be removed with time, but they reside here //! while necessary. -#[doc(hidden)] pub use hyper::server::Request as Request; -#[doc(hidden)] pub use hyper::server::Response as Response; -#[doc(hidden)] pub use hyper::server::Server as Server; -#[doc(hidden)] pub use hyper::server::Handler as Handler; - -#[doc(hidden)] pub use hyper::net; - -#[doc(hidden)] pub use hyper::method::Method; -#[doc(hidden)] pub use hyper::status::StatusCode; +#[doc(hidden)] pub use hyper::{Body, Request, Response}; +#[doc(hidden)] pub use hyper::body::Payload as Payload; #[doc(hidden)] pub use hyper::error::Error; -#[doc(hidden)] pub use hyper::uri::RequestUri; -#[doc(hidden)] pub use hyper::http::h1; -#[doc(hidden)] pub use hyper::buffer; +#[doc(hidden)] pub use hyper::server::Server; +#[doc(hidden)] pub use hyper::service::{MakeService, Service}; + +#[doc(hidden)] pub use hyper::Chunk; +#[doc(hidden)] pub use http::header::HeaderName as HeaderName; +#[doc(hidden)] pub use http::header::HeaderValue as HeaderValue; +#[doc(hidden)] pub use http::method::Method; +#[doc(hidden)] pub use http::request::Parts; +#[doc(hidden)] pub use http::status::StatusCode; +#[doc(hidden)] pub use http::uri::Uri; /// Type alias to `hyper::Response<'a, hyper::net::Fresh>`. -#[doc(hidden)] pub type FreshResponse<'a> = self::Response<'a, self::net::Fresh>; +// TODO #[doc(hidden)] pub type FreshResponse<'a> = self::Response<'a, self::net::Fresh>; /// Reexported Hyper header types. pub mod header { use crate::Header; - use hyper::header::Header as HyperHeaderTrait; - macro_rules! import_hyper_items { ($($item:ident),*) => ($(pub use hyper::header::$item;)*) } macro_rules! import_hyper_headers { ($($name:ident),*) => ($( - impl std::convert::From for Header<'static> { - fn from(header: self::$name) -> Header<'static> { - Header::new($name::header_name(), header.to_string()) - } - } + pub use http::header::$name as $name; )*) } - import_hyper_items! { - Accept, AcceptCharset, AcceptEncoding, AcceptLanguage, AcceptRanges, - AccessControlAllowCredentials, AccessControlAllowHeaders, - AccessControlAllowMethods, AccessControlExposeHeaders, - AccessControlMaxAge, AccessControlRequestHeaders, - AccessControlRequestMethod, Allow, Authorization, Basic, Bearer, - CacheControl, Connection, ContentDisposition, ContentEncoding, - ContentLanguage, ContentLength, ContentRange, ContentType, Date, ETag, - EntityTag, Expires, From, Headers, Host, HttpDate, IfModifiedSince, - IfUnmodifiedSince, LastModified, Location, Origin, Prefer, - PreferenceApplied, Protocol, Quality, QualityItem, Referer, - StrictTransportSecurity, TransferEncoding, Upgrade, UserAgent, - AccessControlAllowOrigin, ByteRangeSpec, CacheDirective, Charset, - ConnectionOption, ContentRangeSpec, DispositionParam, DispositionType, - Encoding, Expect, IfMatch, IfNoneMatch, IfRange, Pragma, Preference, - ProtocolName, Range, RangeUnit, ReferrerPolicy, Vary, Scheme, q, qitem - } - +// import_hyper_items! { +// Accept, AcceptCharset, AcceptEncoding, AcceptLanguage, AcceptRanges, +// AccessControlAllowCredentials, AccessControlAllowHeaders, +// AccessControlAllowMethods, AccessControlExposeHeaders, +// AccessControlMaxAge, AccessControlRequestHeaders, +// AccessControlRequestMethod, Allow, Authorization, Basic, Bearer, +// CacheControl, Connection, ContentDisposition, ContentEncoding, +// ContentLanguage, ContentLength, ContentRange, ContentType, Date, ETag, +// EntityTag, Expires, From, Headers, Host, HttpDate, IfModifiedSince, +// IfUnmodifiedSince, LastModified, Location, Origin, Prefer, +// PreferenceApplied, Protocol, Quality, QualityItem, Referer, +// StrictTransportSecurity, TransferEncoding, Upgrade, UserAgent, +// AccessControlAllowOrigin, ByteRangeSpec, CacheDirective, Charset, +// ConnectionOption, ContentRangeSpec, DispositionParam, DispositionType, +// Encoding, Expect, IfMatch, IfNoneMatch, IfRange, Pragma, Preference, +// ProtocolName, Range, RangeUnit, ReferrerPolicy, Vary, Scheme, q, qitem +// } +// import_hyper_headers! { - Accept, AccessControlAllowCredentials, AccessControlAllowHeaders, - AccessControlAllowMethods, AccessControlAllowOrigin, - AccessControlExposeHeaders, AccessControlMaxAge, - AccessControlRequestHeaders, AccessControlRequestMethod, AcceptCharset, - AcceptEncoding, AcceptLanguage, AcceptRanges, Allow, CacheControl, - Connection, ContentDisposition, ContentEncoding, ContentLanguage, - ContentLength, ContentRange, Date, ETag, Expect, Expires, Host, IfMatch, - IfModifiedSince, IfNoneMatch, IfRange, IfUnmodifiedSince, LastModified, - Location, Origin, Pragma, Prefer, PreferenceApplied, Range, Referer, - ReferrerPolicy, StrictTransportSecurity, TransferEncoding, Upgrade, - UserAgent, Vary + ACCEPT, ACCESS_CONTROL_ALLOW_CREDENTIALS, ACCESS_CONTROL_ALLOW_HEADERS, + ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, + ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_MAX_AGE, + ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, ACCEPT_CHARSET, + ACCEPT_ENCODING, ACCEPT_LANGUAGE, ACCEPT_RANGES, ALLOW, CACHE_CONTROL, + CONNECTION, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, + CONTENT_LENGTH, CONTENT_RANGE, DATE, ETAG, EXPECT, EXPIRES, HOST, IF_MATCH, + IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_RANGE, IF_UNMODIFIED_SINCE, LAST_MODIFIED, + LOCATION, ORIGIN, PRAGMA, RANGE, REFERER, + REFERRER_POLICY, STRICT_TRANSPORT_SECURITY, TRANSFER_ENCODING, UPGRADE, + USER_AGENT, VARY } } diff --git a/core/http/src/method.rs b/core/http/src/method.rs index ce83d67a..1a4722f8 100644 --- a/core/http/src/method.rs +++ b/core/http/src/method.rs @@ -1,3 +1,5 @@ +extern crate http; + use std::fmt; use std::str::FromStr; @@ -24,18 +26,18 @@ pub enum Method { impl Method { /// WARNING: This is unstable! Do not use this method outside of Rocket! #[doc(hidden)] - pub fn from_hyp(method: &hyper::Method) -> Option { + pub fn from_hyp(method: &http::method::Method) -> Option { match *method { - hyper::Method::Get => Some(Get), - hyper::Method::Put => Some(Put), - hyper::Method::Post => Some(Post), - hyper::Method::Delete => Some(Delete), - hyper::Method::Options => Some(Options), - hyper::Method::Head => Some(Head), - hyper::Method::Trace => Some(Trace), - hyper::Method::Connect => Some(Connect), - hyper::Method::Patch => Some(Patch), - hyper::Method::Extension(_) => None, + http::method::Method::GET => Some(Get), + http::method::Method::PUT => Some(Put), + http::method::Method::POST => Some(Post), + http::method::Method::DELETE => Some(Delete), + http::method::Method::OPTIONS => Some(Options), + http::method::Method::HEAD => Some(Head), + http::method::Method::TRACE => Some(Trace), + http::method::Method::CONNECT => Some(Connect), + http::method::Method::PATCH => Some(Patch), + _ => None, } } diff --git a/core/http/src/uri/uri.rs b/core/http/src/uri/uri.rs index 990ca5c2..3f1a4865 100644 --- a/core/http/src/uri/uri.rs +++ b/core/http/src/uri/uri.rs @@ -94,6 +94,20 @@ impl<'a> Uri<'a> { crate::parse::uri::from_str(string) } +// pub fn from_hyp(uri: &'a hyper::Uri) -> Uri<'a> { +// match uri.is_absolute() { +// true => Uri::Absolute(Absolute::new( +// uri.scheme().unwrap(), +// match uri.host() { +// Some(host) => Some(Authority::new(None, Host::Raw(host), uri.port())), +// None => None +// }, +// None +// )), +// false => Uri::Asterisk +// } +// } + /// Returns the internal instance of `Origin` if `self` is a `Uri::Origin`. /// Otherwise, returns `None`. /// diff --git a/core/lib/Cargo.toml b/core/lib/Cargo.toml index 072a5032..2a1b3dea 100644 --- a/core/lib/Cargo.toml +++ b/core/lib/Cargo.toml @@ -24,8 +24,10 @@ tls = ["rocket_http/tls"] private-cookies = ["rocket_http/private-cookies"] [dependencies] +futures = "0.1" rocket_codegen = { version = "0.5.0-dev", path = "../codegen" } rocket_http = { version = "0.5.0-dev", path = "../http" } +tokio = "0.1.16" yansi = "0.5" log = "0.4" toml = "0.4.7" diff --git a/core/lib/src/catcher.rs b/core/lib/src/catcher.rs index ee86fadc..3600ae68 100644 --- a/core/lib/src/catcher.rs +++ b/core/lib/src/catcher.rs @@ -59,6 +59,7 @@ use yansi::Color::*; /// /// A function decorated with `catch` must take exactly zero or one arguments. /// If the catcher takes an argument, it must be of type [`&Request`](Request). +#[derive(Clone)] pub struct Catcher { /// The HTTP status code to match against. pub code: u16, diff --git a/core/lib/src/data/data.rs b/core/lib/src/data/data.rs index dc5e447e..d0d2f0a4 100644 --- a/core/lib/src/data/data.rs +++ b/core/lib/src/data/data.rs @@ -5,20 +5,12 @@ use std::time::Duration; #[cfg(feature = "tls")] use super::net_stream::HttpsStream; -use super::data_stream::{DataStream, kill_stream}; +use super::data_stream::{DataStream, /* TODO kill_stream */}; use super::net_stream::NetStream; use crate::ext::ReadExt; -use crate::http::hyper; -use crate::http::hyper::h1::HttpReader; -use crate::http::hyper::h1::HttpReader::*; -use crate::http::hyper::net::{HttpStream, NetworkStream}; - -pub type HyperBodyReader<'a, 'b> = - self::HttpReader<&'a mut hyper::buffer::BufReader<&'b mut dyn NetworkStream>>; - -// |---- from hyper ----| -pub type BodyReader = HttpReader>, NetStream>>; +use crate::http::hyper::{self, Payload}; +use futures::{Async, Future}; /// The number of bytes to read into the "peek" buffer. const PEEK_BYTES: usize = 512; @@ -56,9 +48,7 @@ const PEEK_BYTES: usize = 512; /// body data. This enables partially or fully reading from a `Data` object /// without consuming the `Data` object. pub struct Data { - buffer: Vec, - is_complete: bool, - stream: BodyReader, + body: Vec, } impl Data { @@ -79,62 +69,11 @@ impl Data { /// } /// ``` pub fn open(mut self) -> DataStream { - let buffer = std::mem::replace(&mut self.buffer, vec![]); - let empty_stream = Cursor::new(vec![]).chain(NetStream::Empty); - // FIXME: Insert a `BufReader` in front of the `NetStream` with capacity // 4096. We need the new `Chain` methods to get the inner reader to // actually do this, however. - let empty_http_stream = HttpReader::SizedReader(empty_stream, 0); - let stream = std::mem::replace(&mut self.stream, empty_http_stream); - DataStream(Cursor::new(buffer).chain(stream)) - } - - // FIXME: This is absolutely terrible (downcasting!), thanks to Hyper. - pub(crate) fn from_hyp(mut body: HyperBodyReader<'_, '_>) -> Result { - #[inline(always)] - #[cfg(feature = "tls")] - fn concrete_stream(stream: &mut dyn NetworkStream) -> Option { - stream.downcast_ref::() - .map(|s| NetStream::Https(s.clone())) - .or_else(|| { - stream.downcast_ref::() - .map(|s| NetStream::Http(s.clone())) - }) - } - - #[inline(always)] - #[cfg(not(feature = "tls"))] - fn concrete_stream(stream: &mut dyn NetworkStream) -> Option { - stream.downcast_ref::() - .map(|s| NetStream::Http(s.clone())) - } - - // Retrieve the underlying Http(s)Stream from Hyper. - let net_stream = match concrete_stream(*body.get_mut().get_mut()) { - Some(net_stream) => net_stream, - None => return Err("Stream is not an HTTP(s) stream!") - }; - - // Set the read timeout to 5 seconds. - let _ = net_stream.set_read_timeout(Some(Duration::from_secs(5))); - - // Steal the internal, undecoded data buffer from Hyper. - let (mut hyper_buf, pos, cap) = body.get_mut().take_buf(); - hyper_buf.truncate(cap); // slow, but safe - let mut cursor = Cursor::new(hyper_buf); - cursor.set_position(pos as u64); - - // Create an HTTP reader from the buffer + stream. - let inner_data = cursor.chain(net_stream); - let http_stream = match body { - SizedReader(_, n) => SizedReader(inner_data, n), - EofReader(_) => EofReader(inner_data), - EmptyReader(_) => EmptyReader(inner_data), - ChunkedReader(_, n) => ChunkedReader(inner_data, n) - }; - - Ok(Data::new(http_stream)) + let stream = ::std::mem::replace(&mut self.body, vec![]); + DataStream(Cursor::new(stream)) } /// Retrieve the `peek` buffer. @@ -155,10 +94,10 @@ impl Data { /// ``` #[inline(always)] pub fn peek(&self) -> &[u8] { - if self.buffer.len() > PEEK_BYTES { - &self.buffer[..PEEK_BYTES] + if self.body.len() > PEEK_BYTES { + &self.body[..PEEK_BYTES] } else { - &self.buffer + &self.body } } @@ -179,7 +118,8 @@ impl Data { /// ``` #[inline(always)] pub fn peek_complete(&self) -> bool { - self.is_complete + // TODO self.is_complete + true } /// A helper method to write the body of the request to any `Write` type. @@ -230,49 +170,8 @@ impl Data { // bytes `vec[pos..cap]` are buffered and unread. The remainder of the data // bytes can be read from `stream`. #[inline(always)] - pub(crate) fn new(mut stream: BodyReader) -> Data { - trace_!("Data::new({:?})", stream); - let mut peek_buf: Vec = vec![0; PEEK_BYTES]; - - // Fill the buffer with as many bytes as possible. If we read less than - // that buffer's length, we know we reached the EOF. Otherwise, it's - // unclear, so we just say we didn't reach EOF. - let eof = match stream.read_max(&mut peek_buf[..]) { - Ok(n) => { - trace_!("Filled peek buf with {} bytes.", n); - // We can use `set_len` here instead of `truncate`, but we'll - // take the performance hit to avoid `unsafe`. All of this code - // should go away when we migrate away from hyper 0.10.x. - peek_buf.truncate(n); - n < PEEK_BYTES - } - Err(e) => { - error_!("Failed to read into peek buffer: {:?}.", e); - // Likewise here as above. - peek_buf.truncate(0); - false - }, - }; - - trace_!("Peek bytes: {}/{} bytes.", peek_buf.len(), PEEK_BYTES); - Data { buffer: peek_buf, stream, is_complete: eof } + pub(crate) fn new(body: Vec) -> Data { + Data { body } } - /// This creates a `data` object from a local data source `data`. - #[inline] - pub(crate) fn local(data: Vec) -> Data { - let empty_stream = Cursor::new(vec![]).chain(NetStream::Empty); - - Data { - buffer: data, - stream: HttpReader::SizedReader(empty_stream, 0), - is_complete: true, - } - } -} - -impl Drop for Data { - fn drop(&mut self) { - kill_stream(&mut self.stream); - } } diff --git a/core/lib/src/data/data_stream.rs b/core/lib/src/data/data_stream.rs index 772cb075..a6ec7032 100644 --- a/core/lib/src/data/data_stream.rs +++ b/core/lib/src/data/data_stream.rs @@ -1,12 +1,7 @@ -use std::io::{self, Read, Cursor, Chain}; +use std::io::{self, Chain, Cursor, Read, Write}; use std::net::Shutdown; -use super::data::BodyReader; -use crate::http::hyper::net::NetworkStream; -use crate::http::hyper::h1::HttpReader; - -// |-- peek buf --| -pub type InnerStream = Chain>, BodyReader>; +pub type InnerStream = Cursor>; /// Raw data stream of a request body. /// @@ -26,9 +21,9 @@ impl Read for DataStream { } } -pub fn kill_stream(stream: &mut BodyReader) { +/* pub fn kill_stream(stream: &mut BodyReader) { // Only do the expensive reading if we're not sure we're done. - use self::HttpReader::*; + // TODO use self::HttpReader::*; match *stream { SizedReader(_, n) | ChunkedReader(_, Some(n)) if n > 0 => { /* continue */ }, _ => return @@ -46,10 +41,10 @@ pub fn kill_stream(stream: &mut BodyReader) { } Ok(n) => debug!("flushed {} unread bytes", n) } -} +}*/ impl Drop for DataStream { fn drop(&mut self) { - kill_stream(&mut self.0.get_mut().1); + // TODO kill_stream(&mut self.0.get_mut().1); } } diff --git a/core/lib/src/data/net_stream.rs b/core/lib/src/data/net_stream.rs index b9a8099c..09e0762b 100644 --- a/core/lib/src/data/net_stream.rs +++ b/core/lib/src/data/net_stream.rs @@ -3,7 +3,7 @@ use std::net::{SocketAddr, Shutdown}; use std::time::Duration; #[cfg(feature = "tls")] use crate::http::tls::{WrappedStream, ServerSession}; -use crate::http::hyper::net::{HttpStream, NetworkStream}; +// TODO use http::hyper::net::{HttpStream, NetworkStream}; use self::NetStream::*; @@ -13,7 +13,7 @@ use self::NetStream::*; // This really shouldn't be necessary, but, you know, Hyper. #[derive(Clone)] pub enum NetStream { - Http(HttpStream), + Http/* TODO (HttpStream) */, #[cfg(feature = "tls")] Https(HttpsStream), Empty, @@ -24,7 +24,7 @@ impl io::Read for NetStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { trace_!("NetStream::read()"); let res = match *self { - Http(ref mut stream) => stream.read(buf), + Http/*(ref mut stream)*/ => Ok(0) /* TODO stream.read(buf)*/, #[cfg(feature = "tls")] Https(ref mut stream) => stream.read(buf), Empty => Ok(0), }; @@ -39,7 +39,7 @@ impl io::Write for NetStream { fn write(&mut self, buf: &[u8]) -> io::Result { trace_!("NetStream::write()"); match *self { - Http(ref mut stream) => stream.write(buf), + Http/* TODO (ref mut stream) => stream.write(buf)*/ => Ok(0), #[cfg(feature = "tls")] Https(ref mut stream) => stream.write(buf), Empty => Ok(0), } @@ -48,47 +48,47 @@ impl io::Write for NetStream { #[inline(always)] fn flush(&mut self) -> io::Result<()> { match *self { - Http(ref mut stream) => stream.flush(), + Http/* TODO (ref mut stream) => stream.flush()*/ => Ok(()), #[cfg(feature = "tls")] Https(ref mut stream) => stream.flush(), Empty => Ok(()), } } } -impl NetworkStream for NetStream { - #[inline(always)] - fn peer_addr(&mut self) -> io::Result { - match *self { - Http(ref mut stream) => stream.peer_addr(), - #[cfg(feature = "tls")] Https(ref mut stream) => stream.peer_addr(), - Empty => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)), - } - } - - #[inline(always)] - fn set_read_timeout(&self, dur: Option) -> io::Result<()> { - match *self { - Http(ref stream) => stream.set_read_timeout(dur), - #[cfg(feature = "tls")] Https(ref stream) => stream.set_read_timeout(dur), - Empty => Ok(()), - } - } - - #[inline(always)] - fn set_write_timeout(&self, dur: Option) -> io::Result<()> { - match *self { - Http(ref stream) => stream.set_write_timeout(dur), - #[cfg(feature = "tls")] Https(ref stream) => stream.set_write_timeout(dur), - Empty => Ok(()), - } - } - - #[inline(always)] - fn close(&mut self, how: Shutdown) -> io::Result<()> { - match *self { - Http(ref mut stream) => stream.close(how), - #[cfg(feature = "tls")] Https(ref mut stream) => stream.close(how), - Empty => Ok(()), - } - } -} +//impl NetworkStream for NetStream { +// #[inline(always)] +// fn peer_addr(&mut self) -> io::Result { +// match *self { +// Http/* TODO (ref mut stream) => stream.peer_addr()*/ => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)), +// #[cfg(feature = "tls")] Https(ref mut stream) => stream.peer_addr(), +// Empty => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)), +// } +// } +// +// #[inline(always)] +// fn set_read_timeout(&self, dur: Option) -> io::Result<()> { +// match *self { +// Http/* TODO (ref stream) => stream.set_read_timeout(dur)*/ => Ok(()), +// #[cfg(feature = "tls")] Https(ref stream) => stream.set_read_timeout(dur), +// Empty => Ok(()), +// } +// } +// +// #[inline(always)] +// fn set_write_timeout(&self, dur: Option) -> io::Result<()> { +// match *self { +// Http/* TODO (ref stream) => stream.set_write_timeout(dur)*/ => Ok(()), +// #[cfg(feature = "tls")] Https(ref stream) => stream.set_write_timeout(dur), +// Empty => Ok(()), +// } +// } +// +// #[inline(always)] +// fn close(&mut self, how: Shutdown) -> io::Result<()> { +// match *self { +// Http/* TODO (ref mut stream) => stream.close(how)*/ => Ok(()), +// #[cfg(feature = "tls")] Https(ref mut stream) => stream.close(how), +// Empty => Ok(()), +// } +// } +//} diff --git a/core/lib/src/error.rs b/core/lib/src/error.rs index 0add2d88..614284d0 100644 --- a/core/lib/src/error.rs +++ b/core/lib/src/error.rs @@ -19,7 +19,7 @@ use crate::router::Route; #[derive(Debug)] pub enum LaunchErrorKind { /// Binding to the provided address/port failed. - Bind(hyper::Error), + Bind(std::io::Error), /// An I/O error occurred during launch. Io(io::Error), /// Route collisions were detected. @@ -124,7 +124,7 @@ impl From for LaunchError { #[inline] fn from(error: hyper::Error) -> LaunchError { match error { - hyper::Error::Io(e) => LaunchError::new(LaunchErrorKind::Io(e)), + // TODO hyper::Error::Io(e) => LaunchError::new(LaunchErrorKind::Io(e)), e => LaunchError::new(LaunchErrorKind::Unknown(Box::new(e))) } } diff --git a/core/lib/src/local/request.rs b/core/lib/src/local/request.rs index 06779d37..9280147a 100644 --- a/core/lib/src/local/request.rs +++ b/core/lib/src/local/request.rs @@ -107,7 +107,9 @@ impl<'c> LocalRequest<'c> { uri: Cow<'c, str> ) -> LocalRequest<'c> { // We set a dummy string for now and check the user's URI on dispatch. - let request = Request::new(client.rocket(), method, Origin::dummy()); + let config = &client.rocket().config; + let state = &client.rocket().state; + let request = Request::new(config, state, method, Origin::dummy()); // Set up any cookies we know about. if let Some(ref jar) = client.cookies { @@ -408,7 +410,7 @@ impl<'c> LocalRequest<'c> { } // Actually dispatch the request. - let response = client.rocket().dispatch(request, Data::local(data)); + let response = client.rocket().dispatch(request, Data::new(data)); // If the client is tracking cookies, updates the internal cookie jar // with the changes reflected by `response`. diff --git a/core/lib/src/logger.rs b/core/lib/src/logger.rs index a6a63562..2c525889 100644 --- a/core/lib/src/logger.rs +++ b/core/lib/src/logger.rs @@ -158,16 +158,16 @@ pub(crate) fn try_init(level: LoggingLevel, verbose: bool) -> bool { } push_max_level(level); - if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) { +/* if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) { if verbose { eprintln!("Logger failed to initialize: {}", e); } pop_max_level(); return false; - } + }*/ - true + false } use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; diff --git a/core/lib/src/request/request.rs b/core/lib/src/request/request.rs index d8e78b88..ddd9b767 100644 --- a/core/lib/src/request/request.rs +++ b/core/lib/src/request/request.rs @@ -3,6 +3,8 @@ use std::cell::{Cell, RefCell}; use std::net::{IpAddr, SocketAddr}; use std::fmt; use std::str; +use std::str::FromStr; +use std::sync::Arc; use yansi::Paint; use state::{Container, Storage}; @@ -13,7 +15,7 @@ use crate::request::{FromFormValue, FormItems, FormItem}; use crate::rocket::Rocket; use crate::router::Route; use crate::config::{Config, Limits}; -use crate::http::{hyper, uri::{Origin, Segments}}; +use crate::http::{hyper, uri::{Origin, Segments, Uri}}; use crate::http::{Method, Header, HeaderMap, Cookies}; use crate::http::{RawStr, ContentType, Accept, MediaType}; use crate::http::private::{Indexed, SmallVec, CookieJar}; @@ -59,7 +61,8 @@ impl<'r> Request<'r> { /// Create a new `Request` with the given `method` and `uri`. #[inline(always)] pub(crate) fn new<'s: 'r>( - rocket: &'r Rocket, + config: &'r Config, + managed: &'r Container, method: Method, uri: Origin<'s> ) -> Request<'r> { @@ -71,8 +74,8 @@ impl<'r> Request<'r> { state: RequestState { path_segments: SmallVec::new(), query_items: None, - config: &rocket.config, - managed: &rocket.state, + config, + managed, route: Cell::new(None), cookies: RefCell::new(CookieJar::new()), accept: Storage::new(), @@ -699,7 +702,7 @@ impl<'r> Request<'r> { pub fn example)>(method: Method, uri: &str, f: F) { let rocket = Rocket::custom(Config::development()); let uri = Origin::parse(uri).expect("invalid URI in example"); - let mut request = Request::new(&rocket, method, uri); + let mut request = Request::new(&rocket.config, &rocket.state, method, uri); f(&mut request); } @@ -782,36 +785,42 @@ impl<'r> Request<'r> { /// Convert from Hyper types into a Rocket Request. pub(crate) fn from_hyp( - rocket: &'r Rocket, - h_method: hyper::Method, - h_headers: hyper::header::Headers, - h_uri: hyper::RequestUri, - h_addr: SocketAddr, + config: &'r Config, + managed: &'r Container, + request_parts: &hyper::Parts, ) -> Result, String> { - // Get a copy of the URI for later use. - let uri = match h_uri { - hyper::RequestUri::AbsolutePath(s) => s, - _ => return Err(format!("Bad URI: {}", h_uri)), - }; + + let h_uri = &request_parts.uri; + let h_headers = &request_parts.headers; + let h_version = &request_parts.version; + let h_method = &request_parts.method;; + +// if !h_uri.is_absolute() { +// return Err(format!("Bad URI: {}", h_uri)); +// }; // Ensure that the method is known. TODO: Allow made-up methods? - let method = match Method::from_hyp(&h_method) { + let method = match Method::from_hyp(h_method) { Some(method) => method, - None => return Err(format!("Invalid method: {}", h_method)) + None => return Err(format!("Unknown method: {}", h_method)) }; // We need to re-parse the URI since we don't trust Hyper... :( - let uri = Origin::parse_owned(uri).map_err(|e| e.to_string())?; + let uri = Origin::parse_owned(format!("{}", h_uri)).map_err(|e| e.to_string())?; // Construct the request object. - let mut request = Request::new(rocket, method, uri); - request.set_remote(h_addr); + let mut request = Request::new(config, managed, method, uri); +// request.set_remote(match hyp_req.remote_addr() { +// Some(remote) => remote, +// None => return Err(String::from("Missing remote address")) +// }); // Set the request cookies, if they exist. - if let Some(cookie_headers) = h_headers.get_raw("Cookie") { + let cookie_headers = h_headers.get_all("Cookie").iter(); + // TODO if cookie_headers.peek().is_some() { let mut cookie_jar = CookieJar::new(); for header in cookie_headers { - let raw_str = match std::str::from_utf8(header) { + let raw_str = match ::std::str::from_utf8(header.as_bytes()) { Ok(string) => string, Err(_) => continue }; @@ -824,18 +833,19 @@ impl<'r> Request<'r> { } request.state.cookies = RefCell::new(cookie_jar); - } + // TODO } // Set the rest of the headers. - for hyp in h_headers.iter() { - if let Some(header_values) = h_headers.get_raw(hyp.name()) { - for value in header_values { + for (name, value) in h_headers.iter() { + + // TODO if let Some(header_values) = h_headers.get_all(hyp.name()) { + // This is not totally correct since values needn't be UTF8. - let value_str = String::from_utf8_lossy(value).into_owned(); - let header = Header::new(hyp.name().to_string(), value_str); + let value_str = String::from_utf8_lossy(value.as_bytes()).into_owned(); + let header = Header::new(name.to_string(), value_str); request.add_header(header); - } - } + + // TODO } } Ok(request) diff --git a/core/lib/src/rocket.rs b/core/lib/src/rocket.rs index 96c25990..3feb31b8 100644 --- a/core/lib/src/rocket.rs +++ b/core/lib/src/rocket.rs @@ -1,14 +1,22 @@ use std::collections::HashMap; -use std::str::from_utf8; +use std::convert::From; +use std::str::{from_utf8, FromStr}; use std::cmp::min; use std::io::{self, Write}; use std::time::Duration; use std::mem; +use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; +use std::sync::Arc; + +use futures::{Future, Stream}; +use futures::future::{self, FutureResult}; use yansi::Paint; use state::Container; +use tokio::net::TcpListener; +use tokio::prelude::{Future as _, Stream as _}; -#[cfg(feature = "tls")] use crate::http::tls::TlsServer; +#[cfg(feature = "tls")] use crate::http::tls::TlsAcceptor; use crate::{logger, handler}; use crate::ext::ReadExt; @@ -37,23 +45,59 @@ pub struct Rocket { fairings: Fairings, } +struct RocketArcs { + config: Arc, + router: Arc, + default_catchers: Arc>, + catchers: Arc>, + state: Arc, + fairings: Arc, +} + +impl hyper::MakeService for RocketArcs { + type ReqBody = hyper::Body; + type ResBody = hyper::Body; + type Error = hyper::Error; + type Service = RocketHyperService; + type Future = FutureResult; + type MakeError = Self::Error; + + fn make_service(&mut self, _: Ctx) -> Self::Future { + future::ok(RocketHyperService::new(self)) + } +} + +#[derive(Clone)] +pub struct RocketHyperService { + config: Arc, + router: Arc, + default_catchers: Arc>, + catchers: Arc>, + state: Arc, + fairings: Arc, +} + #[doc(hidden)] -impl hyper::Handler for Rocket { +impl hyper::Service for RocketHyperService { + type ReqBody = hyper::Body; + type ResBody = hyper::Body; + type Error = hyper::Error; + //type Future = FutureResult, Self::Error>; + type Future = Box, Error = Self::Error> + Send>; + // This function tries to hide all of the Hyper-ness from Rocket. It // essentially converts Hyper types into Rocket types, then calls the // `dispatch` function, which knows nothing about Hyper. Because responding // depends on the `HyperResponse` type, this function does the actual // response processing. - fn handle<'h, 'k>( - &self, - hyp_req: hyper::Request<'h, 'k>, - res: hyper::FreshResponse<'h>, - ) { - // Get all of the information from Hyper. - let (h_addr, h_method, h_headers, h_uri, _, h_body) = hyp_req.deconstruct(); + fn call<'h>( + &mut self, + hyp_req: hyper::Request, + ) -> Self::Future { + let (parts, body) = hyp_req.into_parts(); // Convert the Hyper request into a Rocket request. - let req_res = Request::from_hyp(self, h_method, h_headers, h_uri, h_addr); + let req_res = Request::from_hyp(&self.config, &self.state, &parts); let mut req = match req_res { Ok(req) => req, Err(e) => { @@ -62,111 +106,42 @@ impl hyper::Handler for Rocket { // fabricate one. This is weird. We should let the user know // that we failed to parse a request (by invoking some special // handler) instead of doing this. - let dummy = Request::new(self, Method::Get, Origin::dummy()); + let dummy = Request::new(&self.config, &self.state, Method::Get, Origin::dummy()); let r = self.handle_error(Status::BadRequest, &dummy); - return self.issue_response(r, res); + return Box::new(future::ok(hyper::Response::from(r))); } }; - // Retrieve the data from the hyper body. - let data = match Data::from_hyp(h_body) { - Ok(data) => data, - Err(reason) => { - error_!("Bad data in request: {}", reason); - let r = self.handle_error(Status::InternalServerError, &req); - return self.issue_response(r, res); - } - }; + let this = self.clone(); - // Dispatch the request to get a response, then write that response out. - let response = self.dispatch(&mut req, data); - self.issue_response(response, res) + let response = body.concat2() + .map(move |chunk| { + let body = chunk.iter().rev().cloned().collect::>(); + let data = Data::new(body); + + // TODO: Due to life time constraints the clone of the service has been made. + // TODO: It should not be necessary but it is required to find a better solution + let mut req = Request::from_hyp(&this.config, &this.state, &parts).unwrap(); + // Dispatch the request to get a response, then write that response out. + let response = this.dispatch(&mut req, data); + hyper::Response::from(response) + }); + + Box::new(response) } } -// This macro is a terrible hack to get around Hyper's Server type. What we -// want is to use almost exactly the same launch code when we're serving over -// HTTPS as over HTTP. But Hyper forces two different types, so we can't use the -// same code, at least not trivially. These macros get around that by passing in -// the same code as a continuation in `$continue`. This wouldn't work as a -// regular function taking in a closure because the types of the inputs to the -// closure would be different depending on whether TLS was enabled or not. -#[cfg(not(feature = "tls"))] -macro_rules! serve { - ($rocket:expr, $addr:expr, |$server:ident, $proto:ident| $continue:expr) => ({ - let ($proto, $server) = ("http://", hyper::Server::http($addr)); - $continue - }) -} - -#[cfg(feature = "tls")] -macro_rules! serve { - ($rocket:expr, $addr:expr, |$server:ident, $proto:ident| $continue:expr) => ({ - if let Some(tls) = $rocket.config.tls.clone() { - let tls = TlsServer::new(tls.certs, tls.key); - let ($proto, $server) = ("https://", hyper::Server::https($addr, tls)); - $continue - } else { - let ($proto, $server) = ("http://", hyper::Server::http($addr)); - $continue - } - }) -} - -impl Rocket { - #[inline] - fn issue_response(&self, response: Response<'_>, hyp_res: hyper::FreshResponse<'_>) { - match self.write_response(response, hyp_res) { - Ok(_) => info_!("{}", Paint::green("Response succeeded.")), - Err(e) => error_!("Failed to write response: {:?}.", e), - } - } +impl RocketHyperService { #[inline] - fn write_response( - &self, - mut response: Response<'_>, - mut hyp_res: hyper::FreshResponse<'_>, - ) -> io::Result<()> { - *hyp_res.status_mut() = hyper::StatusCode::from_u16(response.status().code); - - for header in response.headers().iter() { - // FIXME: Using hyper here requires two allocations. - let name = header.name.into_string(); - let value = Vec::from(header.value.as_bytes()); - hyp_res.headers_mut().append_raw(name, value); - } - - match response.body() { - None => { - hyp_res.headers_mut().set(header::ContentLength(0)); - hyp_res.start()?.end() - } - Some(Body::Sized(body, size)) => { - hyp_res.headers_mut().set(header::ContentLength(size)); - let mut stream = hyp_res.start()?; - io::copy(body, &mut stream)?; - stream.end() - } - Some(Body::Chunked(mut body, chunk_size)) => { - // This _might_ happen on a 32-bit machine! - if chunk_size > (usize::max_value() as u64) { - let msg = "chunk size exceeds limits of usize type"; - return Err(io::Error::new(io::ErrorKind::Other, msg)); - } - - // The buffer stores the current chunk being written out. - let mut buffer = vec![0; chunk_size as usize]; - let mut stream = hyp_res.start()?; - loop { - match body.read_max(&mut buffer)? { - 0 => break, - n => stream.write_all(&buffer[..n])?, - } - } - - stream.end() - } + fn new(rocket: &RocketArcs) -> RocketHyperService { + RocketHyperService { + config: rocket.config.clone(), + router: rocket.router.clone(), + default_catchers: rocket.default_catchers.clone(), + catchers: rocket.catchers.clone(), + state: rocket.state.clone(), + fairings: rocket.fairings.clone(), } } @@ -331,6 +306,26 @@ impl Rocket { default.handle(req).expect("Default 500 response.") }) } +} + +impl Rocket { + + #[inline] + pub(crate) fn dispatch<'s, 'r>( + &'s self, + request: &'r mut Request<'s>, + data: Data + ) -> Response<'r> { + unimplemented!("TODO") + } + + pub(crate) fn handle_error<'r>( + &self, + status: Status, + req: &'r Request + ) -> Response<'r> { + unimplemented!("TODO") + } /// Create a new `Rocket` application using the configuration information in /// `Rocket.toml`. If the file does not exist or if there is an I/O error @@ -510,6 +505,7 @@ impl Rocket { panic!("Invalid mount point."); } + let mut router = self.router.clone(); for mut route in routes.into() { let path = route.uri.clone(); if let Err(e) = route.set_uri(base_uri.clone(), path) { @@ -518,9 +514,11 @@ impl Rocket { } info_!("{}", route); - self.router.add(route); + router.add(route); } + self.router = router; + self } @@ -554,6 +552,9 @@ impl Rocket { #[inline] pub fn register(mut self, catchers: Vec) -> Self { info!("{}{}", Paint::masked("👾 "), Paint::magenta("Catchers:")); + + let mut current_catchers = self.catchers.clone(); + for c in catchers { if self.catchers.get(&c.code).map_or(false, |e| !e.is_default) { info_!("{} {}", c, Paint::yellow("(warning: duplicate catcher!)")); @@ -561,9 +562,11 @@ impl Rocket { info_!("{}", c); } - self.catchers.insert(c.code, c); + current_catchers.insert(c.code, c); } + self.catchers = current_catchers; + self } @@ -687,46 +690,90 @@ impl Rocket { self.fairings.pretty_print_counts(); - let full_addr = format!("{}:{}", self.config.address, self.config.port); - serve!(self, &full_addr, |server, proto| { - let mut server = match server { - Ok(server) => server, - Err(e) => return LaunchError::new(LaunchErrorKind::Bind(e)), - }; + // TODO.async What meaning should config.workers have now? + // Initialize the tokio runtime + let mut runtime = tokio::runtime::Builder::new() + .core_threads(self.config.workers as usize) + .build() + .expect("Cannot build runtime!"); - // Determine the address and port we actually binded to. - match server.local_addr() { - Ok(server_addr) => self.config.port = server_addr.port(), - Err(e) => return LaunchError::from(e), + let threads = self.config.workers as usize; + + let full_addr = format!("{}:{}", self.config.address, self.config.port) + .to_socket_addrs() + .expect("A valid socket address") + .next() + .unwrap(); + + let listener = match TcpListener::bind(&full_addr) { + Ok(listener) => listener, + Err(e) => return LaunchError::new(LaunchErrorKind::Bind(e)), + }; + + // Determine the address and port we actually binded to. + match listener.local_addr() { + Ok(server_addr) => /* TODO self.config.port = */ server_addr.port(), + Err(e) => return LaunchError::from(e), + }; + + let proto; + let incoming; + + #[cfg(feature = "tls")] + { + // TODO.async: Can/should we make the clone unnecessary (by reference, or by moving out?) + if let Some(tls) = self.config.tls.clone() { + proto = "https://"; + let mut config = tls::rustls::ServerConfig::new(tls::rustls::NoClientAuth::new()); + config.set_single_cert(tls.certs, tls.key).expect("invalid key or certificate"); + + // TODO.async: I once observed an unhandled AlertReceived(UnknownCA) but + // have no idea what happened and cannot reproduce. + let config = TlsAcceptor::from(Arc::new(config)); + + incoming = Box::new(listener.incoming().and_then(move |stream| { + config.accept(stream) + .map(|stream| Box::new(stream)) + })); } - - // Set the keep-alive. - let timeout = self.config.keep_alive.map(|s| Duration::from_secs(s as u64)); - server.keep_alive(timeout); - - // Freeze managed state for synchronization-free accesses later. - self.state.freeze(); - - // Run the launch fairings. - self.fairings.handle_launch(&self); - - let full_addr = format!("{}:{}", self.config.address, self.config.port); - launch_info!("{}{} {}{}", - Paint::masked("🚀 "), - Paint::default("Rocket has launched from").bold(), - Paint::default(proto).bold().underline(), - Paint::default(&full_addr).bold().underline()); - - // Restore the log level back to what it originally was. - logger::pop_max_level(); - - let threads = self.config.workers as usize; - if let Err(e) = server.handle_threads(self, threads) { - return LaunchError::from(e); + else { + proto = "http://"; + incoming = Box::new(listener.incoming().map(|stream| Box::new(stream))); } + } - unreachable!("the call to `handle_threads` should block on success") - }) + #[cfg(not(feature = "tls"))] + { + proto = "http://"; + incoming = Box::new(listener.incoming().map(|stream| Box::new(stream))); + } + + // Freeze managed state for synchronization-free accesses later. + self.state.freeze(); + + // Run the launch fairings. + self.fairings.handle_launch(&self); + + launch_info!("{}{} {}{}", + Paint::masked("🚀 "), + Paint::default("Rocket has launched from").bold(), + Paint::default(proto).bold().underline(), + Paint::default(&full_addr).bold().underline()); + + // Restore the log level back to what it originally was. + logger::pop_max_level(); + + let arcs = RocketArcs::from(self); + + // NB: executor must be passed manually here, see hyperium/hyper#1537 + let server = hyper::Server::builder(incoming) + .executor(runtime.executor()) + .serve(arcs); + + // TODO.async: Use with_graceful_shutdown, and let launch() return a Result<(), Error> + runtime.block_on(server).expect("TODO.async handle error"); + + unreachable!("the call to `handle_threads` should block on success") } /// Returns an iterator over all of the routes mounted on this instance of @@ -811,3 +858,47 @@ impl Rocket { &self.config } } + +impl From for RocketArcs { + fn from(mut rocket: Rocket) -> Self { + RocketArcs { + config: Arc::new(rocket.config), + router: Arc::new(rocket.router), + default_catchers: Arc::new(rocket.default_catchers), + catchers: Arc::new(rocket.catchers), + state: Arc::new(rocket.state), + fairings: Arc::new(rocket.fairings), + } + } +} + +// TODO: consider try_from here? +impl<'a> From> for hyper::Response { + fn from(mut response: Response) -> Self { + + let mut builder = hyper::Response::builder(); + builder.status(hyper::StatusCode::from_u16(response.status().code).expect("")); + + for header in response.headers().iter() { + // FIXME: Using hyper here requires two allocations. + let name = hyper::HeaderName::from_str(&header.name.into_string()).unwrap(); + let value = hyper::HeaderValue::from_bytes(header.value.as_bytes()).unwrap(); + builder.header(name, value); + } + + match response.body() { + None => { + builder.body(hyper::Body::empty()) + }, + Some(Body::Sized(body, size)) => { + let mut buffer = Vec::with_capacity(size as usize); + body.read_to_end(&mut buffer); + builder.header(header::CONTENT_LENGTH, hyper::HeaderValue::from(size)); + builder.body(hyper::Body::from(buffer)) + }, + Some(Body::Chunked(mut body, chunk_size)) => { + unimplemented!() + } + }.unwrap() + } +} diff --git a/core/lib/src/router/mod.rs b/core/lib/src/router/mod.rs index 36c54e6a..38700299 100644 --- a/core/lib/src/router/mod.rs +++ b/core/lib/src/router/mod.rs @@ -16,7 +16,7 @@ pub(crate) fn dummy_handler<'r>(r: &'r crate::Request<'_>, _: crate::Data) -> cr crate::Outcome::from(r, ()) } -#[derive(Default)] +#[derive(Default, Clone)] pub struct Router { routes: HashMap>, }