diff --git a/core/codegen/src/attribute/catch.rs b/core/codegen/src/attribute/catch.rs index 465cf2ec..cf519365 100644 --- a/core/codegen/src/attribute/catch.rs +++ b/core/codegen/src/attribute/catch.rs @@ -51,7 +51,7 @@ pub fn _catch(args: TokenStream, input: TokenStream) -> Result { let status_code = status.0.code; // Variables names we'll use and reuse. - define_vars_and_mods!(req, catcher, response, Request, Response); + define_vars_and_mods!(req, catcher, Request, Response, ErrorHandlerFuture); // Determine the number of parameters that will be passed in. let (fn_sig, inputs) = match catch.function.sig.inputs.len() { @@ -84,12 +84,14 @@ pub fn _catch(args: TokenStream, input: TokenStream) -> Result { /// Rocket code generated wrapping catch function. #[doc(hidden)] - #vis fn #generated_fn_name<'_b>(#req: &'_b #Request) -> #response::Result<'_b> { - let __response = #catcher_response; - #Response::build() - .status(#status) - .merge(__response) - .ok() + #vis fn #generated_fn_name<'_b>(#req: &'_b #Request) -> #ErrorHandlerFuture<'_b> { + Box::pin(async move { + let __response = #catcher_response; + #Response::build() + .status(#status) + .merge(__response) + .ok() + }) } /// Rocket code generated static catcher info. diff --git a/core/codegen/src/attribute/route.rs b/core/codegen/src/attribute/route.rs index 6de82a40..2be3853a 100644 --- a/core/codegen/src/attribute/route.rs +++ b/core/codegen/src/attribute/route.rs @@ -178,7 +178,7 @@ fn data_expr(ident: &syn::Ident, ty: &syn::Type) -> TokenStream2 { define_vars_and_mods!(req, data, FromData, Outcome, Transform); let span = ident.span().unstable().join(ty.span()).unwrap().into(); quote_spanned! { span => - let __transform = <#ty as #FromData>::transform(#req, #data); + let __transform = <#ty as #FromData>::transform(#req, #data).await; #[allow(unreachable_patterns, unreachable_code)] let __outcome = match __transform { @@ -195,7 +195,7 @@ fn data_expr(ident: &syn::Ident, ty: &syn::Type) -> TokenStream2 { }; #[allow(non_snake_case, unreachable_patterns, unreachable_code)] - let #ident: #ty = match <#ty as #FromData>::from_data(#req, __outcome) { + let #ident: #ty = match <#ty as #FromData>::from_data(#req, __outcome).await { #Outcome::Success(__d) => __d, #Outcome::Forward(__d) => return #Outcome::Forward(__d), #Outcome::Failure((__c, _)) => return #Outcome::Failure(__c), @@ -369,8 +369,18 @@ fn generate_respond_expr(route: &Route) -> TokenStream2 { let parameter_names = route.inputs.iter() .map(|(_, rocket_ident, _)| rocket_ident); + let responder_stmt = if route.function.sig.asyncness.is_some() { + quote_spanned! { ret_span => + let ___responder = #user_handler_fn_name(#(#parameter_names),*).await; + } + } else { + quote_spanned! { ret_span => + let ___responder = #user_handler_fn_name(#(#parameter_names),*); + } + }; + quote_spanned! { ret_span => - let ___responder = #user_handler_fn_name(#(#parameter_names),*); + #responder_stmt #handler::Outcome::from(#req, ___responder) } } @@ -403,7 +413,7 @@ fn codegen_route(route: Route) -> Result { } // Gather everything we need. - define_vars_and_mods!(req, data, handler, Request, Data, StaticRouteInfo); + define_vars_and_mods!(req, data, Request, Data, StaticRouteInfo, HandlerFuture); let (vis, user_handler_fn) = (&route.function.vis, &route.function); let user_handler_fn_name = &user_handler_fn.sig.ident; let generated_fn_name = user_handler_fn_name.prepend(ROUTE_FN_PREFIX); @@ -424,12 +434,14 @@ fn codegen_route(route: Route) -> Result { #vis fn #generated_fn_name<'_b>( #req: &'_b #Request, #data: #Data - ) -> #handler::Outcome<'_b> { - #(#req_guard_definitions)* - #(#parameter_definitions)* - #data_stmt + ) -> #HandlerFuture<'_b> { + Box::pin(async move { + #(#req_guard_definitions)* + #(#parameter_definitions)* + #data_stmt - #generated_respond_expr + #generated_respond_expr + }) } /// Rocket code generated wrapping URI macro. diff --git a/core/codegen/src/lib.rs b/core/codegen/src/lib.rs index 964cbaab..48336a4b 100644 --- a/core/codegen/src/lib.rs +++ b/core/codegen/src/lib.rs @@ -1,4 +1,5 @@ #![feature(proc_macro_diagnostic, proc_macro_span)] +#![feature(async_await)] #![recursion_limit="128"] #![doc(html_root_url = "https://api.rocket.rs/v0.5")] @@ -96,12 +97,8 @@ vars_and_mods! { Data => rocket::Data, StaticRouteInfo => rocket::StaticRouteInfo, SmallVec => rocket::http::private::SmallVec, - _Option => ::std::option::Option, - _Result => ::std::result::Result, - _Some => ::std::option::Option::Some, - _None => ::std::option::Option::None, - _Ok => ::std::result::Result::Ok, - _Err => ::std::result::Result::Err, + HandlerFuture => rocket::handler::HandlerFuture, + ErrorHandlerFuture => rocket::handler::ErrorHandlerFuture, } macro_rules! define_vars_and_mods { diff --git a/core/codegen/tests/route.rs b/core/codegen/tests/route.rs index bdc92e25..8afcea85 100644 --- a/core/codegen/tests/route.rs +++ b/core/codegen/tests/route.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] // Rocket sometimes generates mangled identifiers that activate the // non_snake_case lint. We deny the lint in this test to ensure that diff --git a/core/http/Cargo.toml b/core/http/Cargo.toml index 1ef5b6cb..992482b7 100644 --- a/core/http/Cargo.toml +++ b/core/http/Cargo.toml @@ -16,27 +16,22 @@ edition = "2018" [features] default = [] -tls = ["rustls", "hyper-sync-rustls"] +tls = ["tokio-rustls"] private-cookies = ["cookie/private", "cookie/key-expansion"] [dependencies] smallvec = "1.0" percent-encoding = "1" -hyper = { version = "0.12.31", default-features = false, features = ["tokio"] } +hyper = { version = "0.12.31", default-features = false, features = ["runtime"] } http = "0.1.17" mime = "0.3.13" time = "0.2.11" indexmap = "1.0" -rustls = { version = ">=0.16, <=0.17", optional = true } state = "0.4" +tokio-rustls = { version = "0.10.3", optional = true } cookie = { version = "0.14.0", features = ["percent-encode"] } pear = "0.1" unicode-xid = "0.2" -[dependencies.hyper-sync-rustls] -version = ">=0.3.0-rc.6, <=0.3.0-rc.17" -features = ["server"] -optional = true - [dev-dependencies] rocket = { version = "0.5.0-dev", path = "../lib" } diff --git a/core/http/src/cookies.rs b/core/http/src/cookies.rs index 112314f5..73eb65a1 100644 --- a/core/http/src/cookies.rs +++ b/core/http/src/cookies.rs @@ -1,5 +1,4 @@ use std::fmt; -use std::cell::RefMut; use crate::Header; use cookie::Delta; @@ -129,7 +128,7 @@ mod key { /// 32`. pub enum Cookies<'a> { #[doc(hidden)] - Jarred(RefMut<'a, CookieJar>, &'a Key), + Jarred(CookieJar, &'a Key, Box), #[doc(hidden)] Empty(CookieJar) } @@ -138,8 +137,8 @@ impl<'a> Cookies<'a> { /// WARNING: This is unstable! Do not use this method outside of Rocket! #[inline] #[doc(hidden)] - pub fn new(jar: RefMut<'a, CookieJar>, key: &'a Key) -> Cookies<'a> { - Cookies::Jarred(jar, key) + pub fn new(jar: CookieJar, key: &'a Key, on_drop: F) -> Cookies<'a> { + Cookies::Jarred(jar, key, Box::new(on_drop)) } /// WARNING: This is unstable! Do not use this method outside of Rocket! @@ -161,7 +160,7 @@ impl<'a> Cookies<'a> { #[inline] #[doc(hidden)] pub fn add_original(&mut self, cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, _) = *self { + if let Cookies::Jarred(ref mut jar, _, _) = *self { jar.add_original(cookie) } } @@ -181,7 +180,7 @@ impl<'a> Cookies<'a> { /// ``` pub fn get(&self, name: &str) -> Option<&Cookie<'static>> { match *self { - Cookies::Jarred(ref jar, _) => jar.get(name), + Cookies::Jarred(ref jar, _, _) => jar.get(name), Cookies::Empty(_) => None } } @@ -206,7 +205,7 @@ impl<'a> Cookies<'a> { /// } /// ``` pub fn add(&mut self, cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, _) = *self { + if let Cookies::Jarred(ref mut jar, _, _) = *self { jar.add(cookie) } } @@ -232,7 +231,7 @@ impl<'a> Cookies<'a> { /// } /// ``` pub fn remove(&mut self, cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, _) = *self { + if let Cookies::Jarred(ref mut jar, _, _) = *self { jar.remove(cookie) } } @@ -243,7 +242,7 @@ impl<'a> Cookies<'a> { #[doc(hidden)] pub fn reset_delta(&mut self) { match *self { - Cookies::Jarred(ref mut jar, _) => jar.reset_delta(), + Cookies::Jarred(ref mut jar, ..) => jar.reset_delta(), Cookies::Empty(ref mut jar) => jar.reset_delta() } } @@ -264,7 +263,7 @@ impl<'a> Cookies<'a> { /// ``` pub fn iter(&self) -> impl Iterator> { match *self { - Cookies::Jarred(ref jar, _) => jar.iter(), + Cookies::Jarred(ref jar, _, _) => jar.iter(), Cookies::Empty(ref jar) => jar.iter() } } @@ -274,12 +273,22 @@ impl<'a> Cookies<'a> { #[doc(hidden)] pub fn delta(&self) -> Delta<'_> { match *self { - Cookies::Jarred(ref jar, _) => jar.delta(), + Cookies::Jarred(ref jar, _, _) => jar.delta(), Cookies::Empty(ref jar) => jar.delta() } } } +impl<'a> Drop for Cookies<'a> { + fn drop(&mut self) { + if let Cookies::Jarred(ref mut jar, _, ref mut on_drop) = *self { + let jar = std::mem::replace(jar, CookieJar::new()); + let on_drop = std::mem::replace(on_drop, Box::new(|_| {})); + on_drop(jar); + } + } +} + #[cfg(feature = "private-cookies")] impl Cookies<'_> { /// Returns a reference to the `Cookie` inside this collection with the name @@ -302,7 +311,7 @@ impl Cookies<'_> { /// ``` pub fn get_private(&mut self, name: &str) -> Option> { match *self { - Cookies::Jarred(ref mut jar, key) => jar.private(key).get(name), + Cookies::Jarred(ref mut jar, key, _) => jar.private(key).get(name), Cookies::Empty(_) => None } } @@ -338,7 +347,7 @@ impl Cookies<'_> { /// } /// ``` pub fn add_private(&mut self, mut cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, key) = *self { + if let Cookies::Jarred(ref mut jar, key, _) = *self { Cookies::set_private_defaults(&mut cookie); jar.private(key).add(cookie) } @@ -348,7 +357,7 @@ impl Cookies<'_> { /// WARNING: This is unstable! Do not use this method outside of Rocket! #[doc(hidden)] pub fn add_original_private(&mut self, mut cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, key) = *self { + if let Cookies::Jarred(ref mut jar, key, _) = *self { Cookies::set_private_defaults(&mut cookie); jar.private(key).add_original(cookie) } @@ -402,7 +411,7 @@ impl Cookies<'_> { /// } /// ``` pub fn remove_private(&mut self, mut cookie: Cookie<'static>) { - if let Cookies::Jarred(ref mut jar, key) = *self { + if let Cookies::Jarred(ref mut jar, key, _) = *self { if cookie.path().is_none() { cookie.set_path("/"); } @@ -415,7 +424,7 @@ impl Cookies<'_> { impl fmt::Debug for Cookies<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - Cookies::Jarred(ref jar, _) => jar.fmt(f), + Cookies::Jarred(ref jar, _, _) => jar.fmt(f), Cookies::Empty(ref jar) => jar.fmt(f) } } diff --git a/core/http/src/hyper.rs b/core/http/src/hyper.rs index 5b5ba964..143afcdc 100644 --- a/core/http/src/hyper.rs +++ b/core/http/src/hyper.rs @@ -4,66 +4,44 @@ //! These types will, with certainty, be removed with time, but they reside here //! while necessary. -#[doc(hidden)] pub use hyper::{Body, Request, Response}; +#[doc(hidden)] pub use hyper::{Body, Request, Response, Server}; #[doc(hidden)] pub use hyper::body::Payload as Payload; #[doc(hidden)] pub use hyper::error::Error; -#[doc(hidden)] pub use hyper::server::Server; -#[doc(hidden)] pub use hyper::service::{MakeService, Service}; +#[doc(hidden)] pub use hyper::service::{make_service_fn, MakeService, Service}; +#[doc(hidden)] pub use hyper::server::conn::{AddrIncoming, AddrStream}; #[doc(hidden)] pub use hyper::Chunk; +#[doc(hidden)] pub use http::header::HeaderMap; #[doc(hidden)] pub use http::header::HeaderName as HeaderName; #[doc(hidden)] pub use http::header::HeaderValue as HeaderValue; #[doc(hidden)] pub use http::method::Method; -#[doc(hidden)] pub use http::request::Parts; +#[doc(hidden)] pub use http::request::Parts as RequestParts; +#[doc(hidden)] pub use http::response::Builder as ResponseBuilder; #[doc(hidden)] pub use http::status::StatusCode; #[doc(hidden)] pub use http::uri::Uri; -/// Type alias to `hyper::Response<'a, hyper::net::Fresh>`. -// TODO #[doc(hidden)] pub type FreshResponse<'a> = self::Response<'a, self::net::Fresh>; - -/// Reexported Hyper header types. +/// Reexported http header types. pub mod header { - use crate::Header; - - macro_rules! import_hyper_items { - ($($item:ident),*) => ($(pub use hyper::header::$item;)*) - } - - macro_rules! import_hyper_headers { + macro_rules! import_http_headers { ($($name:ident),*) => ($( pub use http::header::$name as $name; )*) } -// import_hyper_items! { -// Accept, AcceptCharset, AcceptEncoding, AcceptLanguage, AcceptRanges, -// AccessControlAllowCredentials, AccessControlAllowHeaders, -// AccessControlAllowMethods, AccessControlExposeHeaders, -// AccessControlMaxAge, AccessControlRequestHeaders, -// AccessControlRequestMethod, Allow, Authorization, Basic, Bearer, -// CacheControl, Connection, ContentDisposition, ContentEncoding, -// ContentLanguage, ContentLength, ContentRange, ContentType, Date, ETag, -// EntityTag, Expires, From, Headers, Host, HttpDate, IfModifiedSince, -// IfUnmodifiedSince, LastModified, Location, Origin, Prefer, -// PreferenceApplied, Protocol, Quality, QualityItem, Referer, -// StrictTransportSecurity, TransferEncoding, Upgrade, UserAgent, -// AccessControlAllowOrigin, ByteRangeSpec, CacheDirective, Charset, -// ConnectionOption, ContentRangeSpec, DispositionParam, DispositionType, -// Encoding, Expect, IfMatch, IfNoneMatch, IfRange, Pragma, Preference, -// ProtocolName, Range, RangeUnit, ReferrerPolicy, Vary, Scheme, q, qitem -// } -// - import_hyper_headers! { - ACCEPT, ACCESS_CONTROL_ALLOW_CREDENTIALS, ACCESS_CONTROL_ALLOW_HEADERS, + import_http_headers! { + ACCEPT, ACCEPT_CHARSET, ACCEPT_ENCODING, ACCEPT_LANGUAGE, ACCEPT_RANGES, + ACCESS_CONTROL_ALLOW_CREDENTIALS, ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_MAX_AGE, - ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, ACCEPT_CHARSET, - ACCEPT_ENCODING, ACCEPT_LANGUAGE, ACCEPT_RANGES, ALLOW, CACHE_CONTROL, - CONNECTION, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, - CONTENT_LENGTH, CONTENT_RANGE, DATE, ETAG, EXPECT, EXPIRES, HOST, IF_MATCH, - IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_RANGE, IF_UNMODIFIED_SINCE, LAST_MODIFIED, - LOCATION, ORIGIN, PRAGMA, RANGE, REFERER, - REFERRER_POLICY, STRICT_TRANSPORT_SECURITY, TRANSFER_ENCODING, UPGRADE, - USER_AGENT, VARY + ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, ALLOW, + AUTHORIZATION, CACHE_CONTROL, CONNECTION, CONTENT_DISPOSITION, + CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_LOCATION, + CONTENT_RANGE, CONTENT_SECURITY_POLICY, + CONTENT_SECURITY_POLICY_REPORT_ONLY, CONTENT_TYPE, DATE, ETAG, EXPECT, + EXPIRES, FORWARDED, FROM, HOST, IF_MATCH, IF_MODIFIED_SINCE, + IF_NONE_MATCH, IF_RANGE, IF_UNMODIFIED_SINCE, LAST_MODIFIED, LINK, + LOCATION, ORIGIN, PRAGMA, RANGE, REFERER, REFERRER_POLICY, REFRESH, + STRICT_TRANSPORT_SECURITY, TE, TRANSFER_ENCODING, UPGRADE, USER_AGENT, + VARY } } diff --git a/core/http/src/method.rs b/core/http/src/method.rs index 1a4722f8..f8b58255 100644 --- a/core/http/src/method.rs +++ b/core/http/src/method.rs @@ -1,9 +1,7 @@ -extern crate http; - use std::fmt; use std::str::FromStr; -use crate::{hyper, uncased::uncased_eq}; +use crate::uncased::uncased_eq; use self::Method::*; diff --git a/core/http/src/tls.rs b/core/http/src/tls.rs index b0311be8..5e236d9b 100644 --- a/core/http/src/tls.rs +++ b/core/http/src/tls.rs @@ -1,2 +1,8 @@ -pub use hyper_sync_rustls::{util, WrappedStream, ServerSession, TlsServer}; -pub use rustls::{Certificate, PrivateKey}; +pub use tokio_rustls::TlsAcceptor; +pub use tokio_rustls::rustls; + +pub use rustls::internal::pemfile; +pub use rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig}; + +// TODO.async: extract from hyper-sync-rustls some convenience +// functions to load certs and keys diff --git a/core/lib/Cargo.toml b/core/lib/Cargo.toml index 092dfd58..e62a0ac7 100644 --- a/core/lib/Cargo.toml +++ b/core/lib/Cargo.toml @@ -24,12 +24,12 @@ tls = ["rocket_http/tls"] private-cookies = ["rocket_http/private-cookies"] [dependencies] -futures = "0.1" rocket_codegen = { version = "0.5.0-dev", path = "../codegen" } rocket_http = { version = "0.5.0-dev", path = "../http" } +futures-preview = { version = "0.3.0-alpha.14", features = ["compat", "io-compat"] } tokio = "0.1.16" yansi = "0.5" -log = "0.4" +log = { version = "0.4", features = ["std"] } toml = "0.4.7" num_cpus = "1.0" state = "0.4.1" diff --git a/core/lib/build.rs b/core/lib/build.rs index 0f71316d..b6ca3322 100644 --- a/core/lib/build.rs +++ b/core/lib/build.rs @@ -3,8 +3,8 @@ use yansi::{Paint, Color::{Red, Yellow, Blue}}; // Specifies the minimum nightly version needed to compile Rocket. -const MIN_DATE: &'static str = "2019-04-05"; -const MIN_VERSION: &'static str = "1.35.0-nightly"; +const MIN_DATE: &'static str = "2019-07-03"; +const MIN_VERSION: &'static str = "1.37.0-nightly"; macro_rules! err { ($version:expr, $date:expr, $msg:expr) => ( diff --git a/core/lib/src/catcher.rs b/core/lib/src/catcher.rs index 80b3c49c..f27ebb72 100644 --- a/core/lib/src/catcher.rs +++ b/core/lib/src/catcher.rs @@ -1,3 +1,5 @@ +use futures::future::Future; + use crate::response; use crate::handler::ErrorHandler; use crate::codegen::StaticCatchInfo; @@ -59,7 +61,6 @@ use yansi::Color::*; /// /// A function decorated with `catch` must take exactly zero or one arguments. /// If the catcher takes an argument, it must be of type [`&Request`](Request). -#[derive(Clone)] pub struct Catcher { /// The HTTP status code to match against. pub code: u16, @@ -99,7 +100,7 @@ impl Catcher { } #[inline(always)] - pub(crate) fn handle<'r>(&self, req: &'r Request<'_>) -> response::Result<'r> { + pub(crate) fn handle<'r>(&self, req: &'r Request<'_>) -> impl Future> { (self.handler)(req) } @@ -152,10 +153,12 @@ macro_rules! default_catchers { let mut map = HashMap::new(); $( - fn $fn_name<'r>(req: &'r Request<'_>) -> response::Result<'r> { - status::Custom(Status::from_code($code).unwrap(), - content::Html(error_page_template!($code, $name, $description)) - ).respond_to(req) + fn $fn_name<'r>(req: &'r Request<'_>) -> std::pin::Pin> + Send + 'r>> { + (async move { + status::Custom(Status::from_code($code).unwrap(), + content::Html(error_page_template!($code, $name, $description)) + ).respond_to(req) + }).boxed() } map.insert($code, Catcher::new_default($code, $fn_name)); @@ -167,6 +170,7 @@ macro_rules! default_catchers { pub mod defaults { use super::Catcher; + use futures::future::FutureExt; use std::collections::HashMap; diff --git a/core/lib/src/codegen.rs b/core/lib/src/codegen.rs index 276eea1a..894cf854 100644 --- a/core/lib/src/codegen.rs +++ b/core/lib/src/codegen.rs @@ -1,9 +1,11 @@ +use futures::future::Future; + use crate::{Request, Data}; use crate::handler::{Outcome, ErrorHandler}; use crate::http::{Method, MediaType}; /// Type of a static handler, which users annotate with Rocket's attribute. -pub type StaticHandler = for<'r> fn(&'r Request<'_>, Data) -> Outcome<'r>; +pub type StaticHandler = for<'r> fn(&'r Request<'_>, Data) -> std::pin::Pin> + Send + 'r>>; /// Information generated by the `route` attribute during codegen. pub struct StaticRouteInfo { diff --git a/core/lib/src/config/config.rs b/core/lib/src/config/config.rs index de2f8d6d..af5e417d 100644 --- a/core/lib/src/config/config.rs +++ b/core/lib/src/config/config.rs @@ -574,23 +574,33 @@ impl Config { /// ``` #[cfg(feature = "tls")] pub fn set_tls(&mut self, certs_path: &str, key_path: &str) -> Result<()> { - use crate::http::tls::util::{self, Error}; + use crate::http::tls::pemfile::{certs, rsa_private_keys}; + use std::fs::File; + use std::io::BufReader; let pem_err = "malformed PEM file"; + // TODO.async: Fully copy from hyper-sync-rustls, move to http/src/tls + // Partially extracted from hyper-sync-rustls + // Load the certificates. - let certs = util::load_certs(self.root_relative(certs_path)) - .map_err(|e| match e { - Error::Io(e) => ConfigError::Io(e, "tls.certs"), - _ => self.bad_type("tls", pem_err, "a valid certificates file") - })?; + let certs = match File::open(self.root_relative(certs_path)) { + Ok(file) => certs(&mut BufReader::new(file)).map_err(|_| { + self.bad_type("tls", pem_err, "a valid certificates file") + }), + Err(e) => Err(ConfigError::Io(e, "tls.certs"))?, + }?; // And now the private key. - let key = util::load_private_key(self.root_relative(key_path)) - .map_err(|e| match e { - Error::Io(e) => ConfigError::Io(e, "tls.key"), - _ => self.bad_type("tls", pem_err, "a valid private key file") - })?; + let mut keys = match File::open(self.root_relative(key_path)) { + Ok(file) => rsa_private_keys(&mut BufReader::new(file)).map_err(|_| { + self.bad_type("tls", pem_err, "a valid private key file") + }), + Err(e) => Err(ConfigError::Io(e, "tls.key")), + }?; + + // TODO.async: Proper check for one key + let key = keys.remove(0); self.tls = Some(TlsConfig { certs, key }); Ok(()) diff --git a/core/lib/src/config/mod.rs b/core/lib/src/config/mod.rs index bcba687c..8e0aa880 100644 --- a/core/lib/src/config/mod.rs +++ b/core/lib/src/config/mod.rs @@ -460,7 +460,7 @@ mod test { use std::env; use std::sync::Mutex; - use super::{FullConfig, ConfigError, ConfigBuilder}; + use super::{Config, FullConfig, ConfigError, ConfigBuilder}; use super::{Environment, GLOBAL_ENV_NAME}; use super::environment::CONFIG_ENV; use super::Environment::*; @@ -1071,19 +1071,6 @@ mod test { } } - macro_rules! check_value { - ($key:expr, $val:expr, $config:expr) => ( - match $key { - "log" => assert_eq!($config.log_level, $val.parse().unwrap()), - "port" => assert_eq!($config.port, $val.parse().unwrap()), - "address" => assert_eq!($config.address, $val), - "extra_extra" => assert_eq!($config.get_bool($key).unwrap(), true), - "workers" => assert_eq!($config.workers, $val.parse().unwrap()), - _ => panic!("Unexpected key: {}", $key) - } - ) - } - #[test] fn test_env_override() { // Take the lock so changing the environment doesn't cause races. @@ -1094,6 +1081,17 @@ mod test { ("address", "1.2.3.4"), ("EXTRA_EXTRA", "true"), ("workers", "3") ]; + let check_value = |key: &str, val: &str, config: &Config| { + match key { + "log" => assert_eq!(config.log_level, val.parse().unwrap()), + "port" => assert_eq!(config.port, val.parse::().unwrap()), + "address" => assert_eq!(config.address, val), + "extra_extra" => assert_eq!(config.get_bool(key).unwrap(), true), + "workers" => assert_eq!(config.workers, val.parse::().unwrap()), + _ => panic!("Unexpected key: {}", key) + } + }; + // Check that setting the environment variable actually changes the // config for the default active and nonactive environments. for &(key, val) in &pairs { @@ -1103,13 +1101,13 @@ mod test { for env in &Environment::ALL { env::set_var(CONFIG_ENV, env.to_string()); let rconfig = env_default().unwrap(); - check_value!(&*key.to_lowercase(), val, rconfig.active()); + check_value(&*key.to_lowercase(), val, rconfig.active()); } // And non-active configs. let rconfig = env_default().unwrap(); for env in &Environment::ALL { - check_value!(&*key.to_lowercase(), val, rconfig.get(*env)); + check_value(&*key.to_lowercase(), val, rconfig.get(*env)); } } @@ -1144,11 +1142,11 @@ mod test { let mut r = FullConfig::parse(toml, TEST_CONFIG_FILENAME).unwrap(); r.override_from_env().unwrap(); - check_value!(&*key.to_lowercase(), val, r.active()); + check_value(&*key.to_lowercase(), val, r.active()); // And non-active configs. for env in &Environment::ALL { - check_value!(&*key.to_lowercase(), val, r.get(*env)); + check_value(&*key.to_lowercase(), val, r.get(*env)); } } diff --git a/core/lib/src/data/data.rs b/core/lib/src/data/data.rs index d0d2f0a4..caa32ddc 100644 --- a/core/lib/src/data/data.rs +++ b/core/lib/src/data/data.rs @@ -1,16 +1,16 @@ -use std::io::{self, Read, Write, Cursor, Chain}; use std::path::Path; -use std::fs::File; -use std::time::Duration; +use std::pin::Pin; -#[cfg(feature = "tls")] use super::net_stream::HttpsStream; +use futures::compat::{Future01CompatExt, Stream01CompatExt, AsyncWrite01CompatExt}; +use futures::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite}; +use futures::future::Future; +use futures::stream::TryStreamExt; -use super::data_stream::{DataStream, /* TODO kill_stream */}; -use super::net_stream::NetStream; -use crate::ext::ReadExt; +use super::data_stream::DataStream; -use crate::http::hyper::{self, Payload}; -use futures::{Async, Future}; +use crate::http::hyper; + +use crate::ext::AsyncReadExt; /// The number of bytes to read into the "peek" buffer. const PEEK_BYTES: usize = 512; @@ -48,7 +48,9 @@ const PEEK_BYTES: usize = 512; /// body data. This enables partially or fully reading from a `Data` object /// without consuming the `Data` object. pub struct Data { - body: Vec, + buffer: Vec, + is_complete: bool, + stream: Box, } impl Data { @@ -69,11 +71,15 @@ impl Data { /// } /// ``` pub fn open(mut self) -> DataStream { - // FIXME: Insert a `BufReader` in front of the `NetStream` with capacity - // 4096. We need the new `Chain` methods to get the inner reader to - // actually do this, however. - let stream = ::std::mem::replace(&mut self.body, vec![]); - DataStream(Cursor::new(stream)) + let buffer = std::mem::replace(&mut self.buffer, vec![]); + let stream = std::mem::replace(&mut self.stream, Box::new(&[][..])); + DataStream(buffer, stream) + } + + pub(crate) fn from_hyp(body: hyper::Body) -> impl Future { + // TODO.async: This used to also set the read timeout to 5 seconds. + + Data::new(body) } /// Retrieve the `peek` buffer. @@ -94,10 +100,10 @@ impl Data { /// ``` #[inline(always)] pub fn peek(&self) -> &[u8] { - if self.body.len() > PEEK_BYTES { - &self.body[..PEEK_BYTES] + if self.buffer.len() > PEEK_BYTES { + &self.buffer[..PEEK_BYTES] } else { - &self.body + &self.buffer } } @@ -118,8 +124,7 @@ impl Data { /// ``` #[inline(always)] pub fn peek_complete(&self) -> bool { - // TODO self.is_complete - true + self.is_complete } /// A helper method to write the body of the request to any `Write` type. @@ -139,8 +144,11 @@ impl Data { /// } /// ``` #[inline(always)] - pub fn stream_to(self, writer: &mut W) -> io::Result { - io::copy(&mut self.open(), writer) + pub fn stream_to<'w, W: AsyncWrite + Unpin>(self, writer: &'w mut W) -> impl Future> + 'w { + Box::pin(async move { + let stream = self.open(); + stream.copy_into(writer).await + }) } /// A helper method to write the body of the request to a file at the path @@ -161,8 +169,11 @@ impl Data { /// } /// ``` #[inline(always)] - pub fn stream_to_file>(self, path: P) -> io::Result { - io::copy(&mut self.open(), &mut File::create(path)?) + pub fn stream_to_file + Send + 'static>(self, path: P) -> impl Future> { + Box::pin(async move { + let mut file = tokio::fs::File::create(path).compat().await?.compat(); + self.stream_to(&mut file).await + }) } // Creates a new data object with an internal buffer `buf`, where the cursor @@ -170,8 +181,56 @@ impl Data { // bytes `vec[pos..cap]` are buffered and unread. The remainder of the data // bytes can be read from `stream`. #[inline(always)] - pub(crate) fn new(body: Vec) -> Data { - Data { body } + pub(crate) fn new(body: hyper::Body) -> Pin + Send>> { + trace_!("Data::new({:?})", body); + + let mut stream = body.compat().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }).into_async_read(); + + Box::pin(async { + let mut peek_buf = vec![0; PEEK_BYTES]; + + let eof = match stream.read_max(&mut peek_buf[..]).await { + Ok(n) => { + trace_!("Filled peek buf with {} bytes.", n); + + // TODO.async: This has not gone away, and I don't entirely + // understand what's happening here + + // We can use `set_len` here instead of `truncate`, but we'll + // take the performance hit to avoid `unsafe`. All of this code + // should go away when we migrate away from hyper 0.10.x. + + peek_buf.truncate(n); + n < PEEK_BYTES + } + Err(e) => { + error_!("Failed to read into peek buffer: {:?}.", e); + // Likewise here as above. + peek_buf.truncate(0); + false + } + }; + + trace_!("Peek bytes: {}/{} bytes.", peek_buf.len(), PEEK_BYTES); + Data { buffer: peek_buf, stream: Box::new(stream), is_complete: eof } + }) } + /// This creates a `data` object from a local data source `data`. + #[inline] + pub(crate) fn local(data: Vec) -> Data { + Data { + buffer: data, + stream: Box::new(&[][..]), + is_complete: true, + } + } +} + +impl std::borrow::Borrow<()> for Data { + fn borrow(&self) -> &() { + &() + } } diff --git a/core/lib/src/data/data_stream.rs b/core/lib/src/data/data_stream.rs index a6ec7032..721079bd 100644 --- a/core/lib/src/data/data_stream.rs +++ b/core/lib/src/data/data_stream.rs @@ -1,50 +1,36 @@ -use std::io::{self, Chain, Cursor, Read, Write}; -use std::net::Shutdown; +use std::pin::Pin; -pub type InnerStream = Cursor>; +use futures::io::{AsyncRead, Error as IoError}; +use futures::task::{Poll, Context}; +// TODO.async: Consider storing the real type here instead of a Box to avoid +// the dynamic dispatch /// Raw data stream of a request body. /// /// This stream can only be obtained by calling /// [`Data::open()`](crate::data::Data::open()). The stream contains all of the data /// in the body of the request. It exposes no methods directly. Instead, it must /// be used as an opaque [`Read`] structure. -pub struct DataStream(pub(crate) InnerStream); +pub struct DataStream(pub(crate) Vec, pub(crate) Box); + +// TODO.async: Consider implementing `AsyncBufRead` // TODO: Have a `BufRead` impl for `DataStream`. At the moment, this isn't // possible since Hyper's `HttpReader` doesn't implement `BufRead`. -impl Read for DataStream { +impl AsyncRead for DataStream { #[inline(always)] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - trace_!("DataStream::read()"); - self.0.read(buf) - } -} - -/* pub fn kill_stream(stream: &mut BodyReader) { - // Only do the expensive reading if we're not sure we're done. - // TODO use self::HttpReader::*; - match *stream { - SizedReader(_, n) | ChunkedReader(_, Some(n)) if n > 0 => { /* continue */ }, - _ => return - }; - - // Take <= 1k from the stream. If there might be more data, force close. - const FLUSH_LEN: u64 = 1024; - match io::copy(&mut stream.take(FLUSH_LEN), &mut io::sink()) { - Ok(FLUSH_LEN) | Err(_) => { - warn_!("Data left unread. Force closing network stream."); - let (_, network) = stream.get_mut().get_mut(); - if let Err(e) = network.close(Shutdown::Read) { - error_!("Failed to close network stream: {:?}", e); - } + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + trace_!("DataStream::poll_read()"); + if self.0.len() > 0 { + let count = std::cmp::min(buf.len(), self.0.len()); + trace_!("Reading peeked {} into dest {} = {} bytes", self.0.len(), buf.len(), count); + let next = self.0.split_off(count); + (&mut buf[..count]).copy_from_slice(&self.0[..]); + self.0 = next; + Poll::Ready(Ok(count)) + } else { + trace_!("Delegating to remaining stream"); + Pin::new(&mut self.1).poll_read(cx, buf) } - Ok(n) => debug!("flushed {} unread bytes", n) - } -}*/ - -impl Drop for DataStream { - fn drop(&mut self) { - // TODO kill_stream(&mut self.0.get_mut().1); } } diff --git a/core/lib/src/data/from_data.rs b/core/lib/src/data/from_data.rs index db17bbb2..03cab1c1 100644 --- a/core/lib/src/data/from_data.rs +++ b/core/lib/src/data/from_data.rs @@ -1,4 +1,8 @@ use std::borrow::Borrow; +use std::pin::Pin; + +use futures::future::{ready, Future, FutureExt}; +use futures::io::AsyncReadExt; use crate::outcome::{self, IntoOutcome}; use crate::outcome::Outcome::*; @@ -108,6 +112,9 @@ pub type Transformed<'a, T> = Outcome<&'a >::Borrowed, >::Error> >; +pub type TransformFuture<'a, T, E> = Pin>> + Send + 'a>>; +pub type FromDataFuture<'a, T, E> = Pin> + Send + 'a>>; + /// Trait implemented by data guards to derive a value from request body data. /// /// # Data Guards @@ -321,7 +328,7 @@ pub type Transformed<'a, T> = /// [`FromDataSimple`] documentation. pub trait FromData<'a>: Sized { /// The associated error to be returned when the guard fails. - type Error; + type Error: Send; /// The owned type returned from [`FromData::transform()`]. /// @@ -354,7 +361,7 @@ pub trait FromData<'a>: Sized { /// If transformation succeeds, an outcome of `Success` is returned. /// If the data is not appropriate given the type of `Self`, `Forward` is /// returned. On failure, `Failure` is returned. - fn transform(request: &Request<'_>, data: Data) -> Transform>; + fn transform(request: &Request<'_>, data: Data) -> TransformFuture<'a, Self::Owned, Self::Error>; /// Validates, parses, and converts the incoming request body data into an /// instance of `Self`. @@ -384,23 +391,23 @@ pub trait FromData<'a>: Sized { /// # unimplemented!() /// # } /// ``` - fn from_data(request: &Request<'_>, outcome: Transformed<'a, Self>) -> Outcome; + fn from_data(request: &Request<'_>, outcome: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error>; } /// The identity implementation of `FromData`. Always returns `Success`. -impl<'f> FromData<'f> for Data { +impl<'a> FromData<'a> for Data { type Error = std::convert::Infallible; type Owned = Data; - type Borrowed = Data; + type Borrowed = (); #[inline(always)] - fn transform(_: &Request<'_>, data: Data) -> Transform> { - Transform::Owned(Success(data)) + fn transform(_: &Request<'_>, data: Data) -> TransformFuture<'a, Self::Owned, Self::Error> { + Box::pin(ready(Transform::Owned(Success(data)))) } #[inline(always)] - fn from_data(_: &Request<'_>, outcome: Transformed<'f, Self>) -> Outcome { - outcome.owned() + fn from_data(_: &Request<'_>, outcome: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> { + Box::pin(ready(outcome.owned())) } } @@ -494,8 +501,9 @@ impl<'f> FromData<'f> for Data { /// # fn main() { } /// ``` pub trait FromDataSimple: Sized { + // TODO.async: Can/should we relax this 'static? And how? /// The associated error to be returned when the guard fails. - type Error; + type Error: Send + 'static; /// Validates, parses, and converts an instance of `Self` from the incoming /// request body data. @@ -503,22 +511,25 @@ pub trait FromDataSimple: Sized { /// If validation and parsing succeeds, an outcome of `Success` is returned. /// If the data is not appropriate given the type of `Self`, `Forward` is /// returned. If parsing fails, `Failure` is returned. - fn from_data(request: &Request<'_>, data: Data) -> Outcome; + fn from_data(request: &Request<'_>, data: Data) -> FromDataFuture<'static, Self, Self::Error>; } -impl<'a, T: FromDataSimple> FromData<'a> for T { +impl<'a, T: FromDataSimple + 'a> FromData<'a> for T { type Error = T::Error; type Owned = Data; - type Borrowed = Data; + type Borrowed = (); #[inline(always)] - fn transform(_: &Request<'_>, d: Data) -> Transform> { - Transform::Owned(Success(d)) + fn transform(_: &Request<'_>, d: Data) -> TransformFuture<'a, Self::Owned, Self::Error> { + Box::pin(ready(Transform::Owned(Success(d)))) } #[inline(always)] - fn from_data(req: &Request<'_>, o: Transformed<'a, Self>) -> Outcome { - T::from_data(req, try_outcome!(o.owned())) + fn from_data(req: &Request<'_>, o: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> { + match o.owned() { + Success(data) => T::from_data(req, data), + _ => unreachable!(), + } } } @@ -528,17 +539,17 @@ impl<'a, T: FromData<'a> + 'a> FromData<'a> for Result { type Borrowed = T::Borrowed; #[inline(always)] - fn transform(r: &Request<'_>, d: Data) -> Transform> { + fn transform(r: &Request<'_>, d: Data) -> TransformFuture<'a, Self::Owned, Self::Error> { T::transform(r, d) } #[inline(always)] - fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> Outcome { - match T::from_data(r, o) { + fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> { + Box::pin(T::from_data(r, o).map(|x| match x { Success(val) => Success(Ok(val)), Forward(data) => Forward(data), Failure((_, e)) => Success(Err(e)), - } + })) } } @@ -548,46 +559,52 @@ impl<'a, T: FromData<'a> + 'a> FromData<'a> for Option { type Borrowed = T::Borrowed; #[inline(always)] - fn transform(r: &Request<'_>, d: Data) -> Transform> { + fn transform(r: &Request<'_>, d: Data) -> TransformFuture<'a, Self::Owned, Self::Error> { T::transform(r, d) } #[inline(always)] - fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> Outcome { - match T::from_data(r, o) { + fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> { + Box::pin(T::from_data(r, o).map(|x| match x { Success(val) => Success(Some(val)), Failure(_) | Forward(_) => Success(None), - } + })) } } -#[cfg(debug_assertions)] -use std::io::{self, Read}; - #[cfg(debug_assertions)] impl FromDataSimple for String { - type Error = io::Error; + type Error = std::io::Error; #[inline(always)] - fn from_data(_: &Request<'_>, data: Data) -> Outcome { - let mut string = String::new(); - match data.open().read_to_string(&mut string) { - Ok(_) => Success(string), - Err(e) => Failure((Status::BadRequest, e)) - } + fn from_data(_: &Request<'_>, data: Data) -> FromDataFuture<'static, Self, Self::Error> { + Box::pin(async { + let mut stream = data.open(); + let mut buf = Vec::new(); + if let Err(e) = stream.read_to_end(&mut buf).await { + return Failure((Status::BadRequest, e)); + } + match String::from_utf8(buf) { + Ok(s) => Success(s), + Err(e) => Failure((Status::BadRequest, std::io::Error::new(std::io::ErrorKind::Other, e))), + } + }) } } #[cfg(debug_assertions)] impl FromDataSimple for Vec { - type Error = io::Error; + type Error = std::io::Error; #[inline(always)] - fn from_data(_: &Request<'_>, data: Data) -> Outcome { - let mut bytes = Vec::new(); - match data.open().read_to_end(&mut bytes) { - Ok(_) => Success(bytes), - Err(e) => Failure((Status::BadRequest, e)) - } + fn from_data(_: &Request<'_>, data: Data) -> FromDataFuture<'static, Self, Self::Error> { + Box::pin(async { + let mut stream = data.open(); + let mut buf = Vec::new(); + match stream.read_to_end(&mut buf).await { + Ok(_) => Success(buf), + Err(e) => Failure((Status::BadRequest, e)), + } + }) } } diff --git a/core/lib/src/data/mod.rs b/core/lib/src/data/mod.rs index 20523fac..350b2685 100644 --- a/core/lib/src/data/mod.rs +++ b/core/lib/src/data/mod.rs @@ -2,9 +2,8 @@ mod data; mod data_stream; -mod net_stream; mod from_data; pub use self::data::Data; pub use self::data_stream::DataStream; -pub use self::from_data::{FromData, FromDataSimple, Outcome, Transform, Transformed}; +pub use self::from_data::{FromData, FromDataFuture, FromDataSimple, Outcome, Transform, Transformed, TransformFuture}; diff --git a/core/lib/src/data/net_stream.rs b/core/lib/src/data/net_stream.rs deleted file mode 100644 index 09e0762b..00000000 --- a/core/lib/src/data/net_stream.rs +++ /dev/null @@ -1,94 +0,0 @@ -use std::io; -use std::net::{SocketAddr, Shutdown}; -use std::time::Duration; - -#[cfg(feature = "tls")] use crate::http::tls::{WrappedStream, ServerSession}; -// TODO use http::hyper::net::{HttpStream, NetworkStream}; - -use self::NetStream::*; - -#[cfg(feature = "tls")] pub type HttpsStream = WrappedStream; - -// This is a representation of all of the possible network streams we might get. -// This really shouldn't be necessary, but, you know, Hyper. -#[derive(Clone)] -pub enum NetStream { - Http/* TODO (HttpStream) */, - #[cfg(feature = "tls")] - Https(HttpsStream), - Empty, -} - -impl io::Read for NetStream { - #[inline(always)] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - trace_!("NetStream::read()"); - let res = match *self { - Http/*(ref mut stream)*/ => Ok(0) /* TODO stream.read(buf)*/, - #[cfg(feature = "tls")] Https(ref mut stream) => stream.read(buf), - Empty => Ok(0), - }; - - trace_!("NetStream::read() -- complete"); - res - } -} - -impl io::Write for NetStream { - #[inline(always)] - fn write(&mut self, buf: &[u8]) -> io::Result { - trace_!("NetStream::write()"); - match *self { - Http/* TODO (ref mut stream) => stream.write(buf)*/ => Ok(0), - #[cfg(feature = "tls")] Https(ref mut stream) => stream.write(buf), - Empty => Ok(0), - } - } - - #[inline(always)] - fn flush(&mut self) -> io::Result<()> { - match *self { - Http/* TODO (ref mut stream) => stream.flush()*/ => Ok(()), - #[cfg(feature = "tls")] Https(ref mut stream) => stream.flush(), - Empty => Ok(()), - } - } -} - -//impl NetworkStream for NetStream { -// #[inline(always)] -// fn peer_addr(&mut self) -> io::Result { -// match *self { -// Http/* TODO (ref mut stream) => stream.peer_addr()*/ => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)), -// #[cfg(feature = "tls")] Https(ref mut stream) => stream.peer_addr(), -// Empty => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)), -// } -// } -// -// #[inline(always)] -// fn set_read_timeout(&self, dur: Option) -> io::Result<()> { -// match *self { -// Http/* TODO (ref stream) => stream.set_read_timeout(dur)*/ => Ok(()), -// #[cfg(feature = "tls")] Https(ref stream) => stream.set_read_timeout(dur), -// Empty => Ok(()), -// } -// } -// -// #[inline(always)] -// fn set_write_timeout(&self, dur: Option) -> io::Result<()> { -// match *self { -// Http/* TODO (ref stream) => stream.set_write_timeout(dur)*/ => Ok(()), -// #[cfg(feature = "tls")] Https(ref stream) => stream.set_write_timeout(dur), -// Empty => Ok(()), -// } -// } -// -// #[inline(always)] -// fn close(&mut self, how: Shutdown) -> io::Result<()> { -// match *self { -// Http/* TODO (ref mut stream) => stream.close(how)*/ => Ok(()), -// #[cfg(feature = "tls")] Https(ref mut stream) => stream.close(how), -// Empty => Ok(()), -// } -// } -//} diff --git a/core/lib/src/error.rs b/core/lib/src/error.rs index 614284d0..c3ed1721 100644 --- a/core/lib/src/error.rs +++ b/core/lib/src/error.rs @@ -19,7 +19,7 @@ use crate::router::Route; #[derive(Debug)] pub enum LaunchErrorKind { /// Binding to the provided address/port failed. - Bind(std::io::Error), + Bind(hyper::Error), /// An I/O error occurred during launch. Io(io::Error), /// Route collisions were detected. @@ -123,10 +123,9 @@ impl LaunchError { impl From for LaunchError { #[inline] fn from(error: hyper::Error) -> LaunchError { - match error { - // TODO hyper::Error::Io(e) => LaunchError::new(LaunchErrorKind::Io(e)), - e => LaunchError::new(LaunchErrorKind::Unknown(Box::new(e))) - } + // TODO.async: Should "hyper error" be another variant of LaunchErrorKind? + // Or should this use LaunchErrorKind::Io? + LaunchError::new(LaunchErrorKind::Unknown(Box::new(error))) } } diff --git a/core/lib/src/ext.rs b/core/lib/src/ext.rs index 8813b741..f7996e4f 100644 --- a/core/lib/src/ext.rs +++ b/core/lib/src/ext.rs @@ -1,19 +1,95 @@ use std::io; +use std::pin::Pin; -pub trait ReadExt: io::Read { - fn read_max(&mut self, mut buf: &mut [u8]) -> io::Result { - let start_len = buf.len(); - while !buf.is_empty() { - match self.read(buf) { - Ok(0) => break, - Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; } - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } +use futures::io::{AsyncRead, AsyncReadExt as _}; +use futures::future::{Future}; +use futures::stream::Stream; +use futures::task::{Poll, Context}; + +use crate::http::hyper::Chunk; + +// Based on std::io::Take, but for AsyncRead instead of Read +pub struct Take{ + inner: R, + limit: u64, +} + +// TODO.async: Verify correctness of this implementation. +impl AsyncRead for Take where R: AsyncRead + Unpin { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + if self.limit == 0 { + return Poll::Ready(Ok(0)); } - Ok(start_len - buf.len()) + let max = std::cmp::min(buf.len() as u64, self.limit) as usize; + match Pin::new(&mut self.inner).poll_read(cx, &mut buf[..max]) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(n)) => { + self.limit -= n as u64; + Poll::Ready(Ok(n)) + }, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + } } } -impl ReadExt for T { } +pub struct IntoChunkStream { + inner: R, + buf_size: usize, + buffer: Vec, +} + +// TODO.async: Verify correctness of this implementation. +impl Stream for IntoChunkStream + where R: AsyncRead + Unpin +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>{ + assert!(self.buffer.len() == self.buf_size); + + let Self { ref mut inner, ref mut buffer, buf_size } = *self; + + match Pin::new(inner).poll_read(cx, &mut buffer[..]) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), + Poll::Ready(Ok(n)) if n == 0 => Poll::Ready(None), + Poll::Ready(Ok(n)) => { + let mut next = std::mem::replace(buffer, vec![0; buf_size]); + next.truncate(n); + Poll::Ready(Some(Ok(Chunk::from(next)))) + } + } + } +} + +pub trait AsyncReadExt: AsyncRead { + fn take(self, limit: u64) -> Take where Self: Sized { + Take { inner: self, limit } + } + + fn into_chunk_stream(self, buf_size: usize) -> IntoChunkStream where Self: Sized { + IntoChunkStream { inner: self, buf_size, buffer: vec![0; buf_size] } + } + + // TODO.async: Verify correctness of this implementation. + fn read_max<'a>(&'a mut self, mut buf: &'a mut [u8]) -> Pin> + Send + '_>> + where Self: Send + Unpin + { + Box::pin(async move { + let start_len = buf.len(); + while !buf.is_empty() { + match self.read(buf).await { + Ok(0) => break, + Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + + Ok(start_len - buf.len()) + }) + } +} + +impl AsyncReadExt for T { } diff --git a/core/lib/src/handler.rs b/core/lib/src/handler.rs index 1689c788..b8d33716 100644 --- a/core/lib/src/handler.rs +++ b/core/lib/src/handler.rs @@ -1,5 +1,7 @@ //! Types and traits for request and error handlers and their return values. +use futures::future::Future; + use crate::data::Data; use crate::request::Request; use crate::response::{self, Response, Responder}; @@ -9,6 +11,9 @@ use crate::outcome; /// Type alias for the `Outcome` of a `Handler`. pub type Outcome<'r> = outcome::Outcome, Status, Data>; +/// Type alias for the unwieldy `Handler` return type +pub type HandlerFuture<'r> = std::pin::Pin> + Send + 'r>>; + /// Trait implemented by types that can handle requests. /// /// In general, you will never need to implement `Handler` manually or be @@ -142,7 +147,7 @@ pub trait Handler: Cloneable + Send + Sync + 'static { /// a response. Otherwise, if the return value is `Forward(Data)`, the next /// matching route is attempted. If there are no other matching routes, the /// `404` error catcher is invoked. - fn handle<'r>(&self, request: &'r Request<'_>, data: Data) -> Outcome<'r>; + fn handle<'r>(&self, request: &'r Request<'_>, data: Data) -> HandlerFuture<'r>; } /// Unfortunate but necessary hack to be able to clone a `Box`. @@ -170,16 +175,18 @@ impl Clone for Box { } impl Handler for F - where for<'r> F: Fn(&'r Request<'_>, Data) -> Outcome<'r> + where for<'r> F: Fn(&'r Request<'_>, Data) -> HandlerFuture<'r> { #[inline(always)] - fn handle<'r>(&self, req: &'r Request<'_>, data: Data) -> Outcome<'r> { + fn handle<'r>(&self, req: &'r Request<'_>, data: Data) -> HandlerFuture<'r> { self(req, data) } } /// The type of an error handler. -pub type ErrorHandler = for<'r> fn(&'r Request<'_>) -> response::Result<'r>; +pub type ErrorHandler = for<'r> fn(&'r Request<'_>) -> ErrorHandlerFuture<'r>; + +pub type ErrorHandlerFuture<'r> = std::pin::Pin> + Send + 'r>>; impl<'r> Outcome<'r> { /// Return the `Outcome` of response to `req` from `responder`. diff --git a/core/lib/src/lib.rs b/core/lib/src/lib.rs index 9c3759e5..9adb9f2e 100644 --- a/core/lib/src/lib.rs +++ b/core/lib/src/lib.rs @@ -1,4 +1,5 @@ #![feature(proc_macro_hygiene)] +#![feature(async_await)] #![recursion_limit="256"] diff --git a/core/lib/src/local/request.rs b/core/lib/src/local/request.rs index 96c15252..4b545f42 100644 --- a/core/lib/src/local/request.rs +++ b/core/lib/src/local/request.rs @@ -107,9 +107,7 @@ impl<'c> LocalRequest<'c> { uri: Cow<'c, str> ) -> LocalRequest<'c> { // We set a dummy string for now and check the user's URI on dispatch. - let config = &client.rocket().config; - let state = &client.rocket().state; - let request = Request::new(config, state, method, Origin::dummy()); + let request = Request::new(client.rocket(), method, Origin::dummy()); // Set up any cookies we know about. if let Some(ref jar) = client.cookies { @@ -399,40 +397,46 @@ impl<'c> LocalRequest<'c> { uri: &str, data: Vec ) -> LocalResponse<'c> { + let maybe_uri = Origin::parse(uri); + // First, validate the URI, returning an error response (generated from // an error catcher) immediately if it's invalid. - if let Ok(uri) = Origin::parse(uri) { + if let Ok(uri) = maybe_uri { request.set_uri(uri.into_owned()); } else { error!("Malformed request URI: {}", uri); - let res = client.rocket().handle_error(Status::BadRequest, request); - return LocalResponse { _request: owned_request, response: res }; + return futures::executor::block_on(async move { + let res = client.rocket().handle_error(Status::BadRequest, request).await; + LocalResponse { _request: owned_request, response: res } + }) } - // Actually dispatch the request. - let response = client.rocket().dispatch(request, Data::new(data)); + futures::executor::block_on(async move { + // Actually dispatch the request. + let response = client.rocket().dispatch(request, Data::local(data)).await; - // If the client is tracking cookies, updates the internal cookie jar - // with the changes reflected by `response`. - if let Some(ref jar) = client.cookies { - let mut jar = jar.write().expect("LocalRequest::_dispatch() write lock"); + // If the client is tracking cookies, updates the internal cookie jar + // with the changes reflected by `response`. + if let Some(ref jar) = client.cookies { + let mut jar = jar.write().expect("LocalRequest::_dispatch() write lock"); let current_time = time::OffsetDateTime::now_utc(); - for cookie in response.cookies() { - if let Some(expires) = cookie.expires() { - if expires <= current_time { - jar.force_remove(cookie); - continue; + for cookie in response.cookies() { + if let Some(expires) = cookie.expires() { + if expires <= current_time { + jar.force_remove(cookie); + continue; + } } + + jar.add(cookie.into_owned()); } - - jar.add(cookie.into_owned()); } - } - LocalResponse { - _request: owned_request, - response: response - } + LocalResponse { + _request: owned_request, + response: response + } + }) } } @@ -454,6 +458,16 @@ pub struct LocalResponse<'c> { response: Response<'c>, } +impl LocalResponse<'_> { + pub fn body_string_wait(&mut self) -> Option { + futures::executor::block_on(self.body_string()) + } + + pub fn body_bytes_wait(&mut self) -> Option> { + futures::executor::block_on(self.body_bytes()) + } +} + impl<'c> Deref for LocalResponse<'c> { type Target = Response<'c>; diff --git a/core/lib/src/logger.rs b/core/lib/src/logger.rs index b0d53a25..edace752 100644 --- a/core/lib/src/logger.rs +++ b/core/lib/src/logger.rs @@ -158,16 +158,16 @@ pub(crate) fn try_init(level: LoggingLevel, verbose: bool) -> bool { } push_max_level(level); -/* if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) { + if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) { if verbose { eprintln!("Logger failed to initialize: {}", e); } pop_max_level(); return false; - }*/ + } - false + true } use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; diff --git a/core/lib/src/request/form/form.rs b/core/lib/src/request/form/form.rs index ea70acad..5d13353f 100644 --- a/core/lib/src/request/form/form.rs +++ b/core/lib/src/request/form/form.rs @@ -1,9 +1,12 @@ use std::ops::Deref; +use futures::io::AsyncReadExt; + use crate::outcome::Outcome::*; use crate::request::{Request, form::{FromForm, FormItems, FormDataError}}; -use crate::data::{Outcome, Transform, Transformed, Data, FromData}; +use crate::data::{Outcome, Transform, Transformed, Data, FromData, TransformFuture, FromDataFuture}; use crate::http::{Status, uri::{Query, FromUriParam}}; +use crate::ext::AsyncReadExt as _; /// A data guard for parsing [`FromForm`] types strictly. /// @@ -184,7 +187,7 @@ impl<'f, T: FromForm<'f>> Form { /// /// All relevant warnings and errors are written to the console in Rocket /// logging format. -impl<'f, T: FromForm<'f>> FromData<'f> for Form { +impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for Form { type Error = FormDataError<'f, T::Error>; type Owned = String; type Borrowed = str; @@ -192,26 +195,31 @@ impl<'f, T: FromForm<'f>> FromData<'f> for Form { fn transform( request: &Request<'_>, data: Data - ) -> Transform> { - use std::{cmp::min, io::Read}; - + ) -> TransformFuture<'f, Self::Owned, Self::Error> { if !request.content_type().map_or(false, |ct| ct.is_form()) { warn_!("Form data does not have form content type."); - return Transform::Borrowed(Forward(data)) + return Box::pin(futures::future::ready(Transform::Borrowed(Forward(data)))); } let limit = request.limits().forms; let mut stream = data.open().take(limit); - let mut form_string = String::with_capacity(min(4096, limit) as usize); - if let Err(e) = stream.read_to_string(&mut form_string) { - return Transform::Borrowed(Failure((Status::InternalServerError, FormDataError::Io(e)))) - } + Box::pin(async move { + let mut buf = Vec::new(); + if let Err(e) = stream.read_to_end(&mut buf).await { + return Transform::Borrowed(Failure((Status::InternalServerError, FormDataError::Io(e)))); + } - Transform::Borrowed(Success(form_string)) + Transform::Borrowed(match String::from_utf8(buf) { + Ok(s) => Success(s), + Err(e) => Failure((Status::BadRequest, FormDataError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))), + }) + }) } - fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> Outcome { - >::from_data(try_outcome!(o.borrowed()), true).map(Form) + fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> { + Box::pin(futures::future::ready(o.borrowed().and_then(|data| { + >::from_data(data, true).map(Form) + }))) } } diff --git a/core/lib/src/request/form/from_form.rs b/core/lib/src/request/form/from_form.rs index dc590430..08c55985 100644 --- a/core/lib/src/request/form/from_form.rs +++ b/core/lib/src/request/form/from_form.rs @@ -93,7 +93,7 @@ use crate::request::FormItems; /// ``` pub trait FromForm<'f>: Sized { /// The associated error to be returned when parsing fails. - type Error; + type Error: Send; /// Parses an instance of `Self` from the iterator of form items `it`. /// diff --git a/core/lib/src/request/form/lenient.rs b/core/lib/src/request/form/lenient.rs index 0cfe122d..d25b3f1c 100644 --- a/core/lib/src/request/form/lenient.rs +++ b/core/lib/src/request/form/lenient.rs @@ -1,7 +1,7 @@ use std::ops::Deref; use crate::request::{Request, form::{Form, FormDataError, FromForm}}; -use crate::data::{Data, Transform, Transformed, FromData, Outcome}; +use crate::data::{Data, Transformed, FromData, TransformFuture, FromDataFuture}; use crate::http::uri::{Query, FromUriParam}; /// A data guard for parsing [`FromForm`] types leniently. @@ -95,17 +95,19 @@ impl Deref for LenientForm { } } -impl<'f, T: FromForm<'f>> FromData<'f> for LenientForm { +impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for LenientForm { type Error = FormDataError<'f, T::Error>; type Owned = String; type Borrowed = str; - fn transform(r: &Request<'_>, d: Data) -> Transform> { + fn transform(r: &Request<'_>, d: Data) -> TransformFuture<'f, Self::Owned, Self::Error> { >::transform(r, d) } - fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> Outcome { - >::from_data(try_outcome!(o.borrowed()), false).map(LenientForm) + fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> { + Box::pin(futures::future::ready(o.borrowed().and_then(|form| { + >::from_data(form, false).map(LenientForm) + }))) } } diff --git a/core/lib/src/request/request.rs b/core/lib/src/request/request.rs index ddd9b767..e1c73f73 100644 --- a/core/lib/src/request/request.rs +++ b/core/lib/src/request/request.rs @@ -1,10 +1,7 @@ -use std::rc::Rc; -use std::cell::{Cell, RefCell}; +use std::sync::{Arc, RwLock, Mutex}; use std::net::{IpAddr, SocketAddr}; use std::fmt; use std::str; -use std::str::FromStr; -use std::sync::Arc; use yansi::Paint; use state::{Container, Storage}; @@ -15,7 +12,7 @@ use crate::request::{FromFormValue, FormItems, FormItem}; use crate::rocket::Rocket; use crate::router::Route; use crate::config::{Config, Limits}; -use crate::http::{hyper, uri::{Origin, Segments, Uri}}; +use crate::http::{hyper, uri::{Origin, Segments}}; use crate::http::{Method, Header, HeaderMap, Cookies}; use crate::http::{RawStr, ContentType, Accept, MediaType}; use crate::http::private::{Indexed, SmallVec, CookieJar}; @@ -28,26 +25,26 @@ type Indices = (usize, usize); /// should likely only be used when writing [`FromRequest`] implementations. It /// contains all of the information for a given web request except for the body /// data. This includes the HTTP method, URI, cookies, headers, and more. -#[derive(Clone)] +//#[derive(Clone)] pub struct Request<'r> { - method: Cell, + method: RwLock, uri: Origin<'r>, headers: HeaderMap<'r>, remote: Option, pub(crate) state: RequestState<'r>, } -#[derive(Clone)] +//#[derive(Clone)] pub(crate) struct RequestState<'r> { pub config: &'r Config, pub managed: &'r Container, pub path_segments: SmallVec<[Indices; 12]>, pub query_items: Option>, - pub route: Cell>, - pub cookies: RefCell, + pub route: RwLock>, + pub cookies: Mutex>, pub accept: Storage>, pub content_type: Storage>, - pub cache: Rc, + pub cache: Arc, } #[derive(Clone)] @@ -61,26 +58,25 @@ impl<'r> Request<'r> { /// Create a new `Request` with the given `method` and `uri`. #[inline(always)] pub(crate) fn new<'s: 'r>( - config: &'r Config, - managed: &'r Container, + rocket: &'r Rocket, method: Method, uri: Origin<'s> ) -> Request<'r> { let mut request = Request { - method: Cell::new(method), + method: RwLock::new(method), uri: uri, headers: HeaderMap::new(), remote: None, state: RequestState { path_segments: SmallVec::new(), query_items: None, - config, - managed, - route: Cell::new(None), - cookies: RefCell::new(CookieJar::new()), + config: &rocket.config, + managed: &rocket.state, + route: RwLock::new(None), + cookies: Mutex::new(Some(CookieJar::new())), accept: Storage::new(), content_type: Storage::new(), - cache: Rc::new(Container::new()), + cache: Arc::new(Container::new()), } }; @@ -103,7 +99,7 @@ impl<'r> Request<'r> { /// ``` #[inline(always)] pub fn method(&self) -> Method { - self.method.get() + *self.method.read().unwrap() } /// Set the method of `self`. @@ -292,9 +288,13 @@ impl<'r> Request<'r> { /// ``` pub fn cookies(&self) -> Cookies<'_> { // FIXME: Can we do better? This is disappointing. - match self.state.cookies.try_borrow_mut() { - Ok(jar) => Cookies::new(jar, self.state.config.secret_key()), - Err(_) => { + let mut guard = self.state.cookies.lock().expect("cookies lock"); + match guard.take() { + Some(jar) => { + let mutex = &self.state.cookies; + Cookies::new(jar, self.state.config.secret_key(), move |jar| *mutex.lock().expect("cookies lock") = Some(jar)) + } + None => { error_!("Multiple `Cookies` instances are active at once."); info_!("An instance of `Cookies` must be dropped before another \ can be retrieved."); @@ -499,7 +499,7 @@ impl<'r> Request<'r> { /// # }); /// ``` pub fn route(&self) -> Option<&'r Route> { - self.state.route.get() + *self.state.route.read().unwrap() } /// Invokes the request guard implementation for `T`, returning its outcome. @@ -702,7 +702,7 @@ impl<'r> Request<'r> { pub fn example)>(method: Method, uri: &str, f: F) { let rocket = Rocket::custom(Config::development()); let uri = Origin::parse(uri).expect("invalid URI in example"); - let mut request = Request::new(&rocket.config, &rocket.state, method, uri); + let mut request = Request::new(&rocket, method, uri); f(&mut request); } @@ -773,79 +773,66 @@ impl<'r> Request<'r> { /// was `route`. Use during routing when attempting a given route. #[inline(always)] pub(crate) fn set_route(&self, route: &'r Route) { - self.state.route.set(Some(route)); + *self.state.route.write().unwrap() = Some(route); } /// Set the method of `self`, even when `self` is a shared reference. Used /// during routing to override methods for re-routing. #[inline(always)] pub(crate) fn _set_method(&self, method: Method) { - self.method.set(method); + *self.method.write().unwrap() = method; } /// Convert from Hyper types into a Rocket Request. pub(crate) fn from_hyp( - config: &'r Config, - managed: &'r Container, - request_parts: &hyper::Parts, + rocket: &'r Rocket, + h_method: hyper::Method, + h_headers: hyper::HeaderMap, + h_uri: hyper::Uri, + h_addr: SocketAddr, ) -> Result, String> { - - let h_uri = &request_parts.uri; - let h_headers = &request_parts.headers; - let h_version = &request_parts.version; - let h_method = &request_parts.method;; - -// if !h_uri.is_absolute() { -// return Err(format!("Bad URI: {}", h_uri)); -// }; + // TODO.async: Can we avoid this allocation? + // TODO.async: Assert that uri is "absolute" + // Get a copy of the URI for later use. + let uri = h_uri.to_string(); // Ensure that the method is known. TODO: Allow made-up methods? - let method = match Method::from_hyp(h_method) { + let method = match Method::from_hyp(&h_method) { Some(method) => method, - None => return Err(format!("Unknown method: {}", h_method)) + None => return Err(format!("Unknown or invalid method: {}", h_method)) }; // We need to re-parse the URI since we don't trust Hyper... :( - let uri = Origin::parse_owned(format!("{}", h_uri)).map_err(|e| e.to_string())?; + let uri = Origin::parse_owned(format!("{}", uri)).map_err(|e| e.to_string())?; // Construct the request object. - let mut request = Request::new(config, managed, method, uri); -// request.set_remote(match hyp_req.remote_addr() { -// Some(remote) => remote, -// None => return Err(String::from("Missing remote address")) -// }); + let mut request = Request::new(rocket, method, uri); + request.set_remote(h_addr); // Set the request cookies, if they exist. - let cookie_headers = h_headers.get_all("Cookie").iter(); - // TODO if cookie_headers.peek().is_some() { - let mut cookie_jar = CookieJar::new(); - for header in cookie_headers { - let raw_str = match ::std::str::from_utf8(header.as_bytes()) { - Ok(string) => string, - Err(_) => continue - }; + let mut cookie_jar = CookieJar::new(); + for header in h_headers.get_all("Cookie") { + // TODO.async: This used to only allow UTF-8 but now only allows ASCII + // (needs verification) + let raw_str = match header.to_str() { + Ok(string) => string, + Err(_) => continue + }; - for cookie_str in raw_str.split(';').map(|s| s.trim()) { - if let Some(cookie) = Cookies::parse_cookie(cookie_str) { - cookie_jar.add_original(cookie); - } + for cookie_str in raw_str.split(';').map(|s| s.trim()) { + if let Some(cookie) = Cookies::parse_cookie(cookie_str) { + cookie_jar.add_original(cookie); } } - - request.state.cookies = RefCell::new(cookie_jar); - // TODO } + } + request.state.cookies = Mutex::new(Some(cookie_jar)); // Set the rest of the headers. for (name, value) in h_headers.iter() { - - // TODO if let Some(header_values) = h_headers.get_all(hyp.name()) { - - // This is not totally correct since values needn't be UTF8. - let value_str = String::from_utf8_lossy(value.as_bytes()).into_owned(); - let header = Header::new(name.to_string(), value_str); - request.add_header(header); - - // TODO } + // This is not totally correct since values needn't be UTF8. + let value_str = String::from_utf8_lossy(value.as_bytes()).into_owned(); + let header = Header::new(name.to_string(), value_str); + request.add_header(header); } Ok(request) diff --git a/core/lib/src/request/tests.rs b/core/lib/src/request/tests.rs index ac21bb41..3a9cafe5 100644 --- a/core/lib/src/request/tests.rs +++ b/core/lib/src/request/tests.rs @@ -7,13 +7,13 @@ use crate::http::hyper; macro_rules! assert_headers { ($($key:expr => [$($value:expr),+]),+) => ({ // Set up the parameters to the hyper request object. - let h_method = hyper::Method::Get; - let h_uri = hyper::RequestUri::AbsolutePath("/test".to_string()); + let h_method = hyper::Method::GET; + let h_uri = "/test".parse().unwrap(); let h_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8000); - let mut h_headers = hyper::header::Headers::new(); + let mut h_headers = hyper::HeaderMap::new(); // Add all of the passed in headers to the request. - $($(h_headers.append_raw($key.to_string(), $value.as_bytes().into());)+)+ + $($(h_headers.append($key, hyper::HeaderValue::from_str($value).unwrap());)+)+ // Build up what we expect the headers to actually be. let mut expected = HashMap::new(); diff --git a/core/lib/src/response/responder.rs b/core/lib/src/response/responder.rs index 703cb046..725dad88 100644 --- a/core/lib/src/response/responder.rs +++ b/core/lib/src/response/responder.rs @@ -1,6 +1,8 @@ use std::fs::File; use std::io::{Cursor, BufReader}; +use futures::compat::AsyncRead01CompatExt; + use crate::http::{Status, ContentType, StatusClass}; use crate::response::{self, Response, Body}; use crate::request::Request; @@ -241,10 +243,11 @@ impl Responder<'_> for Vec { /// Returns a response with a sized body for the file. Always returns `Ok`. impl Responder<'_> for File { fn respond_to(self, _: &Request<'_>) -> response::Result<'static> { - let (metadata, file) = (self.metadata(), BufReader::new(self)); + let metadata = self.metadata(); + let stream = BufReader::new(tokio::fs::File::from_std(self)).compat(); match metadata { - Ok(md) => Response::build().raw_body(Body::Sized(file, md.len())).ok(), - Err(_) => Response::build().streamed_body(file).ok() + Ok(md) => Response::build().raw_body(Body::Sized(stream, md.len())).ok(), + Err(_) => Response::build().streamed_body(stream).ok() } } } diff --git a/core/lib/src/response/response.rs b/core/lib/src/response/response.rs index 02ff62d4..4b407953 100644 --- a/core/lib/src/response/response.rs +++ b/core/lib/src/response/response.rs @@ -1,8 +1,13 @@ use std::{io, fmt, str}; use std::borrow::Cow; +use std::pin::Pin; + +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncRead, AsyncReadExt}; use crate::response::Responder; use crate::http::{Header, HeaderMap, Status, ContentType, Cookie}; +use crate::ext::AsyncReadExt as _; /// The default size, in bytes, of a chunk for streamed responses. pub const DEFAULT_CHUNK_SIZE: u64 = 4096; @@ -59,31 +64,34 @@ impl Body { } } -impl Body { +impl Body { /// Attempts to read `self` into a `Vec` and returns it. If reading fails, /// returns `None`. - pub fn into_bytes(self) -> Option> { - let mut vec = Vec::new(); - let mut body = self.into_inner(); - if let Err(e) = body.read_to_end(&mut vec) { - error_!("Error reading body: {:?}", e); - return None; - } + pub fn into_bytes(self) -> impl Future>> { + Box::pin(async move { + let mut vec = Vec::new(); + let mut body = self.into_inner(); + if let Err(e) = body.read_to_end(&mut vec).await { + error_!("Error reading body: {:?}", e); + return None; + } - Some(vec) + Some(vec) + }) } /// Attempts to read `self` into a `String` and returns it. If reading or /// conversion fails, returns `None`. - pub fn into_string(self) -> Option { - self.into_bytes() - .and_then(|bytes| match String::from_utf8(bytes) { + pub fn into_string(self) -> impl Future> { + self.into_bytes().map(|bytes| { + bytes.and_then(|bytes| match String::from_utf8(bytes) { Ok(string) => Some(string), Err(e) => { error_!("Body is invalid UTF-8: {}", e); None } }) + }) } } @@ -350,7 +358,7 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn sized_body(&mut self, body: B) -> &mut ResponseBuilder<'r> - where B: io::Read + io::Seek + 'r + where B: AsyncRead + io::Seek + Send + Unpin + 'r { self.response.set_sized_body(body); self @@ -376,7 +384,7 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn streamed_body(&mut self, body: B) -> &mut ResponseBuilder<'r> - where B: io::Read + 'r + where B: AsyncRead + Send + 'r { self.response.set_streamed_body(body); self @@ -402,7 +410,7 @@ impl<'r> ResponseBuilder<'r> { /// # } /// ``` #[inline(always)] - pub fn chunked_body(&mut self, body: B, chunk_size: u64) + pub fn chunked_body(&mut self, body: B, chunk_size: u64) -> &mut ResponseBuilder<'r> { self.response.set_chunked_body(body, chunk_size); @@ -425,7 +433,7 @@ impl<'r> ResponseBuilder<'r> { /// .finalize(); /// ``` #[inline(always)] - pub fn raw_body(&mut self, body: Body) + pub fn raw_body(&mut self, body: Body) -> &mut ResponseBuilder<'r> { self.response.set_raw_body(body); @@ -560,7 +568,7 @@ impl<'r> ResponseBuilder<'r> { pub struct Response<'r> { status: Option, headers: HeaderMap<'r>, - body: Option>>, + body: Option>>>, } impl<'r> Response<'r> { @@ -889,7 +897,7 @@ impl<'r> Response<'r> { /// assert_eq!(response.body_string(), Some("Hello, world!".to_string())); /// ``` #[inline(always)] - pub fn body(&mut self) -> Option> { + pub fn body(&mut self) -> Option> { // Looks crazy, right? Needed so Rust infers lifetime correctly. Weird. match self.body.as_mut() { Some(body) => Some(match body.as_mut() { @@ -919,8 +927,14 @@ impl<'r> Response<'r> { /// assert!(response.body().is_none()); /// ``` #[inline(always)] - pub fn body_string(&mut self) -> Option { - self.take_body().and_then(Body::into_string) + pub fn body_string(&mut self) -> impl Future> + 'r { + let body = self.take_body(); + Box::pin(async move { + match body { + Some(body) => body.into_string().await, + None => None, + } + }) } /// Consumes `self's` body and reads it into a `Vec` of `u8` bytes. If @@ -941,8 +955,14 @@ impl<'r> Response<'r> { /// assert!(response.body().is_none()); /// ``` #[inline(always)] - pub fn body_bytes(&mut self) -> Option> { - self.take_body().and_then(Body::into_bytes) + pub fn body_bytes(&mut self) -> impl Future>> + 'r { + let body = self.take_body(); + Box::pin(async move { + match body { + Some(body) => body.into_bytes().await, + None => None, + } + }) } /// Moves the body of `self` out and returns it, if there is one, leaving no @@ -966,17 +986,17 @@ impl<'r> Response<'r> { /// assert!(response.body().is_none()); /// ``` #[inline(always)] - pub fn take_body(&mut self) -> Option>> { + pub fn take_body(&mut self) -> Option>>> { self.body.take() } - // Makes the `Read`er in the body empty but leaves the size of the body if + // Makes the `AsyncRead`er in the body empty but leaves the size of the body if // it exists. Only meant to be used to handle HEAD requests automatically. #[inline(always)] pub(crate) fn strip_body(&mut self) { if let Some(body) = self.take_body() { self.body = match body { - Body::Sized(_, n) => Some(Body::Sized(Box::new(io::empty()), n)), + Body::Sized(_, n) => Some(Body::Sized(Box::pin(io::empty()), n)), Body::Chunked(..) => None }; } @@ -1004,13 +1024,13 @@ impl<'r> Response<'r> { /// ``` #[inline] pub fn set_sized_body(&mut self, mut body: B) - where B: io::Read + io::Seek + 'r + where B: AsyncRead + io::Seek + Send + Unpin + 'r { let size = body.seek(io::SeekFrom::End(0)) .expect("Attempted to retrieve size by seeking, but failed."); body.seek(io::SeekFrom::Start(0)) .expect("Attempted to reset body by seeking after getting size."); - self.body = Some(Body::Sized(Box::new(body.take(size)), size)); + self.body = Some(Body::Sized(Box::pin(body.take(size)), size)); } /// Sets the body of `self` to be `body`, which will be streamed. The chunk @@ -1021,7 +1041,7 @@ impl<'r> Response<'r> { /// # Example /// /// ```rust - /// use std::io::{Read, repeat}; + /// use std::io::{AsyncRead, repeat}; /// use rocket::Response; /// /// let mut response = Response::new(); @@ -1029,7 +1049,7 @@ impl<'r> Response<'r> { /// assert_eq!(response.body_string(), Some("aaaaa".to_string())); /// ``` #[inline(always)] - pub fn set_streamed_body(&mut self, body: B) where B: io::Read + 'r { + pub fn set_streamed_body(&mut self, body: B) where B: AsyncRead + Send + 'r { self.set_chunked_body(body, DEFAULT_CHUNK_SIZE); } @@ -1039,7 +1059,7 @@ impl<'r> Response<'r> { /// # Example /// /// ```rust - /// use std::io::{Read, repeat}; + /// use std::io::{AsyncRead, repeat}; /// use rocket::Response; /// /// let mut response = Response::new(); @@ -1048,8 +1068,8 @@ impl<'r> Response<'r> { /// ``` #[inline(always)] pub fn set_chunked_body(&mut self, body: B, chunk_size: u64) - where B: io::Read + 'r { - self.body = Some(Body::Chunked(Box::new(body), chunk_size)); + where B: AsyncRead + Send + 'r { + self.body = Some(Body::Chunked(Box::pin(body), chunk_size)); } /// Sets the body of `self` to be `body`. This method should typically not @@ -1070,10 +1090,11 @@ impl<'r> Response<'r> { /// assert_eq!(response.body_string(), Some("Hello!".to_string())); /// ``` #[inline(always)] - pub fn set_raw_body(&mut self, body: Body) { + pub fn set_raw_body(&mut self, body: Body) + where T: AsyncRead + Send + Unpin + 'r { self.body = Some(match body { - Body::Sized(b, n) => Body::Sized(Box::new(b.take(n)), n), - Body::Chunked(b, n) => Body::Chunked(Box::new(b), n), + Body::Sized(b, n) => Body::Sized(Box::pin(b.take(n)), n), + Body::Chunked(b, n) => Body::Chunked(Box::pin(b), n), }); } diff --git a/core/lib/src/response/stream.rs b/core/lib/src/response/stream.rs index d0cec5f5..1a48c7e5 100644 --- a/core/lib/src/response/stream.rs +++ b/core/lib/src/response/stream.rs @@ -1,19 +1,20 @@ -use std::io::Read; use std::fmt::{self, Debug}; +use futures::io::AsyncRead; + use crate::request::Request; use crate::response::{Response, Responder, DEFAULT_CHUNK_SIZE}; use crate::http::Status; -/// Streams a response to a client from an arbitrary `Read`er type. +/// Streams a response to a client from an arbitrary `AsyncRead`er type. /// /// The client is sent a "chunked" response, where the chunk size is at most /// 4KiB. This means that at most 4KiB are stored in memory while the response /// is being sent. This type should be used when sending responses that are /// arbitrarily large in size, such as when streaming from a local socket. -pub struct Stream(T, u64); +pub struct Stream(T, u64); -impl Stream { +impl Stream { /// Create a new stream from the given `reader` and sets the chunk size for /// each streamed chunk to `chunk_size` bytes. /// @@ -34,7 +35,7 @@ impl Stream { } } -impl Debug for Stream { +impl Debug for Stream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("Stream").field(&self.0).finish() } @@ -54,7 +55,7 @@ impl Debug for Stream { /// # #[allow(unused_variables)] /// let response = Stream::from(io::stdin()); /// ``` -impl From for Stream { +impl From for Stream { fn from(reader: T) -> Self { Stream(reader, DEFAULT_CHUNK_SIZE) } @@ -68,7 +69,7 @@ impl From for Stream { /// If reading from the input stream fails at any point during the response, the /// response is abandoned, and the response ends abruptly. An error is printed /// to the console with an indication of what went wrong. -impl<'r, T: Read + 'r> Responder<'r> for Stream { +impl<'r, T: AsyncRead + Send + 'r> Responder<'r> for Stream { fn respond_to(self, _: &Request<'_>) -> Result, Status> { Response::build().chunked_body(self.0, self.1).ok() } diff --git a/core/lib/src/rocket.rs b/core/lib/src/rocket.rs index 58c75b44..be767bc0 100644 --- a/core/lib/src/rocket.rs +++ b/core/lib/src/rocket.rs @@ -1,25 +1,25 @@ use std::collections::HashMap; -use std::convert::From; -use std::str::{from_utf8, FromStr}; +use std::convert::{From, TryInto}; use std::cmp::min; -use std::io::{self, Write}; -use std::time::Duration; +use std::io; use std::mem; -use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; +use std::net::ToSocketAddrs; use std::sync::Arc; +use std::time::Duration; +use std::pin::Pin; -use futures::{Future, Stream}; -use futures::future::{self, FutureResult}; +use futures::compat::{Compat, Executor01CompatExt, Sink01CompatExt}; +use futures::future::{Future, FutureExt, TryFutureExt}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use futures::task::SpawnExt; use yansi::Paint; use state::Container; -use tokio::net::TcpListener; -use tokio::prelude::{Future as _, Stream as _}; #[cfg(feature = "tls")] use crate::http::tls::TlsAcceptor; use crate::{logger, handler}; -use crate::ext::ReadExt; use crate::config::{Config, FullConfig, ConfigError, LoggedValue}; use crate::request::{Request, FormItems}; use crate::data::Data; @@ -30,6 +30,7 @@ use crate::outcome::Outcome; use crate::error::{LaunchError, LaunchErrorKind}; use crate::fairing::{Fairing, Fairings}; use crate::logger::PaintExt; +use crate::ext::AsyncReadExt; use crate::http::{Method, Status, Header}; use crate::http::hyper::{self, header}; @@ -46,45 +47,26 @@ pub struct Rocket { fairings: Fairings, } -struct RocketArcs { - config: Arc, - router: Arc, - default_catchers: Arc>, - catchers: Arc>, - state: Arc, - fairings: Arc, +struct RocketHyperService { + rocket: Arc, + spawn: Box, + remote_addr: std::net::SocketAddr, } -impl hyper::MakeService for RocketArcs { - type ReqBody = hyper::Body; - type ResBody = hyper::Body; - type Error = hyper::Error; - type Service = RocketHyperService; - type Future = FutureResult; - type MakeError = Self::Error; +impl std::ops::Deref for RocketHyperService { + type Target = Rocket; - fn make_service(&mut self, _: Ctx) -> Self::Future { - future::ok(RocketHyperService::new(self)) + fn deref(&self) -> &Self::Target { + &*self.rocket } } -#[derive(Clone)] -pub struct RocketHyperService { - config: Arc, - router: Arc, - default_catchers: Arc>, - catchers: Arc>, - state: Arc, - fairings: Arc, -} - #[doc(hidden)] impl hyper::Service for RocketHyperService { type ReqBody = hyper::Body; type ResBody = hyper::Body; - type Error = hyper::Error; - //type Future = FutureResult, Self::Error>; - type Future = Box, Error = Self::Error> + Send>; + type Error = io::Error; + type Future = Compat, Self::Error>> + Send>>>; // This function tries to hide all of the Hyper-ness from Rocket. It // essentially converts Hyper types into Rocket types, then calls the @@ -95,57 +77,142 @@ impl hyper::Service for RocketHyperService { &mut self, hyp_req: hyper::Request, ) -> Self::Future { - let (parts, body) = hyp_req.into_parts(); + let rocket = self.rocket.clone(); + let h_addr = self.remote_addr; - // Convert the Hyper request into a Rocket request. - let req_res = Request::from_hyp(&self.config, &self.state, &parts); - let mut req = match req_res { - Ok(req) => req, - Err(e) => { - error!("Bad incoming request: {}", e); - // TODO: We don't have a request to pass in, so we just - // fabricate one. This is weird. We should let the user know - // that we failed to parse a request (by invoking some special - // handler) instead of doing this. - let dummy = Request::new(&self.config, &self.state, Method::Get, Origin::dummy()); - let r = self.handle_error(Status::BadRequest, &dummy); - return Box::new(future::ok(hyper::Response::from(r))); - } - }; + // This future must return a hyper::Response, but that's not easy + // because the response body might borrow from the request. Instead, + // we do the body writing in another future that will send us + // the response metadata (and a body channel) beforehand. + let (tx, rx) = futures::channel::oneshot::channel(); - let this = self.clone(); + self.spawn.spawn(async move { + // Get all of the information from Hyper. + let (h_parts, h_body) = hyp_req.into_parts(); - let response = body.concat2() - .map(move |chunk| { - let body = chunk.iter().rev().cloned().collect::>(); - let data = Data::new(body); + // Convert the Hyper request into a Rocket request. + let req_res = Request::from_hyp(&rocket, h_parts.method, h_parts.headers, h_parts.uri, h_addr); + let mut req = match req_res { + Ok(req) => req, + Err(e) => { + error!("Bad incoming request: {}", e); + // TODO: We don't have a request to pass in, so we just + // fabricate one. This is weird. We should let the user know + // that we failed to parse a request (by invoking some special + // handler) instead of doing this. + let dummy = Request::new(&rocket, Method::Get, Origin::dummy()); + let r = rocket.handle_error(Status::BadRequest, &dummy).await; + return rocket.issue_response(r, tx).await; + } + }; - // TODO: Due to life time constraints the clone of the service has been made. - // TODO: It should not be necessary but it is required to find a better solution - let mut req = Request::from_hyp(&this.config, &this.state, &parts).unwrap(); - // Dispatch the request to get a response, then write that response out. - let response = this.dispatch(&mut req, data); - hyper::Response::from(response) - }); + // Retrieve the data from the hyper body. + let data = Data::from_hyp(h_body).await; - Box::new(response) + // Dispatch the request to get a response, then write that response out. + let r = rocket.dispatch(&mut req, data).await; + rocket.issue_response(r, tx).await; + }).expect("failed to spawn handler"); + + async move { + Ok(rx.await.expect("TODO.async: sender was dropped, error instead")) + }.boxed().compat() } } -impl RocketHyperService { - +impl Rocket { + // TODO.async: Reconsider io::Result #[inline] - fn new(rocket: &RocketArcs) -> RocketHyperService { - RocketHyperService { - config: rocket.config.clone(), - router: rocket.router.clone(), - default_catchers: rocket.default_catchers.clone(), - catchers: rocket.catchers.clone(), - state: rocket.state.clone(), - fairings: rocket.fairings.clone(), + fn issue_response<'r>( + &self, + response: Response<'r>, + tx: futures::channel::oneshot::Sender>, + ) -> impl Future + 'r { + let result = self.write_response(response, tx); + async move { + match result.await { + Ok(()) => { + info_!("{}", Paint::green("Response succeeded.")); + } + Err(e) => { + error_!("Failed to write response: {:?}.", e); + } + } } } + #[inline] + fn write_response<'r>( + &self, + mut response: Response<'r>, + tx: futures::channel::oneshot::Sender>, + ) -> impl Future> + 'r { + async move { + let mut hyp_res = hyper::Response::builder(); + hyp_res.status(response.status().code); + + for header in response.headers().iter() { + let name = header.name.as_str(); + let value = header.value.as_bytes(); + hyp_res.header(name, value); + } + + let send_response = move |mut hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> { + let response = hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + tx.send(response).expect("channel receiver should not be dropped"); + Ok(()) + }; + + match response.body() { + None => { + hyp_res.header(header::CONTENT_LENGTH, "0"); + send_response(hyp_res, hyper::Body::empty())?; + } + Some(Body::Sized(body, size)) => { + hyp_res.header(header::CONTENT_LENGTH, size.to_string()); + let (sender, hyp_body) = hyper::Body::channel(); + send_response(hyp_res, hyp_body)?; + + let mut stream = body.into_chunk_stream(4096); + let mut sink = sender.sink_compat().sink_map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + + while let Some(next) = stream.next().await { + sink.send(next?).await?; + } + + // TODO.async: This should be better, but it creates an + // incomprehensible error messasge instead + // stream.forward(sink).await; + } + Some(Body::Chunked(body, chunk_size)) => { + // TODO.async: This is identical to Body::Sized except for the chunk size + + let (sender, hyp_body) = hyper::Body::channel(); + send_response(hyp_res, hyp_body)?; + + let mut stream = body.into_chunk_stream(chunk_size.try_into().expect("u64 -> usize overflow")); + let mut sink = sender.sink_compat().sink_map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + + while let Some(next) = stream.next().await { + sink.send(next?).await?; + } + + // TODO.async: This should be better, but it creates an + // incomprehensible error messasge instead + // stream.forward(sink).await; + } + }; + + Ok(()) + } + } +} + +impl Rocket { /// Preprocess the request for Rocket things. Currently, this means: /// /// * Rewriting the method in the request if _method form field exists. @@ -159,7 +226,7 @@ impl RocketHyperService { let is_form = req.content_type().map_or(false, |ct| ct.is_form()); if is_form && req.method() == Method::Post && data_len >= min_len { - if let Ok(form) = from_utf8(&data.peek()[..min(data_len, max_len)]) { + if let Ok(form) = std::str::from_utf8(&data.peek()[..min(data_len, max_len)]) { let method: Option> = FormItems::from(form) .filter(|item| item.key.as_str() == "_method") .map(|item| item.value.parse()) @@ -173,75 +240,80 @@ impl RocketHyperService { } #[inline] - pub(crate) fn dispatch<'s, 'r>( + pub(crate) fn dispatch<'s, 'r: 's>( &'s self, request: &'r mut Request<'s>, data: Data - ) -> Response<'r> { - info!("{}:", request); + ) -> impl Future> + 's { + async move { + info!("{}:", request); - // Do a bit of preprocessing before routing. - self.preprocess_request(request, &data); + // Do a bit of preprocessing before routing. + self.preprocess_request(request, &data); - // Run the request fairings. - self.fairings.handle_request(request, &data); + // Run the request fairings. + self.fairings.handle_request(request, &data); - // Remember if the request is a `HEAD` request for later body stripping. - let was_head_request = request.method() == Method::Head; + // Remember if the request is a `HEAD` request for later body stripping. + let was_head_request = request.method() == Method::Head; - // Route the request and run the user's handlers. - let mut response = self.route_and_process(request, data); + // Route the request and run the user's handlers. + let mut response = self.route_and_process(request, data).await; - // Add a default 'Server' header if it isn't already there. - // TODO: If removing Hyper, write out `Date` header too. - if !response.headers().contains("Server") { - response.set_header(Header::new("Server", "Rocket")); + // Add a default 'Server' header if it isn't already there. + // TODO: If removing Hyper, write out `Date` header too. + if !response.headers().contains("Server") { + response.set_header(Header::new("Server", "Rocket")); + } + + // Run the response fairings. + self.fairings.handle_response(request, &mut response); + + // Strip the body if this is a `HEAD` request. + if was_head_request { + response.strip_body(); + } + + response } - - // Run the response fairings. - self.fairings.handle_response(request, &mut response); - - // Strip the body if this is a `HEAD` request. - if was_head_request { - response.strip_body(); - } - - response } /// Route the request and process the outcome to eventually get a response. - fn route_and_process<'s, 'r>( + fn route_and_process<'s, 'r: 's>( &'s self, request: &'r Request<'s>, data: Data - ) -> Response<'r> { - let mut response = match self.route(request, data) { - Outcome::Success(response) => response, - Outcome::Forward(data) => { - // There was no matching route. Autohandle `HEAD` requests. - if request.method() == Method::Head { - info_!("Autohandling {} request.", Paint::default("HEAD").bold()); + ) -> impl Future> + Send + 's { + async move { + let mut response = match self.route(request, data).await { + Outcome::Success(response) => response, + Outcome::Forward(data) => { + // There was no matching route. Autohandle `HEAD` requests. + if request.method() == Method::Head { + info_!("Autohandling {} request.", Paint::default("HEAD").bold()); - // Dispatch the request again with Method `GET`. - request._set_method(Method::Get); + // Dispatch the request again with Method `GET`. + request._set_method(Method::Get); - // Return early so we don't set cookies twice. - return self.route_and_process(request, data); - } else { - // No match was found and it can't be autohandled. 404. - self.handle_error(Status::NotFound, request) + // Return early so we don't set cookies twice. + let try_next: Pin + Send>> = Box::pin(self.route_and_process(request, data)); + return try_next.await; + } else { + // No match was found and it can't be autohandled. 404. + self.handle_error(Status::NotFound, request).await + } } + Outcome::Failure(status) => self.handle_error(status, request).await, + }; + + // Set the cookies. Note that error responses will only include + // cookies set by the error handler. See `handle_error` for more. + for cookie in request.cookies().delta() { + response.adjoin_header(cookie); } - Outcome::Failure(status) => self.handle_error(status, request), - }; - // Set the cookies. Note that error responses will only include cookies - // set by the error handler. See `handle_error` for more. - for cookie in request.cookies().delta() { - response.adjoin_header(cookie); + response } - - response } /// Tries to find a `Responder` for a given `request`. It does this by @@ -256,87 +328,75 @@ impl RocketHyperService { // (ensuring `handler` takes an immutable borrow), any caller to `route` // should be able to supply an `&mut` and retain an `&` after the call. #[inline] - pub(crate) fn route<'s, 'r>( + pub(crate) fn route<'s, 'r: 's>( &'s self, request: &'r Request<'s>, mut data: Data, - ) -> handler::Outcome<'r> { - // Go through the list of matching routes until we fail or succeed. - let matches = self.router.route(request); - for route in matches { - // Retrieve and set the requests parameters. - info_!("Matched: {}", route); - request.set_route(route); + ) -> impl Future> + 's { + async move { + // Go through the list of matching routes until we fail or succeed. + let matches = self.router.route(request); + for route in matches { + // Retrieve and set the requests parameters. + info_!("Matched: {}", route); + request.set_route(route); - // Dispatch the request to the handler. - let outcome = route.handler.handle(request, data); + // Dispatch the request to the handler. + let outcome = route.handler.handle(request, data).await; - // Check if the request processing completed or if the request needs - // to be forwarded. If it does, continue the loop to try again. - info_!("{} {}", Paint::default("Outcome:").bold(), outcome); - match outcome { - o@Outcome::Success(_) | o@Outcome::Failure(_) => return o, - Outcome::Forward(unused_data) => data = unused_data, - }; + // Check if the request processing completed (Some) or if the request needs + // to be forwarded. If it does, continue the loop (None) to try again. + info_!("{} {}", Paint::default("Outcome:").bold(), outcome); + match outcome { + o@Outcome::Success(_) | o@Outcome::Failure(_) => return o, + Outcome::Forward(unused_data) => data = unused_data, + } + } + + error_!("No matching routes for {}.", request); + Outcome::Forward(data) } - - error_!("No matching routes for {}.", request); - Outcome::Forward(data) } // Finds the error catcher for the status `status` and executes it for the - // given request `req`; the cookies in `req` are reset to their original - // state before invoking the error handler. If a user has registered a - // catcher for `status`, the catcher is called. If the catcher fails to - // return a good response, the 500 catcher is executed. If there is no - // registered catcher for `status`, the default catcher is used. - pub(crate) fn handle_error<'r>( - &self, + // given request `req`. If a user has registered a catcher for `status`, the + // catcher is called. If the catcher fails to return a good response, the + // 500 catcher is executed. If there is no registered catcher for `status`, + // the default catcher is used. + pub(crate) fn handle_error<'s, 'r: 's>( + &'s self, status: Status, - req: &'r Request<'_> - ) -> Response<'r> { - warn_!("Responding with {} catcher.", Paint::red(&status)); + req: &'r Request<'s> + ) -> impl Future> + 's { + async move { + warn_!("Responding with {} catcher.", Paint::red(&status)); - // For now, we reset the delta state to prevent any modifications from - // earlier, unsuccessful paths from being reflected in error response. - // We may wish to relax this in the future. - req.cookies().reset_delta(); + // For now, we reset the delta state to prevent any modifications + // from earlier, unsuccessful paths from being reflected in error + // response. We may wish to relax this in the future. + req.cookies().reset_delta(); - // Try to get the active catcher but fallback to user's 500 catcher. - let catcher = self.catchers.get(&status.code).unwrap_or_else(|| { - error_!("No catcher found for {}. Using 500 catcher.", status); - self.catchers.get(&500).expect("500 catcher.") - }); + // Try to get the active catcher but fallback to user's 500 catcher. + let catcher = self.catchers.get(&status.code).unwrap_or_else(|| { + error_!("No catcher found for {}. Using 500 catcher.", status); + self.catchers.get(&500).expect("500 catcher.") + }); - // Dispatch to the user's catcher. If it fails, use the default 500. - catcher.handle(req).unwrap_or_else(|err_status| { - error_!("Catcher failed with status: {}!", err_status); - warn_!("Using default 500 error catcher."); - let default = self.default_catchers.get(&500).expect("Default 500"); - default.handle(req).expect("Default 500 response.") - }) + // Dispatch to the user's catcher. If it fails, use the default 500. + match catcher.handle(req).await { + Ok(r) => return r, + Err(err_status) => { + error_!("Catcher failed with status: {}!", err_status); + warn_!("Using default 500 error catcher."); + let default = self.default_catchers.get(&500).expect("Default 500"); + default.handle(req).await.expect("Default 500 response.") + } + } + } } } impl Rocket { - - #[inline] - pub(crate) fn dispatch<'s, 'r>( - &'s self, - request: &'r mut Request<'s>, - data: Data - ) -> Response<'r> { - unimplemented!("TODO") - } - - pub(crate) fn handle_error<'r>( - &self, - status: Status, - req: &'r Request - ) -> Response<'r> { - unimplemented!("TODO") - } - /// Create a new `Rocket` application using the configuration information in /// `Rocket.toml`. If the file does not exist or if there is an I/O error /// reading the file, the defaults, overridden by any environment-based @@ -528,7 +588,6 @@ impl Rocket { panic!("Invalid mount point."); } - let mut router = self.router.clone(); for mut route in routes.into() { let path = route.uri.clone(); if let Err(e) = route.set_uri(base_uri.clone(), path) { @@ -537,11 +596,9 @@ impl Rocket { } info_!("{}", route); - router.add(route); + self.router.add(route); } - self.router = router; - self } @@ -576,8 +633,6 @@ impl Rocket { pub fn register(mut self, catchers: Vec) -> Self { info!("{}{}", Paint::emoji("👾 "), Paint::magenta("Catchers:")); - let mut current_catchers = self.catchers.clone(); - for c in catchers { if self.catchers.get(&c.code).map_or(false, |e| !e.is_default) { info_!("{} {}", c, Paint::yellow("(warning: duplicate catcher!)")); @@ -585,11 +640,9 @@ impl Rocket { info_!("{}", c); } - current_catchers.insert(c.code, c); + self.catchers.insert(c.code, c); } - self.catchers = current_catchers; - self } @@ -706,6 +759,8 @@ impl Rocket { /// # } /// ``` pub fn launch(mut self) -> LaunchError { + #[cfg(feature = "tls")] use crate::http::tls; + self = match self.prelaunch_check() { Ok(rocket) => rocket, Err(launch_error) => return launch_error @@ -720,56 +775,29 @@ impl Rocket { .build() .expect("Cannot build runtime!"); - let threads = self.config.workers as usize; + let full_addr = format!("{}:{}", self.config.address, self.config.port); + let addrs = match full_addr.to_socket_addrs() { + Ok(a) => a.collect::>(), + // TODO.async: Reconsider this error type + Err(e) => return From::from(io::Error::new(io::ErrorKind::Other, e)), + }; - let full_addr = format!("{}:{}", self.config.address, self.config.port) - .to_socket_addrs() - .expect("A valid socket address") - .next() - .unwrap(); + // TODO.async: support for TLS, unix sockets. + // Likely will be implemented with a custom "Incoming" type. - let listener = match TcpListener::bind(&full_addr) { - Ok(listener) => listener, + let mut incoming = match hyper::AddrIncoming::bind(&addrs[0]) { + Ok(incoming) => incoming, Err(e) => return LaunchError::new(LaunchErrorKind::Bind(e)), }; // Determine the address and port we actually binded to. - match listener.local_addr() { - Ok(server_addr) => /* TODO self.config.port = */ server_addr.port(), - Err(e) => return LaunchError::from(e), - }; + self.config.port = incoming.local_addr().port(); - let proto; - let incoming; + let proto = "http://"; - #[cfg(feature = "tls")] - { - // TODO.async: Can/should we make the clone unnecessary (by reference, or by moving out?) - if let Some(tls) = self.config.tls.clone() { - proto = "https://"; - let mut config = tls::rustls::ServerConfig::new(tls::rustls::NoClientAuth::new()); - config.set_single_cert(tls.certs, tls.key).expect("invalid key or certificate"); - - // TODO.async: I once observed an unhandled AlertReceived(UnknownCA) but - // have no idea what happened and cannot reproduce. - let config = TlsAcceptor::from(Arc::new(config)); - - incoming = Box::new(listener.incoming().and_then(move |stream| { - config.accept(stream) - .map(|stream| Box::new(stream)) - })); - } - else { - proto = "http://"; - incoming = Box::new(listener.incoming().map(|stream| Box::new(stream))); - } - } - - #[cfg(not(feature = "tls"))] - { - proto = "http://"; - incoming = Box::new(listener.incoming().map(|stream| Box::new(stream))); - } + // Set the keep-alive. + let timeout = self.config.keep_alive.map(|s| Duration::from_secs(s as u64)); + incoming.set_keepalive(timeout); // Freeze managed state for synchronization-free accesses later. self.state.freeze(); @@ -786,17 +814,25 @@ impl Rocket { // Restore the log level back to what it originally was. logger::pop_max_level(); - let arcs = RocketArcs::from(self); + let rocket = Arc::new(self); + let spawn = Box::new(runtime.executor().compat()); + let service = hyper::make_service_fn(move |socket: &hyper::AddrStream| { + futures::future::ok::<_, Box>(RocketHyperService { + rocket: rocket.clone(), + spawn: spawn.clone(), + remote_addr: socket.remote_addr(), + }).compat() + }); // NB: executor must be passed manually here, see hyperium/hyper#1537 let server = hyper::Server::builder(incoming) .executor(runtime.executor()) - .serve(arcs); + .serve(service); // TODO.async: Use with_graceful_shutdown, and let launch() return a Result<(), Error> runtime.block_on(server).expect("TODO.async handle error"); - unreachable!("the call to `handle_threads` should block on success") + unreachable!("the call to `block_on` should block on success") } /// Returns an iterator over all of the routes mounted on this instance of @@ -881,47 +917,3 @@ impl Rocket { &self.config } } - -impl From for RocketArcs { - fn from(mut rocket: Rocket) -> Self { - RocketArcs { - config: Arc::new(rocket.config), - router: Arc::new(rocket.router), - default_catchers: Arc::new(rocket.default_catchers), - catchers: Arc::new(rocket.catchers), - state: Arc::new(rocket.state), - fairings: Arc::new(rocket.fairings), - } - } -} - -// TODO: consider try_from here? -impl<'a> From> for hyper::Response { - fn from(mut response: Response) -> Self { - - let mut builder = hyper::Response::builder(); - builder.status(hyper::StatusCode::from_u16(response.status().code).expect("")); - - for header in response.headers().iter() { - // FIXME: Using hyper here requires two allocations. - let name = hyper::HeaderName::from_str(&header.name.into_string()).unwrap(); - let value = hyper::HeaderValue::from_bytes(header.value.as_bytes()).unwrap(); - builder.header(name, value); - } - - match response.body() { - None => { - builder.body(hyper::Body::empty()) - }, - Some(Body::Sized(body, size)) => { - let mut buffer = Vec::with_capacity(size as usize); - body.read_to_end(&mut buffer); - builder.header(header::CONTENT_LENGTH, hyper::HeaderValue::from(size)); - builder.body(hyper::Body::from(buffer)) - }, - Some(Body::Chunked(mut body, chunk_size)) => { - unimplemented!() - } - }.unwrap() - } -} diff --git a/core/lib/src/router/mod.rs b/core/lib/src/router/mod.rs index 38700299..d423e962 100644 --- a/core/lib/src/router/mod.rs +++ b/core/lib/src/router/mod.rs @@ -3,6 +3,8 @@ mod route; use std::collections::hash_map::HashMap; +use futures::future::{Future, FutureExt}; + pub use self::route::Route; use crate::request::Request; @@ -12,11 +14,11 @@ use crate::http::Method; type Selector = Method; // A handler to use when one is needed temporarily. -pub(crate) fn dummy_handler<'r>(r: &'r crate::Request<'_>, _: crate::Data) -> crate::handler::Outcome<'r> { - crate::Outcome::from(r, ()) +pub(crate) fn dummy_handler<'r>(r: &'r Request<'_>, _: crate::Data) -> std::pin::Pin> + Send + 'r>> { + futures::future::ready(crate::Outcome::from(r, ())).boxed() } -#[derive(Default, Clone)] +#[derive(Default)] pub struct Router { routes: HashMap>, } diff --git a/core/lib/tests/absolute-uris-okay-issue-443.rs b/core/lib/tests/absolute-uris-okay-issue-443.rs index 758d1d72..7f57295e 100644 --- a/core/lib/tests/absolute-uris-okay-issue-443.rs +++ b/core/lib/tests/absolute-uris-okay-issue-443.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/core/lib/tests/conditionally-set-server-header-996.rs b/core/lib/tests/conditionally-set-server-header-996.rs index f20f18a1..a6f8fac5 100644 --- a/core/lib/tests/conditionally-set-server-header-996.rs +++ b/core/lib/tests/conditionally-set-server-header-996.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/core/lib/tests/derive-reexports.rs b/core/lib/tests/derive-reexports.rs index 3d3ac70e..18513acb 100644 --- a/core/lib/tests/derive-reexports.rs +++ b/core/lib/tests/derive-reexports.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] use rocket; @@ -51,8 +51,8 @@ fn test_derive_reexports() { let client = Client::new(rocket).unwrap(); let mut response = client.get("/").dispatch(); - assert_eq!(response.body_string().unwrap(), "hello"); + assert_eq!(response.body_string_wait().unwrap(), "hello"); let mut response = client.get("/?thing=b").dispatch(); - assert_eq!(response.body_string().unwrap(), "b"); + assert_eq!(response.body_string_wait().unwrap(), "b"); } diff --git a/core/lib/tests/fairing_before_head_strip-issue-546.rs b/core/lib/tests/fairing_before_head_strip-issue-546.rs index 546e7a78..648046ac 100644 --- a/core/lib/tests/fairing_before_head_strip-issue-546.rs +++ b/core/lib/tests/fairing_before_head_strip-issue-546.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -35,13 +35,15 @@ mod fairing_before_head_strip { })) .attach(AdHoc::on_response("Check HEAD 2", |req, res| { assert_eq!(req.method(), Method::Head); - assert_eq!(res.body_string(), Some(RESPONSE_STRING.into())); + // TODO.async: Needs async on_response fairings + // assert_eq!(res.body_string().await, Some(RESPONSE_STRING.into())); })); let client = Client::new(rocket).unwrap(); let mut response = client.head("/").dispatch(); assert_eq!(response.status(), Status::Ok); - assert!(response.body().is_none()); + // TODO.async: See above + // assert!(response.body().is_none()); } #[test] @@ -62,12 +64,14 @@ mod fairing_before_head_strip { })) .attach(AdHoc::on_response("Check GET", |req, res| { assert_eq!(req.method(), Method::Get); - assert_eq!(res.body_string(), Some(RESPONSE_STRING.into())); + // TODO.async: Needs async on_response fairings + // assert_eq!(res.body_string().await, Some(RESPONSE_STRING.into())); })); let client = Client::new(rocket).unwrap(); let mut response = client.head("/").dispatch(); assert_eq!(response.status(), Status::Ok); - assert!(response.body().is_none()); + // TODO.async: See above + // assert!(response.body().is_none()); } } diff --git a/core/lib/tests/flash-lazy-removes-issue-466.rs b/core/lib/tests/flash-lazy-removes-issue-466.rs index 584c97db..9a3c73be 100644 --- a/core/lib/tests/flash-lazy-removes-issue-466.rs +++ b/core/lib/tests/flash-lazy-removes-issue-466.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -49,7 +49,7 @@ mod flash_lazy_remove_tests { // Now use it. let mut response = client.get("/use").dispatch(); - assert_eq!(response.body_string(), Some(FLASH_MESSAGE.into())); + assert_eq!(response.body_string_wait(), Some(FLASH_MESSAGE.into())); // Now it should be gone. let response = client.get("/unused").dispatch(); diff --git a/core/lib/tests/form_method-issue-45.rs b/core/lib/tests/form_method-issue-45.rs index 5acaff82..b65230e8 100644 --- a/core/lib/tests/form_method-issue-45.rs +++ b/core/lib/tests/form_method-issue-45.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -28,7 +28,7 @@ mod tests { .body("_method=patch&form_data=Form+data") .dispatch(); - assert_eq!(response.body_string(), Some("OK".into())); + assert_eq!(response.body_string_wait(), Some("OK".into())); } #[test] diff --git a/core/lib/tests/form_value_decoding-issue-82.rs b/core/lib/tests/form_value_decoding-issue-82.rs index 2780eeed..b6fc477f 100644 --- a/core/lib/tests/form_value_decoding-issue-82.rs +++ b/core/lib/tests/form_value_decoding-issue-82.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -28,7 +28,7 @@ mod tests { .dispatch(); assert_eq!(response.status(), Status::Ok); - assert_eq!(Some(decoded.to_string()), response.body_string()); + assert_eq!(Some(decoded.to_string()), response.body_string_wait()); } #[test] diff --git a/core/lib/tests/head_handling.rs b/core/lib/tests/head_handling.rs index 5e3dd96d..e5a48877 100644 --- a/core/lib/tests/head_handling.rs +++ b/core/lib/tests/head_handling.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -22,7 +22,7 @@ fn other() -> content::Json<&'static str> { mod head_handling_tests { use super::*; - use std::io::Read; + use futures::io::AsyncReadExt; use rocket::Route; use rocket::local::Client; @@ -33,13 +33,15 @@ mod head_handling_tests { routes![index, empty, other] } - fn assert_empty_sized_body(body: Body, expected_size: u64) { + fn assert_empty_sized_body(body: Body, expected_size: u64) { match body { Body::Sized(mut body, size) => { let mut buffer = vec![]; - let n = body.read_to_end(&mut buffer).unwrap(); + futures::executor::block_on(async { + body.read_to_end(&mut buffer).await.unwrap(); + }); assert_eq!(size, expected_size); - assert_eq!(n, 0); + assert_eq!(buffer.len(), 0); } _ => panic!("Expected a sized body.") } @@ -57,7 +59,7 @@ mod head_handling_tests { let mut response = client.head("/empty").dispatch(); assert_eq!(response.status(), Status::NoContent); - assert!(response.body_bytes().is_none()); + assert!(response.body_bytes_wait().is_none()); } #[test] diff --git a/core/lib/tests/limits.rs b/core/lib/tests/limits.rs index 9e23abb1..7f3dca65 100644 --- a/core/lib/tests/limits.rs +++ b/core/lib/tests/limits.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -36,7 +36,7 @@ mod limits_tests { .header(ContentType::Form) .dispatch(); - assert_eq!(response.body_string(), Some("Hello world".into())); + assert_eq!(response.body_string_wait(), Some("Hello world".into())); } #[test] @@ -47,7 +47,7 @@ mod limits_tests { .header(ContentType::Form) .dispatch(); - assert_eq!(response.body_string(), Some("Hello world".into())); + assert_eq!(response.body_string_wait(), Some("Hello world".into())); } #[test] @@ -69,6 +69,6 @@ mod limits_tests { .header(ContentType::Form) .dispatch(); - assert_eq!(response.body_string(), Some("Hell".into())); + assert_eq!(response.body_string_wait(), Some("Hell".into())); } } diff --git a/core/lib/tests/local-request-content-type-issue-505.rs b/core/lib/tests/local-request-content-type-issue-505.rs index 4803e929..8d2f9344 100644 --- a/core/lib/tests/local-request-content-type-issue-505.rs +++ b/core/lib/tests/local-request-content-type-issue-505.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -25,12 +25,12 @@ use rocket::data::{self, FromDataSimple}; impl FromDataSimple for HasContentType { type Error = (); - fn from_data(request: &Request, data: Data) -> data::Outcome { - if request.content_type().is_some() { + fn from_data(request: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, Self::Error> { + Box::pin(futures::future::ready(if request.content_type().is_some() { Success(HasContentType) } else { Forward(data) - } + })) } } @@ -65,14 +65,14 @@ mod local_request_content_type_tests { let client = Client::new(rocket()).unwrap(); let mut req = client.post("/"); - assert_eq!(req.clone().dispatch().body_string(), Some("Absent".to_string())); - assert_eq!(req.mut_dispatch().body_string(), Some("Absent".to_string())); - assert_eq!(req.dispatch().body_string(), Some("Absent".to_string())); + assert_eq!(req.clone().dispatch().body_string_wait(), Some("Absent".to_string())); + assert_eq!(req.mut_dispatch().body_string_wait(), Some("Absent".to_string())); + assert_eq!(req.dispatch().body_string_wait(), Some("Absent".to_string())); let mut req = client.post("/data"); - assert_eq!(req.clone().dispatch().body_string(), Some("Data Absent".to_string())); - assert_eq!(req.mut_dispatch().body_string(), Some("Data Absent".to_string())); - assert_eq!(req.dispatch().body_string(), Some("Data Absent".to_string())); + assert_eq!(req.clone().dispatch().body_string_wait(), Some("Data Absent".to_string())); + assert_eq!(req.mut_dispatch().body_string_wait(), Some("Data Absent".to_string())); + assert_eq!(req.dispatch().body_string_wait(), Some("Data Absent".to_string())); } #[test] @@ -80,13 +80,13 @@ mod local_request_content_type_tests { let client = Client::new(rocket()).unwrap(); let mut req = client.post("/").header(ContentType::JSON); - assert_eq!(req.clone().dispatch().body_string(), Some("Present".to_string())); - assert_eq!(req.mut_dispatch().body_string(), Some("Present".to_string())); - assert_eq!(req.dispatch().body_string(), Some("Present".to_string())); + assert_eq!(req.clone().dispatch().body_string_wait(), Some("Present".to_string())); + assert_eq!(req.mut_dispatch().body_string_wait(), Some("Present".to_string())); + assert_eq!(req.dispatch().body_string_wait(), Some("Present".to_string())); let mut req = client.post("/data").header(ContentType::JSON); - assert_eq!(req.clone().dispatch().body_string(), Some("Data Present".to_string())); - assert_eq!(req.mut_dispatch().body_string(), Some("Data Present".to_string())); - assert_eq!(req.dispatch().body_string(), Some("Data Present".to_string())); + assert_eq!(req.clone().dispatch().body_string_wait(), Some("Data Present".to_string())); + assert_eq!(req.mut_dispatch().body_string_wait(), Some("Data Present".to_string())); + assert_eq!(req.dispatch().body_string_wait(), Some("Data Present".to_string())); } } diff --git a/core/lib/tests/local_request_private_cookie-issue-368.rs b/core/lib/tests/local_request_private_cookie-issue-368.rs index deba440e..247ec6c2 100644 --- a/core/lib/tests/local_request_private_cookie-issue-368.rs +++ b/core/lib/tests/local_request_private_cookie-issue-368.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] #[cfg(feature = "private-cookies")] @@ -30,7 +30,7 @@ mod private_cookie_test { let req = client.get("/").private_cookie(Cookie::new("cookie_name", "cookie_value")); let mut response = req.dispatch(); - assert_eq!(response.body_string(), Some("cookie_value".into())); + assert_eq!(response.body_string_wait(), Some("cookie_value".into())); assert_eq!(response.headers().get_one("Set-Cookie"), None); } diff --git a/core/lib/tests/nested-fairing-attaches.rs b/core/lib/tests/nested-fairing-attaches.rs index 19137f4f..ba1554cd 100644 --- a/core/lib/tests/nested-fairing-attaches.rs +++ b/core/lib/tests/nested-fairing-attaches.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -47,14 +47,14 @@ mod nested_fairing_attaches_tests { fn test_counts() { let client = Client::new(rocket()).unwrap(); let mut response = client.get("/").dispatch(); - assert_eq!(response.body_string(), Some("1, 1".into())); + assert_eq!(response.body_string_wait(), Some("1, 1".into())); let mut response = client.get("/").dispatch(); - assert_eq!(response.body_string(), Some("1, 2".into())); + assert_eq!(response.body_string_wait(), Some("1, 2".into())); client.get("/").dispatch(); client.get("/").dispatch(); let mut response = client.get("/").dispatch(); - assert_eq!(response.body_string(), Some("1, 5".into())); + assert_eq!(response.body_string_wait(), Some("1, 5".into())); } } diff --git a/core/lib/tests/precise-content-type-matching.rs b/core/lib/tests/precise-content-type-matching.rs index da7349c6..f1c651dd 100644 --- a/core/lib/tests/precise-content-type-matching.rs +++ b/core/lib/tests/precise-content-type-matching.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -45,7 +45,7 @@ mod tests { } let mut response = req.dispatch(); - let body_str = response.body_string(); + let body_str = response.body_string_wait(); let body: Option<&'static str> = $body; match body { Some(string) => assert_eq!(body_str, Some(string.to_string())), diff --git a/core/lib/tests/redirect_from_catcher-issue-113.rs b/core/lib/tests/redirect_from_catcher-issue-113.rs index f50f2ba3..15c46f27 100644 --- a/core/lib/tests/redirect_from_catcher-issue-113.rs +++ b/core/lib/tests/redirect_from_catcher-issue-113.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/core/lib/tests/responder_lifetime-issue-345.rs b/core/lib/tests/responder_lifetime-issue-345.rs index b35cddde..8ca2964a 100644 --- a/core/lib/tests/responder_lifetime-issue-345.rs +++ b/core/lib/tests/responder_lifetime-issue-345.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #![allow(dead_code)] // This test is only here so that we can ensure it compiles. #[macro_use] extern crate rocket; diff --git a/core/lib/tests/route_guard.rs b/core/lib/tests/route_guard.rs index 64bfe8f7..d914cc65 100644 --- a/core/lib/tests/route_guard.rs +++ b/core/lib/tests/route_guard.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -17,7 +17,7 @@ mod route_guard_tests { fn assert_path(client: &Client, path: &str) { let mut res = client.get(path).dispatch(); - assert_eq!(res.body_string(), Some(path.into())); + assert_eq!(res.body_string_wait(), Some(path.into())); } #[test] diff --git a/core/lib/tests/segments-issues-41-86.rs b/core/lib/tests/segments-issues-41-86.rs index f9bd50a8..0c32d3af 100644 --- a/core/lib/tests/segments-issues-41-86.rs +++ b/core/lib/tests/segments-issues-41-86.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -48,7 +48,7 @@ mod tests { { let path = "this/is/the/path/we/want"; let mut response = client.get(format!("{}/{}", prefix, path)).dispatch(); - assert_eq!(response.body_string(), Some(path.into())); + assert_eq!(response.body_string_wait(), Some(path.into())); } } } diff --git a/core/lib/tests/strict_and_lenient_forms.rs b/core/lib/tests/strict_and_lenient_forms.rs index 4ba5300d..6643b622 100644 --- a/core/lib/tests/strict_and_lenient_forms.rs +++ b/core/lib/tests/strict_and_lenient_forms.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -40,7 +40,7 @@ mod strict_and_lenient_forms_tests { .dispatch(); assert_eq!(response.status(), Status::Ok); - assert_eq!(response.body_string(), Some(FIELD_VALUE.into())); + assert_eq!(response.body_string_wait(), Some(FIELD_VALUE.into())); let response = client.post("/strict") .header(ContentType::Form) @@ -59,7 +59,7 @@ mod strict_and_lenient_forms_tests { .dispatch(); assert_eq!(response.status(), Status::Ok); - assert_eq!(response.body_string(), Some(FIELD_VALUE.into())); + assert_eq!(response.body_string_wait(), Some(FIELD_VALUE.into())); let mut response = client.post("/lenient") .header(ContentType::Form) @@ -67,6 +67,6 @@ mod strict_and_lenient_forms_tests { .dispatch(); assert_eq!(response.status(), Status::Ok); - assert_eq!(response.body_string(), Some(FIELD_VALUE.into())); + assert_eq!(response.body_string_wait(), Some(FIELD_VALUE.into())); } } diff --git a/core/lib/tests/uri-percent-encoding-issue-808.rs b/core/lib/tests/uri-percent-encoding-issue-808.rs index b46cc892..f8c88597 100644 --- a/core/lib/tests/uri-percent-encoding-issue-808.rs +++ b/core/lib/tests/uri-percent-encoding-issue-808.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; @@ -54,6 +54,6 @@ mod tests { let name = Uri::percent_encode(NAME); let mut response = client.get(format!("/hello/{}", name)).dispatch(); assert_eq!(response.status(), Status::Ok); - assert_eq!(response.body_string().unwrap(), format!("Hello, {}!", NAME)); + assert_eq!(response.body_string_wait().unwrap(), format!("Hello, {}!", NAME)); } } diff --git a/examples/config/tests/development.rs b/examples/config/tests/development.rs index 1b5753bc..53dd3ae2 100644 --- a/examples/config/tests/development.rs +++ b/examples/config/tests/development.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/config/tests/production.rs b/examples/config/tests/production.rs index 7208a7a4..262c29ff 100644 --- a/examples/config/tests/production.rs +++ b/examples/config/tests/production.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/config/tests/staging.rs b/examples/config/tests/staging.rs index fc88fee8..e33a0517 100644 --- a/examples/config/tests/staging.rs +++ b/examples/config/tests/staging.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/content_types/src/main.rs b/examples/content_types/src/main.rs index 83b9eda7..d6c1e1a4 100644 --- a/examples/content_types/src/main.rs +++ b/examples/content_types/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; #[macro_use] extern crate serde_derive; diff --git a/examples/cookies/src/main.rs b/examples/cookies/src/main.rs index 1232def3..a53740c1 100644 --- a/examples/cookies/src/main.rs +++ b/examples/cookies/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/errors/src/main.rs b/examples/errors/src/main.rs index 3aa27066..1f5a24c1 100644 --- a/examples/errors/src/main.rs +++ b/examples/errors/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/fairings/src/main.rs b/examples/fairings/src/main.rs index d1a9e44c..7b7ca500 100644 --- a/examples/fairings/src/main.rs +++ b/examples/fairings/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/form_kitchen_sink/src/main.rs b/examples/form_kitchen_sink/src/main.rs index af494eaf..c73a9376 100644 --- a/examples/form_kitchen_sink/src/main.rs +++ b/examples/form_kitchen_sink/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/form_validation/src/main.rs b/examples/form_validation/src/main.rs index 0495409b..bddca2ae 100644 --- a/examples/form_validation/src/main.rs +++ b/examples/form_validation/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/handlebars_templates/src/main.rs b/examples/handlebars_templates/src/main.rs index e2b29535..def582db 100644 --- a/examples/handlebars_templates/src/main.rs +++ b/examples/handlebars_templates/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; #[macro_use] extern crate serde_derive; diff --git a/examples/hello_2015/src/main.rs b/examples/hello_2015/src/main.rs index 1001fb52..e77a36a0 100644 --- a/examples/hello_2015/src/main.rs +++ b/examples/hello_2015/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/hello_2018/src/main.rs b/examples/hello_2018/src/main.rs index d8d7c473..abe16b44 100644 --- a/examples/hello_2018/src/main.rs +++ b/examples/hello_2018/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[cfg(test)] mod tests; diff --git a/examples/hello_person/src/main.rs b/examples/hello_person/src/main.rs index 96c4ae2e..0bd7c999 100644 --- a/examples/hello_person/src/main.rs +++ b/examples/hello_person/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/hello_world/src/main.rs b/examples/hello_world/src/main.rs index 6c1111f4..de05d854 100644 --- a/examples/hello_world/src/main.rs +++ b/examples/hello_world/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/json/src/main.rs b/examples/json/src/main.rs index 65698c94..20bcb71e 100644 --- a/examples/json/src/main.rs +++ b/examples/json/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; #[macro_use] extern crate rocket_contrib; diff --git a/examples/managed_queue/src/main.rs b/examples/managed_queue/src/main.rs index d0588bc6..9ca56264 100644 --- a/examples/managed_queue/src/main.rs +++ b/examples/managed_queue/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/msgpack/src/main.rs b/examples/msgpack/src/main.rs index 2af8c51d..d500ac21 100644 --- a/examples/msgpack/src/main.rs +++ b/examples/msgpack/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; #[macro_use] extern crate serde_derive; diff --git a/examples/optional_redirect/src/main.rs b/examples/optional_redirect/src/main.rs index 8c81c24a..c3b0c008 100644 --- a/examples/optional_redirect/src/main.rs +++ b/examples/optional_redirect/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/pastebin/src/main.rs b/examples/pastebin/src/main.rs index c561c000..12a34e60 100644 --- a/examples/pastebin/src/main.rs +++ b/examples/pastebin/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/query_params/src/main.rs b/examples/query_params/src/main.rs index c2ba93af..332c607f 100644 --- a/examples/query_params/src/main.rs +++ b/examples/query_params/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/ranking/src/main.rs b/examples/ranking/src/main.rs index 2267fc3d..26e1700d 100644 --- a/examples/ranking/src/main.rs +++ b/examples/ranking/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/raw_sqlite/src/main.rs b/examples/raw_sqlite/src/main.rs index 4daad628..60277f99 100644 --- a/examples/raw_sqlite/src/main.rs +++ b/examples/raw_sqlite/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/raw_upload/src/main.rs b/examples/raw_upload/src/main.rs index 9ea67c59..31614276 100644 --- a/examples/raw_upload/src/main.rs +++ b/examples/raw_upload/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/redirect/src/main.rs b/examples/redirect/src/main.rs index cfd7db27..3df8f7bf 100644 --- a/examples/redirect/src/main.rs +++ b/examples/redirect/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/request_guard/src/main.rs b/examples/request_guard/src/main.rs index 48efc937..6ff46d76 100644 --- a/examples/request_guard/src/main.rs +++ b/examples/request_guard/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/request_local_state/src/main.rs b/examples/request_local_state/src/main.rs index f2a3ccec..27092881 100644 --- a/examples/request_local_state/src/main.rs +++ b/examples/request_local_state/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/session/src/main.rs b/examples/session/src/main.rs index c4de5208..416b2a06 100644 --- a/examples/session/src/main.rs +++ b/examples/session/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/state/src/main.rs b/examples/state/src/main.rs index 9bd48352..5109da79 100644 --- a/examples/state/src/main.rs +++ b/examples/state/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/stream/src/main.rs b/examples/stream/src/main.rs index 20499be4..8a045514 100644 --- a/examples/stream/src/main.rs +++ b/examples/stream/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/tera_templates/src/main.rs b/examples/tera_templates/src/main.rs index 83c53122..061ef2b8 100644 --- a/examples/tera_templates/src/main.rs +++ b/examples/tera_templates/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; #[macro_use] extern crate serde_derive; diff --git a/examples/testing/src/main.rs b/examples/testing/src/main.rs index 647d76b7..9027cda3 100644 --- a/examples/testing/src/main.rs +++ b/examples/testing/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/tls/src/main.rs b/examples/tls/src/main.rs index 6c1111f4..de05d854 100644 --- a/examples/tls/src/main.rs +++ b/examples/tls/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; diff --git a/examples/todo/src/main.rs b/examples/todo/src/main.rs index 318a13cd..1e3aca37 100644 --- a/examples/todo/src/main.rs +++ b/examples/todo/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; #[macro_use] extern crate diesel; diff --git a/examples/uuid/src/main.rs b/examples/uuid/src/main.rs index 38639da9..0a0bb271 100644 --- a/examples/uuid/src/main.rs +++ b/examples/uuid/src/main.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro_hygiene)] +#![feature(proc_macro_hygiene, async_await)] #[macro_use] extern crate rocket; #[macro_use] extern crate lazy_static; diff --git a/site/tests/src/lib.rs b/site/tests/src/lib.rs index d62512ca..9619d661 100644 --- a/site/tests/src/lib.rs +++ b/site/tests/src/lib.rs @@ -1,3 +1,3 @@ -#![feature(external_doc)] +// #![feature(external_doc)] -rocket::rocket_internal_guide_tests!("../guide/*.md"); +// rocket::rocket_internal_guide_tests!("../guide/*.md");