Convert core to async and add support for async routes.

Minimum rustc bump required for rust-lang/rust#61775
This commit is contained in:
Jeb Rosen 2019-06-30 09:45:17 -07:00 committed by Sergio Benitez
parent 96b4142156
commit 5d439bafc0
90 changed files with 1008 additions and 911 deletions

View File

@ -51,7 +51,7 @@ pub fn _catch(args: TokenStream, input: TokenStream) -> Result<TokenStream> {
let status_code = status.0.code;
// Variables names we'll use and reuse.
define_vars_and_mods!(req, catcher, response, Request, Response);
define_vars_and_mods!(req, catcher, Request, Response, ErrorHandlerFuture);
// Determine the number of parameters that will be passed in.
let (fn_sig, inputs) = match catch.function.sig.inputs.len() {
@ -84,12 +84,14 @@ pub fn _catch(args: TokenStream, input: TokenStream) -> Result<TokenStream> {
/// Rocket code generated wrapping catch function.
#[doc(hidden)]
#vis fn #generated_fn_name<'_b>(#req: &'_b #Request) -> #response::Result<'_b> {
let __response = #catcher_response;
#Response::build()
.status(#status)
.merge(__response)
.ok()
#vis fn #generated_fn_name<'_b>(#req: &'_b #Request) -> #ErrorHandlerFuture<'_b> {
Box::pin(async move {
let __response = #catcher_response;
#Response::build()
.status(#status)
.merge(__response)
.ok()
})
}
/// Rocket code generated static catcher info.

View File

@ -178,7 +178,7 @@ fn data_expr(ident: &syn::Ident, ty: &syn::Type) -> TokenStream2 {
define_vars_and_mods!(req, data, FromData, Outcome, Transform);
let span = ident.span().unstable().join(ty.span()).unwrap().into();
quote_spanned! { span =>
let __transform = <#ty as #FromData>::transform(#req, #data);
let __transform = <#ty as #FromData>::transform(#req, #data).await;
#[allow(unreachable_patterns, unreachable_code)]
let __outcome = match __transform {
@ -195,7 +195,7 @@ fn data_expr(ident: &syn::Ident, ty: &syn::Type) -> TokenStream2 {
};
#[allow(non_snake_case, unreachable_patterns, unreachable_code)]
let #ident: #ty = match <#ty as #FromData>::from_data(#req, __outcome) {
let #ident: #ty = match <#ty as #FromData>::from_data(#req, __outcome).await {
#Outcome::Success(__d) => __d,
#Outcome::Forward(__d) => return #Outcome::Forward(__d),
#Outcome::Failure((__c, _)) => return #Outcome::Failure(__c),
@ -369,8 +369,18 @@ fn generate_respond_expr(route: &Route) -> TokenStream2 {
let parameter_names = route.inputs.iter()
.map(|(_, rocket_ident, _)| rocket_ident);
let responder_stmt = if route.function.sig.asyncness.is_some() {
quote_spanned! { ret_span =>
let ___responder = #user_handler_fn_name(#(#parameter_names),*).await;
}
} else {
quote_spanned! { ret_span =>
let ___responder = #user_handler_fn_name(#(#parameter_names),*);
}
};
quote_spanned! { ret_span =>
let ___responder = #user_handler_fn_name(#(#parameter_names),*);
#responder_stmt
#handler::Outcome::from(#req, ___responder)
}
}
@ -403,7 +413,7 @@ fn codegen_route(route: Route) -> Result<TokenStream> {
}
// Gather everything we need.
define_vars_and_mods!(req, data, handler, Request, Data, StaticRouteInfo);
define_vars_and_mods!(req, data, Request, Data, StaticRouteInfo, HandlerFuture);
let (vis, user_handler_fn) = (&route.function.vis, &route.function);
let user_handler_fn_name = &user_handler_fn.sig.ident;
let generated_fn_name = user_handler_fn_name.prepend(ROUTE_FN_PREFIX);
@ -424,12 +434,14 @@ fn codegen_route(route: Route) -> Result<TokenStream> {
#vis fn #generated_fn_name<'_b>(
#req: &'_b #Request,
#data: #Data
) -> #handler::Outcome<'_b> {
#(#req_guard_definitions)*
#(#parameter_definitions)*
#data_stmt
) -> #HandlerFuture<'_b> {
Box::pin(async move {
#(#req_guard_definitions)*
#(#parameter_definitions)*
#data_stmt
#generated_respond_expr
#generated_respond_expr
})
}
/// Rocket code generated wrapping URI macro.

View File

@ -1,4 +1,5 @@
#![feature(proc_macro_diagnostic, proc_macro_span)]
#![feature(async_await)]
#![recursion_limit="128"]
#![doc(html_root_url = "https://api.rocket.rs/v0.5")]
@ -96,12 +97,8 @@ vars_and_mods! {
Data => rocket::Data,
StaticRouteInfo => rocket::StaticRouteInfo,
SmallVec => rocket::http::private::SmallVec,
_Option => ::std::option::Option,
_Result => ::std::result::Result,
_Some => ::std::option::Option::Some,
_None => ::std::option::Option::None,
_Ok => ::std::result::Result::Ok,
_Err => ::std::result::Result::Err,
HandlerFuture => rocket::handler::HandlerFuture,
ErrorHandlerFuture => rocket::handler::ErrorHandlerFuture,
}
macro_rules! define_vars_and_mods {

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
// Rocket sometimes generates mangled identifiers that activate the
// non_snake_case lint. We deny the lint in this test to ensure that

View File

@ -16,27 +16,22 @@ edition = "2018"
[features]
default = []
tls = ["rustls", "hyper-sync-rustls"]
tls = ["tokio-rustls"]
private-cookies = ["cookie/private", "cookie/key-expansion"]
[dependencies]
smallvec = "1.0"
percent-encoding = "1"
hyper = { version = "0.12.31", default-features = false, features = ["tokio"] }
hyper = { version = "0.12.31", default-features = false, features = ["runtime"] }
http = "0.1.17"
mime = "0.3.13"
time = "0.2.11"
indexmap = "1.0"
rustls = { version = ">=0.16, <=0.17", optional = true }
state = "0.4"
tokio-rustls = { version = "0.10.3", optional = true }
cookie = { version = "0.14.0", features = ["percent-encode"] }
pear = "0.1"
unicode-xid = "0.2"
[dependencies.hyper-sync-rustls]
version = ">=0.3.0-rc.6, <=0.3.0-rc.17"
features = ["server"]
optional = true
[dev-dependencies]
rocket = { version = "0.5.0-dev", path = "../lib" }

View File

@ -1,5 +1,4 @@
use std::fmt;
use std::cell::RefMut;
use crate::Header;
use cookie::Delta;
@ -129,7 +128,7 @@ mod key {
/// 32`.
pub enum Cookies<'a> {
#[doc(hidden)]
Jarred(RefMut<'a, CookieJar>, &'a Key),
Jarred(CookieJar, &'a Key, Box<dyn FnOnce(CookieJar) + Send + 'a>),
#[doc(hidden)]
Empty(CookieJar)
}
@ -138,8 +137,8 @@ impl<'a> Cookies<'a> {
/// WARNING: This is unstable! Do not use this method outside of Rocket!
#[inline]
#[doc(hidden)]
pub fn new(jar: RefMut<'a, CookieJar>, key: &'a Key) -> Cookies<'a> {
Cookies::Jarred(jar, key)
pub fn new<F: FnOnce(CookieJar) + Send + 'a>(jar: CookieJar, key: &'a Key, on_drop: F) -> Cookies<'a> {
Cookies::Jarred(jar, key, Box::new(on_drop))
}
/// WARNING: This is unstable! Do not use this method outside of Rocket!
@ -161,7 +160,7 @@ impl<'a> Cookies<'a> {
#[inline]
#[doc(hidden)]
pub fn add_original(&mut self, cookie: Cookie<'static>) {
if let Cookies::Jarred(ref mut jar, _) = *self {
if let Cookies::Jarred(ref mut jar, _, _) = *self {
jar.add_original(cookie)
}
}
@ -181,7 +180,7 @@ impl<'a> Cookies<'a> {
/// ```
pub fn get(&self, name: &str) -> Option<&Cookie<'static>> {
match *self {
Cookies::Jarred(ref jar, _) => jar.get(name),
Cookies::Jarred(ref jar, _, _) => jar.get(name),
Cookies::Empty(_) => None
}
}
@ -206,7 +205,7 @@ impl<'a> Cookies<'a> {
/// }
/// ```
pub fn add(&mut self, cookie: Cookie<'static>) {
if let Cookies::Jarred(ref mut jar, _) = *self {
if let Cookies::Jarred(ref mut jar, _, _) = *self {
jar.add(cookie)
}
}
@ -232,7 +231,7 @@ impl<'a> Cookies<'a> {
/// }
/// ```
pub fn remove(&mut self, cookie: Cookie<'static>) {
if let Cookies::Jarred(ref mut jar, _) = *self {
if let Cookies::Jarred(ref mut jar, _, _) = *self {
jar.remove(cookie)
}
}
@ -243,7 +242,7 @@ impl<'a> Cookies<'a> {
#[doc(hidden)]
pub fn reset_delta(&mut self) {
match *self {
Cookies::Jarred(ref mut jar, _) => jar.reset_delta(),
Cookies::Jarred(ref mut jar, ..) => jar.reset_delta(),
Cookies::Empty(ref mut jar) => jar.reset_delta()
}
}
@ -264,7 +263,7 @@ impl<'a> Cookies<'a> {
/// ```
pub fn iter(&self) -> impl Iterator<Item=&Cookie<'static>> {
match *self {
Cookies::Jarred(ref jar, _) => jar.iter(),
Cookies::Jarred(ref jar, _, _) => jar.iter(),
Cookies::Empty(ref jar) => jar.iter()
}
}
@ -274,12 +273,22 @@ impl<'a> Cookies<'a> {
#[doc(hidden)]
pub fn delta(&self) -> Delta<'_> {
match *self {
Cookies::Jarred(ref jar, _) => jar.delta(),
Cookies::Jarred(ref jar, _, _) => jar.delta(),
Cookies::Empty(ref jar) => jar.delta()
}
}
}
impl<'a> Drop for Cookies<'a> {
fn drop(&mut self) {
if let Cookies::Jarred(ref mut jar, _, ref mut on_drop) = *self {
let jar = std::mem::replace(jar, CookieJar::new());
let on_drop = std::mem::replace(on_drop, Box::new(|_| {}));
on_drop(jar);
}
}
}
#[cfg(feature = "private-cookies")]
impl Cookies<'_> {
/// Returns a reference to the `Cookie` inside this collection with the name
@ -302,7 +311,7 @@ impl Cookies<'_> {
/// ```
pub fn get_private(&mut self, name: &str) -> Option<Cookie<'static>> {
match *self {
Cookies::Jarred(ref mut jar, key) => jar.private(key).get(name),
Cookies::Jarred(ref mut jar, key, _) => jar.private(key).get(name),
Cookies::Empty(_) => None
}
}
@ -338,7 +347,7 @@ impl Cookies<'_> {
/// }
/// ```
pub fn add_private(&mut self, mut cookie: Cookie<'static>) {
if let Cookies::Jarred(ref mut jar, key) = *self {
if let Cookies::Jarred(ref mut jar, key, _) = *self {
Cookies::set_private_defaults(&mut cookie);
jar.private(key).add(cookie)
}
@ -348,7 +357,7 @@ impl Cookies<'_> {
/// WARNING: This is unstable! Do not use this method outside of Rocket!
#[doc(hidden)]
pub fn add_original_private(&mut self, mut cookie: Cookie<'static>) {
if let Cookies::Jarred(ref mut jar, key) = *self {
if let Cookies::Jarred(ref mut jar, key, _) = *self {
Cookies::set_private_defaults(&mut cookie);
jar.private(key).add_original(cookie)
}
@ -402,7 +411,7 @@ impl Cookies<'_> {
/// }
/// ```
pub fn remove_private(&mut self, mut cookie: Cookie<'static>) {
if let Cookies::Jarred(ref mut jar, key) = *self {
if let Cookies::Jarred(ref mut jar, key, _) = *self {
if cookie.path().is_none() {
cookie.set_path("/");
}
@ -415,7 +424,7 @@ impl Cookies<'_> {
impl fmt::Debug for Cookies<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Cookies::Jarred(ref jar, _) => jar.fmt(f),
Cookies::Jarred(ref jar, _, _) => jar.fmt(f),
Cookies::Empty(ref jar) => jar.fmt(f)
}
}

View File

@ -4,66 +4,44 @@
//! These types will, with certainty, be removed with time, but they reside here
//! while necessary.
#[doc(hidden)] pub use hyper::{Body, Request, Response};
#[doc(hidden)] pub use hyper::{Body, Request, Response, Server};
#[doc(hidden)] pub use hyper::body::Payload as Payload;
#[doc(hidden)] pub use hyper::error::Error;
#[doc(hidden)] pub use hyper::server::Server;
#[doc(hidden)] pub use hyper::service::{MakeService, Service};
#[doc(hidden)] pub use hyper::service::{make_service_fn, MakeService, Service};
#[doc(hidden)] pub use hyper::server::conn::{AddrIncoming, AddrStream};
#[doc(hidden)] pub use hyper::Chunk;
#[doc(hidden)] pub use http::header::HeaderMap;
#[doc(hidden)] pub use http::header::HeaderName as HeaderName;
#[doc(hidden)] pub use http::header::HeaderValue as HeaderValue;
#[doc(hidden)] pub use http::method::Method;
#[doc(hidden)] pub use http::request::Parts;
#[doc(hidden)] pub use http::request::Parts as RequestParts;
#[doc(hidden)] pub use http::response::Builder as ResponseBuilder;
#[doc(hidden)] pub use http::status::StatusCode;
#[doc(hidden)] pub use http::uri::Uri;
/// Type alias to `hyper::Response<'a, hyper::net::Fresh>`.
// TODO #[doc(hidden)] pub type FreshResponse<'a> = self::Response<'a, self::net::Fresh>;
/// Reexported Hyper header types.
/// Reexported http header types.
pub mod header {
use crate::Header;
macro_rules! import_hyper_items {
($($item:ident),*) => ($(pub use hyper::header::$item;)*)
}
macro_rules! import_hyper_headers {
macro_rules! import_http_headers {
($($name:ident),*) => ($(
pub use http::header::$name as $name;
)*)
}
// import_hyper_items! {
// Accept, AcceptCharset, AcceptEncoding, AcceptLanguage, AcceptRanges,
// AccessControlAllowCredentials, AccessControlAllowHeaders,
// AccessControlAllowMethods, AccessControlExposeHeaders,
// AccessControlMaxAge, AccessControlRequestHeaders,
// AccessControlRequestMethod, Allow, Authorization, Basic, Bearer,
// CacheControl, Connection, ContentDisposition, ContentEncoding,
// ContentLanguage, ContentLength, ContentRange, ContentType, Date, ETag,
// EntityTag, Expires, From, Headers, Host, HttpDate, IfModifiedSince,
// IfUnmodifiedSince, LastModified, Location, Origin, Prefer,
// PreferenceApplied, Protocol, Quality, QualityItem, Referer,
// StrictTransportSecurity, TransferEncoding, Upgrade, UserAgent,
// AccessControlAllowOrigin, ByteRangeSpec, CacheDirective, Charset,
// ConnectionOption, ContentRangeSpec, DispositionParam, DispositionType,
// Encoding, Expect, IfMatch, IfNoneMatch, IfRange, Pragma, Preference,
// ProtocolName, Range, RangeUnit, ReferrerPolicy, Vary, Scheme, q, qitem
// }
//
import_hyper_headers! {
ACCEPT, ACCESS_CONTROL_ALLOW_CREDENTIALS, ACCESS_CONTROL_ALLOW_HEADERS,
import_http_headers! {
ACCEPT, ACCEPT_CHARSET, ACCEPT_ENCODING, ACCEPT_LANGUAGE, ACCEPT_RANGES,
ACCESS_CONTROL_ALLOW_CREDENTIALS, ACCESS_CONTROL_ALLOW_HEADERS,
ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_MAX_AGE,
ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, ACCEPT_CHARSET,
ACCEPT_ENCODING, ACCEPT_LANGUAGE, ACCEPT_RANGES, ALLOW, CACHE_CONTROL,
CONNECTION, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_LENGTH, CONTENT_RANGE, DATE, ETAG, EXPECT, EXPIRES, HOST, IF_MATCH,
IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_RANGE, IF_UNMODIFIED_SINCE, LAST_MODIFIED,
LOCATION, ORIGIN, PRAGMA, RANGE, REFERER,
REFERRER_POLICY, STRICT_TRANSPORT_SECURITY, TRANSFER_ENCODING, UPGRADE,
USER_AGENT, VARY
ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, ALLOW,
AUTHORIZATION, CACHE_CONTROL, CONNECTION, CONTENT_DISPOSITION,
CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_LOCATION,
CONTENT_RANGE, CONTENT_SECURITY_POLICY,
CONTENT_SECURITY_POLICY_REPORT_ONLY, CONTENT_TYPE, DATE, ETAG, EXPECT,
EXPIRES, FORWARDED, FROM, HOST, IF_MATCH, IF_MODIFIED_SINCE,
IF_NONE_MATCH, IF_RANGE, IF_UNMODIFIED_SINCE, LAST_MODIFIED, LINK,
LOCATION, ORIGIN, PRAGMA, RANGE, REFERER, REFERRER_POLICY, REFRESH,
STRICT_TRANSPORT_SECURITY, TE, TRANSFER_ENCODING, UPGRADE, USER_AGENT,
VARY
}
}

View File

@ -1,9 +1,7 @@
extern crate http;
use std::fmt;
use std::str::FromStr;
use crate::{hyper, uncased::uncased_eq};
use crate::uncased::uncased_eq;
use self::Method::*;

View File

@ -1,2 +1,8 @@
pub use hyper_sync_rustls::{util, WrappedStream, ServerSession, TlsServer};
pub use rustls::{Certificate, PrivateKey};
pub use tokio_rustls::TlsAcceptor;
pub use tokio_rustls::rustls;
pub use rustls::internal::pemfile;
pub use rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig};
// TODO.async: extract from hyper-sync-rustls some convenience
// functions to load certs and keys

View File

@ -24,12 +24,12 @@ tls = ["rocket_http/tls"]
private-cookies = ["rocket_http/private-cookies"]
[dependencies]
futures = "0.1"
rocket_codegen = { version = "0.5.0-dev", path = "../codegen" }
rocket_http = { version = "0.5.0-dev", path = "../http" }
futures-preview = { version = "0.3.0-alpha.14", features = ["compat", "io-compat"] }
tokio = "0.1.16"
yansi = "0.5"
log = "0.4"
log = { version = "0.4", features = ["std"] }
toml = "0.4.7"
num_cpus = "1.0"
state = "0.4.1"

View File

@ -3,8 +3,8 @@
use yansi::{Paint, Color::{Red, Yellow, Blue}};
// Specifies the minimum nightly version needed to compile Rocket.
const MIN_DATE: &'static str = "2019-04-05";
const MIN_VERSION: &'static str = "1.35.0-nightly";
const MIN_DATE: &'static str = "2019-07-03";
const MIN_VERSION: &'static str = "1.37.0-nightly";
macro_rules! err {
($version:expr, $date:expr, $msg:expr) => (

View File

@ -1,3 +1,5 @@
use futures::future::Future;
use crate::response;
use crate::handler::ErrorHandler;
use crate::codegen::StaticCatchInfo;
@ -59,7 +61,6 @@ use yansi::Color::*;
///
/// A function decorated with `catch` must take exactly zero or one arguments.
/// If the catcher takes an argument, it must be of type [`&Request`](Request).
#[derive(Clone)]
pub struct Catcher {
/// The HTTP status code to match against.
pub code: u16,
@ -99,7 +100,7 @@ impl Catcher {
}
#[inline(always)]
pub(crate) fn handle<'r>(&self, req: &'r Request<'_>) -> response::Result<'r> {
pub(crate) fn handle<'r>(&self, req: &'r Request<'_>) -> impl Future<Output = response::Result<'r>> {
(self.handler)(req)
}
@ -152,10 +153,12 @@ macro_rules! default_catchers {
let mut map = HashMap::new();
$(
fn $fn_name<'r>(req: &'r Request<'_>) -> response::Result<'r> {
status::Custom(Status::from_code($code).unwrap(),
content::Html(error_page_template!($code, $name, $description))
).respond_to(req)
fn $fn_name<'r>(req: &'r Request<'_>) -> std::pin::Pin<Box<dyn std::future::Future<Output = response::Result<'r>> + Send + 'r>> {
(async move {
status::Custom(Status::from_code($code).unwrap(),
content::Html(error_page_template!($code, $name, $description))
).respond_to(req)
}).boxed()
}
map.insert($code, Catcher::new_default($code, $fn_name));
@ -167,6 +170,7 @@ macro_rules! default_catchers {
pub mod defaults {
use super::Catcher;
use futures::future::FutureExt;
use std::collections::HashMap;

View File

@ -1,9 +1,11 @@
use futures::future::Future;
use crate::{Request, Data};
use crate::handler::{Outcome, ErrorHandler};
use crate::http::{Method, MediaType};
/// Type of a static handler, which users annotate with Rocket's attribute.
pub type StaticHandler = for<'r> fn(&'r Request<'_>, Data) -> Outcome<'r>;
pub type StaticHandler = for<'r> fn(&'r Request<'_>, Data) -> std::pin::Pin<Box<dyn Future<Output = Outcome<'r>> + Send + 'r>>;
/// Information generated by the `route` attribute during codegen.
pub struct StaticRouteInfo {

View File

@ -574,23 +574,33 @@ impl Config {
/// ```
#[cfg(feature = "tls")]
pub fn set_tls(&mut self, certs_path: &str, key_path: &str) -> Result<()> {
use crate::http::tls::util::{self, Error};
use crate::http::tls::pemfile::{certs, rsa_private_keys};
use std::fs::File;
use std::io::BufReader;
let pem_err = "malformed PEM file";
// TODO.async: Fully copy from hyper-sync-rustls, move to http/src/tls
// Partially extracted from hyper-sync-rustls
// Load the certificates.
let certs = util::load_certs(self.root_relative(certs_path))
.map_err(|e| match e {
Error::Io(e) => ConfigError::Io(e, "tls.certs"),
_ => self.bad_type("tls", pem_err, "a valid certificates file")
})?;
let certs = match File::open(self.root_relative(certs_path)) {
Ok(file) => certs(&mut BufReader::new(file)).map_err(|_| {
self.bad_type("tls", pem_err, "a valid certificates file")
}),
Err(e) => Err(ConfigError::Io(e, "tls.certs"))?,
}?;
// And now the private key.
let key = util::load_private_key(self.root_relative(key_path))
.map_err(|e| match e {
Error::Io(e) => ConfigError::Io(e, "tls.key"),
_ => self.bad_type("tls", pem_err, "a valid private key file")
})?;
let mut keys = match File::open(self.root_relative(key_path)) {
Ok(file) => rsa_private_keys(&mut BufReader::new(file)).map_err(|_| {
self.bad_type("tls", pem_err, "a valid private key file")
}),
Err(e) => Err(ConfigError::Io(e, "tls.key")),
}?;
// TODO.async: Proper check for one key
let key = keys.remove(0);
self.tls = Some(TlsConfig { certs, key });
Ok(())

View File

@ -460,7 +460,7 @@ mod test {
use std::env;
use std::sync::Mutex;
use super::{FullConfig, ConfigError, ConfigBuilder};
use super::{Config, FullConfig, ConfigError, ConfigBuilder};
use super::{Environment, GLOBAL_ENV_NAME};
use super::environment::CONFIG_ENV;
use super::Environment::*;
@ -1071,19 +1071,6 @@ mod test {
}
}
macro_rules! check_value {
($key:expr, $val:expr, $config:expr) => (
match $key {
"log" => assert_eq!($config.log_level, $val.parse().unwrap()),
"port" => assert_eq!($config.port, $val.parse().unwrap()),
"address" => assert_eq!($config.address, $val),
"extra_extra" => assert_eq!($config.get_bool($key).unwrap(), true),
"workers" => assert_eq!($config.workers, $val.parse().unwrap()),
_ => panic!("Unexpected key: {}", $key)
}
)
}
#[test]
fn test_env_override() {
// Take the lock so changing the environment doesn't cause races.
@ -1094,6 +1081,17 @@ mod test {
("address", "1.2.3.4"), ("EXTRA_EXTRA", "true"), ("workers", "3")
];
let check_value = |key: &str, val: &str, config: &Config| {
match key {
"log" => assert_eq!(config.log_level, val.parse().unwrap()),
"port" => assert_eq!(config.port, val.parse::<u16>().unwrap()),
"address" => assert_eq!(config.address, val),
"extra_extra" => assert_eq!(config.get_bool(key).unwrap(), true),
"workers" => assert_eq!(config.workers, val.parse::<u16>().unwrap()),
_ => panic!("Unexpected key: {}", key)
}
};
// Check that setting the environment variable actually changes the
// config for the default active and nonactive environments.
for &(key, val) in &pairs {
@ -1103,13 +1101,13 @@ mod test {
for env in &Environment::ALL {
env::set_var(CONFIG_ENV, env.to_string());
let rconfig = env_default().unwrap();
check_value!(&*key.to_lowercase(), val, rconfig.active());
check_value(&*key.to_lowercase(), val, rconfig.active());
}
// And non-active configs.
let rconfig = env_default().unwrap();
for env in &Environment::ALL {
check_value!(&*key.to_lowercase(), val, rconfig.get(*env));
check_value(&*key.to_lowercase(), val, rconfig.get(*env));
}
}
@ -1144,11 +1142,11 @@ mod test {
let mut r = FullConfig::parse(toml, TEST_CONFIG_FILENAME).unwrap();
r.override_from_env().unwrap();
check_value!(&*key.to_lowercase(), val, r.active());
check_value(&*key.to_lowercase(), val, r.active());
// And non-active configs.
for env in &Environment::ALL {
check_value!(&*key.to_lowercase(), val, r.get(*env));
check_value(&*key.to_lowercase(), val, r.get(*env));
}
}

View File

@ -1,16 +1,16 @@
use std::io::{self, Read, Write, Cursor, Chain};
use std::path::Path;
use std::fs::File;
use std::time::Duration;
use std::pin::Pin;
#[cfg(feature = "tls")] use super::net_stream::HttpsStream;
use futures::compat::{Future01CompatExt, Stream01CompatExt, AsyncWrite01CompatExt};
use futures::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite};
use futures::future::Future;
use futures::stream::TryStreamExt;
use super::data_stream::{DataStream, /* TODO kill_stream */};
use super::net_stream::NetStream;
use crate::ext::ReadExt;
use super::data_stream::DataStream;
use crate::http::hyper::{self, Payload};
use futures::{Async, Future};
use crate::http::hyper;
use crate::ext::AsyncReadExt;
/// The number of bytes to read into the "peek" buffer.
const PEEK_BYTES: usize = 512;
@ -48,7 +48,9 @@ const PEEK_BYTES: usize = 512;
/// body data. This enables partially or fully reading from a `Data` object
/// without consuming the `Data` object.
pub struct Data {
body: Vec<u8>,
buffer: Vec<u8>,
is_complete: bool,
stream: Box<dyn AsyncRead + Unpin + Send>,
}
impl Data {
@ -69,11 +71,15 @@ impl Data {
/// }
/// ```
pub fn open(mut self) -> DataStream {
// FIXME: Insert a `BufReader` in front of the `NetStream` with capacity
// 4096. We need the new `Chain` methods to get the inner reader to
// actually do this, however.
let stream = ::std::mem::replace(&mut self.body, vec![]);
DataStream(Cursor::new(stream))
let buffer = std::mem::replace(&mut self.buffer, vec![]);
let stream = std::mem::replace(&mut self.stream, Box::new(&[][..]));
DataStream(buffer, stream)
}
pub(crate) fn from_hyp(body: hyper::Body) -> impl Future<Output = Data> {
// TODO.async: This used to also set the read timeout to 5 seconds.
Data::new(body)
}
/// Retrieve the `peek` buffer.
@ -94,10 +100,10 @@ impl Data {
/// ```
#[inline(always)]
pub fn peek(&self) -> &[u8] {
if self.body.len() > PEEK_BYTES {
&self.body[..PEEK_BYTES]
if self.buffer.len() > PEEK_BYTES {
&self.buffer[..PEEK_BYTES]
} else {
&self.body
&self.buffer
}
}
@ -118,8 +124,7 @@ impl Data {
/// ```
#[inline(always)]
pub fn peek_complete(&self) -> bool {
// TODO self.is_complete
true
self.is_complete
}
/// A helper method to write the body of the request to any `Write` type.
@ -139,8 +144,11 @@ impl Data {
/// }
/// ```
#[inline(always)]
pub fn stream_to<W: Write>(self, writer: &mut W) -> io::Result<u64> {
io::copy(&mut self.open(), writer)
pub fn stream_to<'w, W: AsyncWrite + Unpin>(self, writer: &'w mut W) -> impl Future<Output = io::Result<u64>> + 'w {
Box::pin(async move {
let stream = self.open();
stream.copy_into(writer).await
})
}
/// A helper method to write the body of the request to a file at the path
@ -161,8 +169,11 @@ impl Data {
/// }
/// ```
#[inline(always)]
pub fn stream_to_file<P: AsRef<Path>>(self, path: P) -> io::Result<u64> {
io::copy(&mut self.open(), &mut File::create(path)?)
pub fn stream_to_file<P: AsRef<Path> + Send + 'static>(self, path: P) -> impl Future<Output = io::Result<u64>> {
Box::pin(async move {
let mut file = tokio::fs::File::create(path).compat().await?.compat();
self.stream_to(&mut file).await
})
}
// Creates a new data object with an internal buffer `buf`, where the cursor
@ -170,8 +181,56 @@ impl Data {
// bytes `vec[pos..cap]` are buffered and unread. The remainder of the data
// bytes can be read from `stream`.
#[inline(always)]
pub(crate) fn new(body: Vec<u8>) -> Data {
Data { body }
pub(crate) fn new(body: hyper::Body) -> Pin<Box<dyn Future<Output = Data> + Send>> {
trace_!("Data::new({:?})", body);
let mut stream = body.compat().map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
}).into_async_read();
Box::pin(async {
let mut peek_buf = vec![0; PEEK_BYTES];
let eof = match stream.read_max(&mut peek_buf[..]).await {
Ok(n) => {
trace_!("Filled peek buf with {} bytes.", n);
// TODO.async: This has not gone away, and I don't entirely
// understand what's happening here
// We can use `set_len` here instead of `truncate`, but we'll
// take the performance hit to avoid `unsafe`. All of this code
// should go away when we migrate away from hyper 0.10.x.
peek_buf.truncate(n);
n < PEEK_BYTES
}
Err(e) => {
error_!("Failed to read into peek buffer: {:?}.", e);
// Likewise here as above.
peek_buf.truncate(0);
false
}
};
trace_!("Peek bytes: {}/{} bytes.", peek_buf.len(), PEEK_BYTES);
Data { buffer: peek_buf, stream: Box::new(stream), is_complete: eof }
})
}
/// This creates a `data` object from a local data source `data`.
#[inline]
pub(crate) fn local(data: Vec<u8>) -> Data {
Data {
buffer: data,
stream: Box::new(&[][..]),
is_complete: true,
}
}
}
impl std::borrow::Borrow<()> for Data {
fn borrow(&self) -> &() {
&()
}
}

View File

@ -1,50 +1,36 @@
use std::io::{self, Chain, Cursor, Read, Write};
use std::net::Shutdown;
use std::pin::Pin;
pub type InnerStream = Cursor<Vec<u8>>;
use futures::io::{AsyncRead, Error as IoError};
use futures::task::{Poll, Context};
// TODO.async: Consider storing the real type here instead of a Box to avoid
// the dynamic dispatch
/// Raw data stream of a request body.
///
/// This stream can only be obtained by calling
/// [`Data::open()`](crate::data::Data::open()). The stream contains all of the data
/// in the body of the request. It exposes no methods directly. Instead, it must
/// be used as an opaque [`Read`] structure.
pub struct DataStream(pub(crate) InnerStream);
pub struct DataStream(pub(crate) Vec<u8>, pub(crate) Box<dyn AsyncRead + Unpin + Send>);
// TODO.async: Consider implementing `AsyncBufRead`
// TODO: Have a `BufRead` impl for `DataStream`. At the moment, this isn't
// possible since Hyper's `HttpReader` doesn't implement `BufRead`.
impl Read for DataStream {
impl AsyncRead for DataStream {
#[inline(always)]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
trace_!("DataStream::read()");
self.0.read(buf)
}
}
/* pub fn kill_stream(stream: &mut BodyReader) {
// Only do the expensive reading if we're not sure we're done.
// TODO use self::HttpReader::*;
match *stream {
SizedReader(_, n) | ChunkedReader(_, Some(n)) if n > 0 => { /* continue */ },
_ => return
};
// Take <= 1k from the stream. If there might be more data, force close.
const FLUSH_LEN: u64 = 1024;
match io::copy(&mut stream.take(FLUSH_LEN), &mut io::sink()) {
Ok(FLUSH_LEN) | Err(_) => {
warn_!("Data left unread. Force closing network stream.");
let (_, network) = stream.get_mut().get_mut();
if let Err(e) = network.close(Shutdown::Read) {
error_!("Failed to close network stream: {:?}", e);
}
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
trace_!("DataStream::poll_read()");
if self.0.len() > 0 {
let count = std::cmp::min(buf.len(), self.0.len());
trace_!("Reading peeked {} into dest {} = {} bytes", self.0.len(), buf.len(), count);
let next = self.0.split_off(count);
(&mut buf[..count]).copy_from_slice(&self.0[..]);
self.0 = next;
Poll::Ready(Ok(count))
} else {
trace_!("Delegating to remaining stream");
Pin::new(&mut self.1).poll_read(cx, buf)
}
Ok(n) => debug!("flushed {} unread bytes", n)
}
}*/
impl Drop for DataStream {
fn drop(&mut self) {
// TODO kill_stream(&mut self.0.get_mut().1);
}
}

View File

@ -1,4 +1,8 @@
use std::borrow::Borrow;
use std::pin::Pin;
use futures::future::{ready, Future, FutureExt};
use futures::io::AsyncReadExt;
use crate::outcome::{self, IntoOutcome};
use crate::outcome::Outcome::*;
@ -108,6 +112,9 @@ pub type Transformed<'a, T> =
Outcome<&'a <T as FromData<'a>>::Borrowed, <T as FromData<'a>>::Error>
>;
pub type TransformFuture<'a, T, E> = Pin<Box<dyn Future<Output = Transform<Outcome<T, E>>> + Send + 'a>>;
pub type FromDataFuture<'a, T, E> = Pin<Box<dyn Future<Output = Outcome<T, E>> + Send + 'a>>;
/// Trait implemented by data guards to derive a value from request body data.
///
/// # Data Guards
@ -321,7 +328,7 @@ pub type Transformed<'a, T> =
/// [`FromDataSimple`] documentation.
pub trait FromData<'a>: Sized {
/// The associated error to be returned when the guard fails.
type Error;
type Error: Send;
/// The owned type returned from [`FromData::transform()`].
///
@ -354,7 +361,7 @@ pub trait FromData<'a>: Sized {
/// If transformation succeeds, an outcome of `Success` is returned.
/// If the data is not appropriate given the type of `Self`, `Forward` is
/// returned. On failure, `Failure` is returned.
fn transform(request: &Request<'_>, data: Data) -> Transform<Outcome<Self::Owned, Self::Error>>;
fn transform(request: &Request<'_>, data: Data) -> TransformFuture<'a, Self::Owned, Self::Error>;
/// Validates, parses, and converts the incoming request body data into an
/// instance of `Self`.
@ -384,23 +391,23 @@ pub trait FromData<'a>: Sized {
/// # unimplemented!()
/// # }
/// ```
fn from_data(request: &Request<'_>, outcome: Transformed<'a, Self>) -> Outcome<Self, Self::Error>;
fn from_data(request: &Request<'_>, outcome: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error>;
}
/// The identity implementation of `FromData`. Always returns `Success`.
impl<'f> FromData<'f> for Data {
impl<'a> FromData<'a> for Data {
type Error = std::convert::Infallible;
type Owned = Data;
type Borrowed = Data;
type Borrowed = ();
#[inline(always)]
fn transform(_: &Request<'_>, data: Data) -> Transform<Outcome<Self::Owned, Self::Error>> {
Transform::Owned(Success(data))
fn transform(_: &Request<'_>, data: Data) -> TransformFuture<'a, Self::Owned, Self::Error> {
Box::pin(ready(Transform::Owned(Success(data))))
}
#[inline(always)]
fn from_data(_: &Request<'_>, outcome: Transformed<'f, Self>) -> Outcome<Self, Self::Error> {
outcome.owned()
fn from_data(_: &Request<'_>, outcome: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> {
Box::pin(ready(outcome.owned()))
}
}
@ -494,8 +501,9 @@ impl<'f> FromData<'f> for Data {
/// # fn main() { }
/// ```
pub trait FromDataSimple: Sized {
// TODO.async: Can/should we relax this 'static? And how?
/// The associated error to be returned when the guard fails.
type Error;
type Error: Send + 'static;
/// Validates, parses, and converts an instance of `Self` from the incoming
/// request body data.
@ -503,22 +511,25 @@ pub trait FromDataSimple: Sized {
/// If validation and parsing succeeds, an outcome of `Success` is returned.
/// If the data is not appropriate given the type of `Self`, `Forward` is
/// returned. If parsing fails, `Failure` is returned.
fn from_data(request: &Request<'_>, data: Data) -> Outcome<Self, Self::Error>;
fn from_data(request: &Request<'_>, data: Data) -> FromDataFuture<'static, Self, Self::Error>;
}
impl<'a, T: FromDataSimple> FromData<'a> for T {
impl<'a, T: FromDataSimple + 'a> FromData<'a> for T {
type Error = T::Error;
type Owned = Data;
type Borrowed = Data;
type Borrowed = ();
#[inline(always)]
fn transform(_: &Request<'_>, d: Data) -> Transform<Outcome<Self::Owned, Self::Error>> {
Transform::Owned(Success(d))
fn transform(_: &Request<'_>, d: Data) -> TransformFuture<'a, Self::Owned, Self::Error> {
Box::pin(ready(Transform::Owned(Success(d))))
}
#[inline(always)]
fn from_data(req: &Request<'_>, o: Transformed<'a, Self>) -> Outcome<Self, Self::Error> {
T::from_data(req, try_outcome!(o.owned()))
fn from_data(req: &Request<'_>, o: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> {
match o.owned() {
Success(data) => T::from_data(req, data),
_ => unreachable!(),
}
}
}
@ -528,17 +539,17 @@ impl<'a, T: FromData<'a> + 'a> FromData<'a> for Result<T, T::Error> {
type Borrowed = T::Borrowed;
#[inline(always)]
fn transform(r: &Request<'_>, d: Data) -> Transform<Outcome<Self::Owned, Self::Error>> {
fn transform(r: &Request<'_>, d: Data) -> TransformFuture<'a, Self::Owned, Self::Error> {
T::transform(r, d)
}
#[inline(always)]
fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> Outcome<Self, Self::Error> {
match T::from_data(r, o) {
fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> {
Box::pin(T::from_data(r, o).map(|x| match x {
Success(val) => Success(Ok(val)),
Forward(data) => Forward(data),
Failure((_, e)) => Success(Err(e)),
}
}))
}
}
@ -548,46 +559,52 @@ impl<'a, T: FromData<'a> + 'a> FromData<'a> for Option<T> {
type Borrowed = T::Borrowed;
#[inline(always)]
fn transform(r: &Request<'_>, d: Data) -> Transform<Outcome<Self::Owned, Self::Error>> {
fn transform(r: &Request<'_>, d: Data) -> TransformFuture<'a, Self::Owned, Self::Error> {
T::transform(r, d)
}
#[inline(always)]
fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> Outcome<Self, Self::Error> {
match T::from_data(r, o) {
fn from_data(r: &Request<'_>, o: Transformed<'a, Self>) -> FromDataFuture<'a, Self, Self::Error> {
Box::pin(T::from_data(r, o).map(|x| match x {
Success(val) => Success(Some(val)),
Failure(_) | Forward(_) => Success(None),
}
}))
}
}
#[cfg(debug_assertions)]
use std::io::{self, Read};
#[cfg(debug_assertions)]
impl FromDataSimple for String {
type Error = io::Error;
type Error = std::io::Error;
#[inline(always)]
fn from_data(_: &Request<'_>, data: Data) -> Outcome<Self, Self::Error> {
let mut string = String::new();
match data.open().read_to_string(&mut string) {
Ok(_) => Success(string),
Err(e) => Failure((Status::BadRequest, e))
}
fn from_data(_: &Request<'_>, data: Data) -> FromDataFuture<'static, Self, Self::Error> {
Box::pin(async {
let mut stream = data.open();
let mut buf = Vec::new();
if let Err(e) = stream.read_to_end(&mut buf).await {
return Failure((Status::BadRequest, e));
}
match String::from_utf8(buf) {
Ok(s) => Success(s),
Err(e) => Failure((Status::BadRequest, std::io::Error::new(std::io::ErrorKind::Other, e))),
}
})
}
}
#[cfg(debug_assertions)]
impl FromDataSimple for Vec<u8> {
type Error = io::Error;
type Error = std::io::Error;
#[inline(always)]
fn from_data(_: &Request<'_>, data: Data) -> Outcome<Self, Self::Error> {
let mut bytes = Vec::new();
match data.open().read_to_end(&mut bytes) {
Ok(_) => Success(bytes),
Err(e) => Failure((Status::BadRequest, e))
}
fn from_data(_: &Request<'_>, data: Data) -> FromDataFuture<'static, Self, Self::Error> {
Box::pin(async {
let mut stream = data.open();
let mut buf = Vec::new();
match stream.read_to_end(&mut buf).await {
Ok(_) => Success(buf),
Err(e) => Failure((Status::BadRequest, e)),
}
})
}
}

View File

@ -2,9 +2,8 @@
mod data;
mod data_stream;
mod net_stream;
mod from_data;
pub use self::data::Data;
pub use self::data_stream::DataStream;
pub use self::from_data::{FromData, FromDataSimple, Outcome, Transform, Transformed};
pub use self::from_data::{FromData, FromDataFuture, FromDataSimple, Outcome, Transform, Transformed, TransformFuture};

View File

@ -1,94 +0,0 @@
use std::io;
use std::net::{SocketAddr, Shutdown};
use std::time::Duration;
#[cfg(feature = "tls")] use crate::http::tls::{WrappedStream, ServerSession};
// TODO use http::hyper::net::{HttpStream, NetworkStream};
use self::NetStream::*;
#[cfg(feature = "tls")] pub type HttpsStream = WrappedStream<ServerSession>;
// This is a representation of all of the possible network streams we might get.
// This really shouldn't be necessary, but, you know, Hyper.
#[derive(Clone)]
pub enum NetStream {
Http/* TODO (HttpStream) */,
#[cfg(feature = "tls")]
Https(HttpsStream),
Empty,
}
impl io::Read for NetStream {
#[inline(always)]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
trace_!("NetStream::read()");
let res = match *self {
Http/*(ref mut stream)*/ => Ok(0) /* TODO stream.read(buf)*/,
#[cfg(feature = "tls")] Https(ref mut stream) => stream.read(buf),
Empty => Ok(0),
};
trace_!("NetStream::read() -- complete");
res
}
}
impl io::Write for NetStream {
#[inline(always)]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
trace_!("NetStream::write()");
match *self {
Http/* TODO (ref mut stream) => stream.write(buf)*/ => Ok(0),
#[cfg(feature = "tls")] Https(ref mut stream) => stream.write(buf),
Empty => Ok(0),
}
}
#[inline(always)]
fn flush(&mut self) -> io::Result<()> {
match *self {
Http/* TODO (ref mut stream) => stream.flush()*/ => Ok(()),
#[cfg(feature = "tls")] Https(ref mut stream) => stream.flush(),
Empty => Ok(()),
}
}
}
//impl NetworkStream for NetStream {
// #[inline(always)]
// fn peer_addr(&mut self) -> io::Result<SocketAddr> {
// match *self {
// Http/* TODO (ref mut stream) => stream.peer_addr()*/ => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)),
// #[cfg(feature = "tls")] Https(ref mut stream) => stream.peer_addr(),
// Empty => Err(io::Error::from(io::ErrorKind::AddrNotAvailable)),
// }
// }
//
// #[inline(always)]
// fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
// match *self {
// Http/* TODO (ref stream) => stream.set_read_timeout(dur)*/ => Ok(()),
// #[cfg(feature = "tls")] Https(ref stream) => stream.set_read_timeout(dur),
// Empty => Ok(()),
// }
// }
//
// #[inline(always)]
// fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
// match *self {
// Http/* TODO (ref stream) => stream.set_write_timeout(dur)*/ => Ok(()),
// #[cfg(feature = "tls")] Https(ref stream) => stream.set_write_timeout(dur),
// Empty => Ok(()),
// }
// }
//
// #[inline(always)]
// fn close(&mut self, how: Shutdown) -> io::Result<()> {
// match *self {
// Http/* TODO (ref mut stream) => stream.close(how)*/ => Ok(()),
// #[cfg(feature = "tls")] Https(ref mut stream) => stream.close(how),
// Empty => Ok(()),
// }
// }
//}

View File

@ -19,7 +19,7 @@ use crate::router::Route;
#[derive(Debug)]
pub enum LaunchErrorKind {
/// Binding to the provided address/port failed.
Bind(std::io::Error),
Bind(hyper::Error),
/// An I/O error occurred during launch.
Io(io::Error),
/// Route collisions were detected.
@ -123,10 +123,9 @@ impl LaunchError {
impl From<hyper::Error> for LaunchError {
#[inline]
fn from(error: hyper::Error) -> LaunchError {
match error {
// TODO hyper::Error::Io(e) => LaunchError::new(LaunchErrorKind::Io(e)),
e => LaunchError::new(LaunchErrorKind::Unknown(Box::new(e)))
}
// TODO.async: Should "hyper error" be another variant of LaunchErrorKind?
// Or should this use LaunchErrorKind::Io?
LaunchError::new(LaunchErrorKind::Unknown(Box::new(error)))
}
}

View File

@ -1,19 +1,95 @@
use std::io;
use std::pin::Pin;
pub trait ReadExt: io::Read {
fn read_max(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
let start_len = buf.len();
while !buf.is_empty() {
match self.read(buf) {
Ok(0) => break,
Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; }
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
use futures::io::{AsyncRead, AsyncReadExt as _};
use futures::future::{Future};
use futures::stream::Stream;
use futures::task::{Poll, Context};
use crate::http::hyper::Chunk;
// Based on std::io::Take, but for AsyncRead instead of Read
pub struct Take<R>{
inner: R,
limit: u64,
}
// TODO.async: Verify correctness of this implementation.
impl<R> AsyncRead for Take<R> where R: AsyncRead + Unpin {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
if self.limit == 0 {
return Poll::Ready(Ok(0));
}
Ok(start_len - buf.len())
let max = std::cmp::min(buf.len() as u64, self.limit) as usize;
match Pin::new(&mut self.inner).poll_read(cx, &mut buf[..max]) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(n)) => {
self.limit -= n as u64;
Poll::Ready(Ok(n))
},
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
}
}
}
impl<T: io::Read> ReadExt for T { }
pub struct IntoChunkStream<R> {
inner: R,
buf_size: usize,
buffer: Vec<u8>,
}
// TODO.async: Verify correctness of this implementation.
impl<R> Stream for IntoChunkStream<R>
where R: AsyncRead + Unpin
{
type Item = Result<Chunk, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>{
assert!(self.buffer.len() == self.buf_size);
let Self { ref mut inner, ref mut buffer, buf_size } = *self;
match Pin::new(inner).poll_read(cx, &mut buffer[..]) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Ready(Ok(n)) if n == 0 => Poll::Ready(None),
Poll::Ready(Ok(n)) => {
let mut next = std::mem::replace(buffer, vec![0; buf_size]);
next.truncate(n);
Poll::Ready(Some(Ok(Chunk::from(next))))
}
}
}
}
pub trait AsyncReadExt: AsyncRead {
fn take(self, limit: u64) -> Take<Self> where Self: Sized {
Take { inner: self, limit }
}
fn into_chunk_stream(self, buf_size: usize) -> IntoChunkStream<Self> where Self: Sized {
IntoChunkStream { inner: self, buf_size, buffer: vec![0; buf_size] }
}
// TODO.async: Verify correctness of this implementation.
fn read_max<'a>(&'a mut self, mut buf: &'a mut [u8]) -> Pin<Box<dyn Future<Output=io::Result<usize>> + Send + '_>>
where Self: Send + Unpin
{
Box::pin(async move {
let start_len = buf.len();
while !buf.is_empty() {
match self.read(buf).await {
Ok(0) => break,
Ok(n) => { let tmp = buf; buf = &mut tmp[n..]; }
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(start_len - buf.len())
})
}
}
impl<T: AsyncRead> AsyncReadExt for T { }

View File

@ -1,5 +1,7 @@
//! Types and traits for request and error handlers and their return values.
use futures::future::Future;
use crate::data::Data;
use crate::request::Request;
use crate::response::{self, Response, Responder};
@ -9,6 +11,9 @@ use crate::outcome;
/// Type alias for the `Outcome` of a `Handler`.
pub type Outcome<'r> = outcome::Outcome<Response<'r>, Status, Data>;
/// Type alias for the unwieldy `Handler` return type
pub type HandlerFuture<'r> = std::pin::Pin<Box<dyn Future<Output = Outcome<'r>> + Send + 'r>>;
/// Trait implemented by types that can handle requests.
///
/// In general, you will never need to implement `Handler` manually or be
@ -142,7 +147,7 @@ pub trait Handler: Cloneable + Send + Sync + 'static {
/// a response. Otherwise, if the return value is `Forward(Data)`, the next
/// matching route is attempted. If there are no other matching routes, the
/// `404` error catcher is invoked.
fn handle<'r>(&self, request: &'r Request<'_>, data: Data) -> Outcome<'r>;
fn handle<'r>(&self, request: &'r Request<'_>, data: Data) -> HandlerFuture<'r>;
}
/// Unfortunate but necessary hack to be able to clone a `Box<Handler>`.
@ -170,16 +175,18 @@ impl Clone for Box<dyn Handler> {
}
impl<F: Clone + Sync + Send + 'static> Handler for F
where for<'r> F: Fn(&'r Request<'_>, Data) -> Outcome<'r>
where for<'r> F: Fn(&'r Request<'_>, Data) -> HandlerFuture<'r>
{
#[inline(always)]
fn handle<'r>(&self, req: &'r Request<'_>, data: Data) -> Outcome<'r> {
fn handle<'r>(&self, req: &'r Request<'_>, data: Data) -> HandlerFuture<'r> {
self(req, data)
}
}
/// The type of an error handler.
pub type ErrorHandler = for<'r> fn(&'r Request<'_>) -> response::Result<'r>;
pub type ErrorHandler = for<'r> fn(&'r Request<'_>) -> ErrorHandlerFuture<'r>;
pub type ErrorHandlerFuture<'r> = std::pin::Pin<Box<dyn Future<Output = response::Result<'r>> + Send + 'r>>;
impl<'r> Outcome<'r> {
/// Return the `Outcome` of response to `req` from `responder`.

View File

@ -1,4 +1,5 @@
#![feature(proc_macro_hygiene)]
#![feature(async_await)]
#![recursion_limit="256"]

View File

@ -107,9 +107,7 @@ impl<'c> LocalRequest<'c> {
uri: Cow<'c, str>
) -> LocalRequest<'c> {
// We set a dummy string for now and check the user's URI on dispatch.
let config = &client.rocket().config;
let state = &client.rocket().state;
let request = Request::new(config, state, method, Origin::dummy());
let request = Request::new(client.rocket(), method, Origin::dummy());
// Set up any cookies we know about.
if let Some(ref jar) = client.cookies {
@ -399,40 +397,46 @@ impl<'c> LocalRequest<'c> {
uri: &str,
data: Vec<u8>
) -> LocalResponse<'c> {
let maybe_uri = Origin::parse(uri);
// First, validate the URI, returning an error response (generated from
// an error catcher) immediately if it's invalid.
if let Ok(uri) = Origin::parse(uri) {
if let Ok(uri) = maybe_uri {
request.set_uri(uri.into_owned());
} else {
error!("Malformed request URI: {}", uri);
let res = client.rocket().handle_error(Status::BadRequest, request);
return LocalResponse { _request: owned_request, response: res };
return futures::executor::block_on(async move {
let res = client.rocket().handle_error(Status::BadRequest, request).await;
LocalResponse { _request: owned_request, response: res }
})
}
// Actually dispatch the request.
let response = client.rocket().dispatch(request, Data::new(data));
futures::executor::block_on(async move {
// Actually dispatch the request.
let response = client.rocket().dispatch(request, Data::local(data)).await;
// If the client is tracking cookies, updates the internal cookie jar
// with the changes reflected by `response`.
if let Some(ref jar) = client.cookies {
let mut jar = jar.write().expect("LocalRequest::_dispatch() write lock");
// If the client is tracking cookies, updates the internal cookie jar
// with the changes reflected by `response`.
if let Some(ref jar) = client.cookies {
let mut jar = jar.write().expect("LocalRequest::_dispatch() write lock");
let current_time = time::OffsetDateTime::now_utc();
for cookie in response.cookies() {
if let Some(expires) = cookie.expires() {
if expires <= current_time {
jar.force_remove(cookie);
continue;
for cookie in response.cookies() {
if let Some(expires) = cookie.expires() {
if expires <= current_time {
jar.force_remove(cookie);
continue;
}
}
jar.add(cookie.into_owned());
}
jar.add(cookie.into_owned());
}
}
LocalResponse {
_request: owned_request,
response: response
}
LocalResponse {
_request: owned_request,
response: response
}
})
}
}
@ -454,6 +458,16 @@ pub struct LocalResponse<'c> {
response: Response<'c>,
}
impl LocalResponse<'_> {
pub fn body_string_wait(&mut self) -> Option<String> {
futures::executor::block_on(self.body_string())
}
pub fn body_bytes_wait(&mut self) -> Option<Vec<u8>> {
futures::executor::block_on(self.body_bytes())
}
}
impl<'c> Deref for LocalResponse<'c> {
type Target = Response<'c>;

View File

@ -158,16 +158,16 @@ pub(crate) fn try_init(level: LoggingLevel, verbose: bool) -> bool {
}
push_max_level(level);
/* if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) {
if let Err(e) = log::set_boxed_logger(Box::new(RocketLogger(level))) {
if verbose {
eprintln!("Logger failed to initialize: {}", e);
}
pop_max_level();
return false;
}*/
}
false
true
}
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};

View File

@ -1,9 +1,12 @@
use std::ops::Deref;
use futures::io::AsyncReadExt;
use crate::outcome::Outcome::*;
use crate::request::{Request, form::{FromForm, FormItems, FormDataError}};
use crate::data::{Outcome, Transform, Transformed, Data, FromData};
use crate::data::{Outcome, Transform, Transformed, Data, FromData, TransformFuture, FromDataFuture};
use crate::http::{Status, uri::{Query, FromUriParam}};
use crate::ext::AsyncReadExt as _;
/// A data guard for parsing [`FromForm`] types strictly.
///
@ -184,7 +187,7 @@ impl<'f, T: FromForm<'f>> Form<T> {
///
/// All relevant warnings and errors are written to the console in Rocket
/// logging format.
impl<'f, T: FromForm<'f>> FromData<'f> for Form<T> {
impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for Form<T> {
type Error = FormDataError<'f, T::Error>;
type Owned = String;
type Borrowed = str;
@ -192,26 +195,31 @@ impl<'f, T: FromForm<'f>> FromData<'f> for Form<T> {
fn transform(
request: &Request<'_>,
data: Data
) -> Transform<Outcome<Self::Owned, Self::Error>> {
use std::{cmp::min, io::Read};
) -> TransformFuture<'f, Self::Owned, Self::Error> {
if !request.content_type().map_or(false, |ct| ct.is_form()) {
warn_!("Form data does not have form content type.");
return Transform::Borrowed(Forward(data))
return Box::pin(futures::future::ready(Transform::Borrowed(Forward(data))));
}
let limit = request.limits().forms;
let mut stream = data.open().take(limit);
let mut form_string = String::with_capacity(min(4096, limit) as usize);
if let Err(e) = stream.read_to_string(&mut form_string) {
return Transform::Borrowed(Failure((Status::InternalServerError, FormDataError::Io(e))))
}
Box::pin(async move {
let mut buf = Vec::new();
if let Err(e) = stream.read_to_end(&mut buf).await {
return Transform::Borrowed(Failure((Status::InternalServerError, FormDataError::Io(e))));
}
Transform::Borrowed(Success(form_string))
Transform::Borrowed(match String::from_utf8(buf) {
Ok(s) => Success(s),
Err(e) => Failure((Status::BadRequest, FormDataError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))),
})
})
}
fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> Outcome<Self, Self::Error> {
<Form<T>>::from_data(try_outcome!(o.borrowed()), true).map(Form)
fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> {
Box::pin(futures::future::ready(o.borrowed().and_then(|data| {
<Form<T>>::from_data(data, true).map(Form)
})))
}
}

View File

@ -93,7 +93,7 @@ use crate::request::FormItems;
/// ```
pub trait FromForm<'f>: Sized {
/// The associated error to be returned when parsing fails.
type Error;
type Error: Send;
/// Parses an instance of `Self` from the iterator of form items `it`.
///

View File

@ -1,7 +1,7 @@
use std::ops::Deref;
use crate::request::{Request, form::{Form, FormDataError, FromForm}};
use crate::data::{Data, Transform, Transformed, FromData, Outcome};
use crate::data::{Data, Transformed, FromData, TransformFuture, FromDataFuture};
use crate::http::uri::{Query, FromUriParam};
/// A data guard for parsing [`FromForm`] types leniently.
@ -95,17 +95,19 @@ impl<T> Deref for LenientForm<T> {
}
}
impl<'f, T: FromForm<'f>> FromData<'f> for LenientForm<T> {
impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for LenientForm<T> {
type Error = FormDataError<'f, T::Error>;
type Owned = String;
type Borrowed = str;
fn transform(r: &Request<'_>, d: Data) -> Transform<Outcome<Self::Owned, Self::Error>> {
fn transform(r: &Request<'_>, d: Data) -> TransformFuture<'f, Self::Owned, Self::Error> {
<Form<T>>::transform(r, d)
}
fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> Outcome<Self, Self::Error> {
<Form<T>>::from_data(try_outcome!(o.borrowed()), false).map(LenientForm)
fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> {
Box::pin(futures::future::ready(o.borrowed().and_then(|form| {
<Form<T>>::from_data(form, false).map(LenientForm)
})))
}
}

View File

@ -1,10 +1,7 @@
use std::rc::Rc;
use std::cell::{Cell, RefCell};
use std::sync::{Arc, RwLock, Mutex};
use std::net::{IpAddr, SocketAddr};
use std::fmt;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use yansi::Paint;
use state::{Container, Storage};
@ -15,7 +12,7 @@ use crate::request::{FromFormValue, FormItems, FormItem};
use crate::rocket::Rocket;
use crate::router::Route;
use crate::config::{Config, Limits};
use crate::http::{hyper, uri::{Origin, Segments, Uri}};
use crate::http::{hyper, uri::{Origin, Segments}};
use crate::http::{Method, Header, HeaderMap, Cookies};
use crate::http::{RawStr, ContentType, Accept, MediaType};
use crate::http::private::{Indexed, SmallVec, CookieJar};
@ -28,26 +25,26 @@ type Indices = (usize, usize);
/// should likely only be used when writing [`FromRequest`] implementations. It
/// contains all of the information for a given web request except for the body
/// data. This includes the HTTP method, URI, cookies, headers, and more.
#[derive(Clone)]
//#[derive(Clone)]
pub struct Request<'r> {
method: Cell<Method>,
method: RwLock<Method>,
uri: Origin<'r>,
headers: HeaderMap<'r>,
remote: Option<SocketAddr>,
pub(crate) state: RequestState<'r>,
}
#[derive(Clone)]
//#[derive(Clone)]
pub(crate) struct RequestState<'r> {
pub config: &'r Config,
pub managed: &'r Container,
pub path_segments: SmallVec<[Indices; 12]>,
pub query_items: Option<SmallVec<[IndexedFormItem; 6]>>,
pub route: Cell<Option<&'r Route>>,
pub cookies: RefCell<CookieJar>,
pub route: RwLock<Option<&'r Route>>,
pub cookies: Mutex<Option<CookieJar>>,
pub accept: Storage<Option<Accept>>,
pub content_type: Storage<Option<ContentType>>,
pub cache: Rc<Container>,
pub cache: Arc<Container>,
}
#[derive(Clone)]
@ -61,26 +58,25 @@ impl<'r> Request<'r> {
/// Create a new `Request` with the given `method` and `uri`.
#[inline(always)]
pub(crate) fn new<'s: 'r>(
config: &'r Config,
managed: &'r Container,
rocket: &'r Rocket,
method: Method,
uri: Origin<'s>
) -> Request<'r> {
let mut request = Request {
method: Cell::new(method),
method: RwLock::new(method),
uri: uri,
headers: HeaderMap::new(),
remote: None,
state: RequestState {
path_segments: SmallVec::new(),
query_items: None,
config,
managed,
route: Cell::new(None),
cookies: RefCell::new(CookieJar::new()),
config: &rocket.config,
managed: &rocket.state,
route: RwLock::new(None),
cookies: Mutex::new(Some(CookieJar::new())),
accept: Storage::new(),
content_type: Storage::new(),
cache: Rc::new(Container::new()),
cache: Arc::new(Container::new()),
}
};
@ -103,7 +99,7 @@ impl<'r> Request<'r> {
/// ```
#[inline(always)]
pub fn method(&self) -> Method {
self.method.get()
*self.method.read().unwrap()
}
/// Set the method of `self`.
@ -292,9 +288,13 @@ impl<'r> Request<'r> {
/// ```
pub fn cookies(&self) -> Cookies<'_> {
// FIXME: Can we do better? This is disappointing.
match self.state.cookies.try_borrow_mut() {
Ok(jar) => Cookies::new(jar, self.state.config.secret_key()),
Err(_) => {
let mut guard = self.state.cookies.lock().expect("cookies lock");
match guard.take() {
Some(jar) => {
let mutex = &self.state.cookies;
Cookies::new(jar, self.state.config.secret_key(), move |jar| *mutex.lock().expect("cookies lock") = Some(jar))
}
None => {
error_!("Multiple `Cookies` instances are active at once.");
info_!("An instance of `Cookies` must be dropped before another \
can be retrieved.");
@ -499,7 +499,7 @@ impl<'r> Request<'r> {
/// # });
/// ```
pub fn route(&self) -> Option<&'r Route> {
self.state.route.get()
*self.state.route.read().unwrap()
}
/// Invokes the request guard implementation for `T`, returning its outcome.
@ -702,7 +702,7 @@ impl<'r> Request<'r> {
pub fn example<F: Fn(&mut Request<'_>)>(method: Method, uri: &str, f: F) {
let rocket = Rocket::custom(Config::development());
let uri = Origin::parse(uri).expect("invalid URI in example");
let mut request = Request::new(&rocket.config, &rocket.state, method, uri);
let mut request = Request::new(&rocket, method, uri);
f(&mut request);
}
@ -773,79 +773,66 @@ impl<'r> Request<'r> {
/// was `route`. Use during routing when attempting a given route.
#[inline(always)]
pub(crate) fn set_route(&self, route: &'r Route) {
self.state.route.set(Some(route));
*self.state.route.write().unwrap() = Some(route);
}
/// Set the method of `self`, even when `self` is a shared reference. Used
/// during routing to override methods for re-routing.
#[inline(always)]
pub(crate) fn _set_method(&self, method: Method) {
self.method.set(method);
*self.method.write().unwrap() = method;
}
/// Convert from Hyper types into a Rocket Request.
pub(crate) fn from_hyp(
config: &'r Config,
managed: &'r Container,
request_parts: &hyper::Parts,
rocket: &'r Rocket,
h_method: hyper::Method,
h_headers: hyper::HeaderMap<hyper::HeaderValue>,
h_uri: hyper::Uri,
h_addr: SocketAddr,
) -> Result<Request<'r>, String> {
let h_uri = &request_parts.uri;
let h_headers = &request_parts.headers;
let h_version = &request_parts.version;
let h_method = &request_parts.method;;
// if !h_uri.is_absolute() {
// return Err(format!("Bad URI: {}", h_uri));
// };
// TODO.async: Can we avoid this allocation?
// TODO.async: Assert that uri is "absolute"
// Get a copy of the URI for later use.
let uri = h_uri.to_string();
// Ensure that the method is known. TODO: Allow made-up methods?
let method = match Method::from_hyp(h_method) {
let method = match Method::from_hyp(&h_method) {
Some(method) => method,
None => return Err(format!("Unknown method: {}", h_method))
None => return Err(format!("Unknown or invalid method: {}", h_method))
};
// We need to re-parse the URI since we don't trust Hyper... :(
let uri = Origin::parse_owned(format!("{}", h_uri)).map_err(|e| e.to_string())?;
let uri = Origin::parse_owned(format!("{}", uri)).map_err(|e| e.to_string())?;
// Construct the request object.
let mut request = Request::new(config, managed, method, uri);
// request.set_remote(match hyp_req.remote_addr() {
// Some(remote) => remote,
// None => return Err(String::from("Missing remote address"))
// });
let mut request = Request::new(rocket, method, uri);
request.set_remote(h_addr);
// Set the request cookies, if they exist.
let cookie_headers = h_headers.get_all("Cookie").iter();
// TODO if cookie_headers.peek().is_some() {
let mut cookie_jar = CookieJar::new();
for header in cookie_headers {
let raw_str = match ::std::str::from_utf8(header.as_bytes()) {
Ok(string) => string,
Err(_) => continue
};
let mut cookie_jar = CookieJar::new();
for header in h_headers.get_all("Cookie") {
// TODO.async: This used to only allow UTF-8 but now only allows ASCII
// (needs verification)
let raw_str = match header.to_str() {
Ok(string) => string,
Err(_) => continue
};
for cookie_str in raw_str.split(';').map(|s| s.trim()) {
if let Some(cookie) = Cookies::parse_cookie(cookie_str) {
cookie_jar.add_original(cookie);
}
for cookie_str in raw_str.split(';').map(|s| s.trim()) {
if let Some(cookie) = Cookies::parse_cookie(cookie_str) {
cookie_jar.add_original(cookie);
}
}
request.state.cookies = RefCell::new(cookie_jar);
// TODO }
}
request.state.cookies = Mutex::new(Some(cookie_jar));
// Set the rest of the headers.
for (name, value) in h_headers.iter() {
// TODO if let Some(header_values) = h_headers.get_all(hyp.name()) {
// This is not totally correct since values needn't be UTF8.
let value_str = String::from_utf8_lossy(value.as_bytes()).into_owned();
let header = Header::new(name.to_string(), value_str);
request.add_header(header);
// TODO }
// This is not totally correct since values needn't be UTF8.
let value_str = String::from_utf8_lossy(value.as_bytes()).into_owned();
let header = Header::new(name.to_string(), value_str);
request.add_header(header);
}
Ok(request)

View File

@ -7,13 +7,13 @@ use crate::http::hyper;
macro_rules! assert_headers {
($($key:expr => [$($value:expr),+]),+) => ({
// Set up the parameters to the hyper request object.
let h_method = hyper::Method::Get;
let h_uri = hyper::RequestUri::AbsolutePath("/test".to_string());
let h_method = hyper::Method::GET;
let h_uri = "/test".parse().unwrap();
let h_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8000);
let mut h_headers = hyper::header::Headers::new();
let mut h_headers = hyper::HeaderMap::new();
// Add all of the passed in headers to the request.
$($(h_headers.append_raw($key.to_string(), $value.as_bytes().into());)+)+
$($(h_headers.append($key, hyper::HeaderValue::from_str($value).unwrap());)+)+
// Build up what we expect the headers to actually be.
let mut expected = HashMap::new();

View File

@ -1,6 +1,8 @@
use std::fs::File;
use std::io::{Cursor, BufReader};
use futures::compat::AsyncRead01CompatExt;
use crate::http::{Status, ContentType, StatusClass};
use crate::response::{self, Response, Body};
use crate::request::Request;
@ -241,10 +243,11 @@ impl Responder<'_> for Vec<u8> {
/// Returns a response with a sized body for the file. Always returns `Ok`.
impl Responder<'_> for File {
fn respond_to(self, _: &Request<'_>) -> response::Result<'static> {
let (metadata, file) = (self.metadata(), BufReader::new(self));
let metadata = self.metadata();
let stream = BufReader::new(tokio::fs::File::from_std(self)).compat();
match metadata {
Ok(md) => Response::build().raw_body(Body::Sized(file, md.len())).ok(),
Err(_) => Response::build().streamed_body(file).ok()
Ok(md) => Response::build().raw_body(Body::Sized(stream, md.len())).ok(),
Err(_) => Response::build().streamed_body(stream).ok()
}
}
}

View File

@ -1,8 +1,13 @@
use std::{io, fmt, str};
use std::borrow::Cow;
use std::pin::Pin;
use futures::future::{Future, FutureExt};
use futures::io::{AsyncRead, AsyncReadExt};
use crate::response::Responder;
use crate::http::{Header, HeaderMap, Status, ContentType, Cookie};
use crate::ext::AsyncReadExt as _;
/// The default size, in bytes, of a chunk for streamed responses.
pub const DEFAULT_CHUNK_SIZE: u64 = 4096;
@ -59,31 +64,34 @@ impl<T> Body<T> {
}
}
impl<T: io::Read> Body<T> {
impl<T: AsyncRead + Unpin> Body<T> {
/// Attempts to read `self` into a `Vec` and returns it. If reading fails,
/// returns `None`.
pub fn into_bytes(self) -> Option<Vec<u8>> {
let mut vec = Vec::new();
let mut body = self.into_inner();
if let Err(e) = body.read_to_end(&mut vec) {
error_!("Error reading body: {:?}", e);
return None;
}
pub fn into_bytes(self) -> impl Future<Output=Option<Vec<u8>>> {
Box::pin(async move {
let mut vec = Vec::new();
let mut body = self.into_inner();
if let Err(e) = body.read_to_end(&mut vec).await {
error_!("Error reading body: {:?}", e);
return None;
}
Some(vec)
Some(vec)
})
}
/// Attempts to read `self` into a `String` and returns it. If reading or
/// conversion fails, returns `None`.
pub fn into_string(self) -> Option<String> {
self.into_bytes()
.and_then(|bytes| match String::from_utf8(bytes) {
pub fn into_string(self) -> impl Future<Output = Option<String>> {
self.into_bytes().map(|bytes| {
bytes.and_then(|bytes| match String::from_utf8(bytes) {
Ok(string) => Some(string),
Err(e) => {
error_!("Body is invalid UTF-8: {}", e);
None
}
})
})
}
}
@ -350,7 +358,7 @@ impl<'r> ResponseBuilder<'r> {
/// ```
#[inline(always)]
pub fn sized_body<B>(&mut self, body: B) -> &mut ResponseBuilder<'r>
where B: io::Read + io::Seek + 'r
where B: AsyncRead + io::Seek + Send + Unpin + 'r
{
self.response.set_sized_body(body);
self
@ -376,7 +384,7 @@ impl<'r> ResponseBuilder<'r> {
/// ```
#[inline(always)]
pub fn streamed_body<B>(&mut self, body: B) -> &mut ResponseBuilder<'r>
where B: io::Read + 'r
where B: AsyncRead + Send + 'r
{
self.response.set_streamed_body(body);
self
@ -402,7 +410,7 @@ impl<'r> ResponseBuilder<'r> {
/// # }
/// ```
#[inline(always)]
pub fn chunked_body<B: io::Read + 'r>(&mut self, body: B, chunk_size: u64)
pub fn chunked_body<B: AsyncRead + Send + 'r>(&mut self, body: B, chunk_size: u64)
-> &mut ResponseBuilder<'r>
{
self.response.set_chunked_body(body, chunk_size);
@ -425,7 +433,7 @@ impl<'r> ResponseBuilder<'r> {
/// .finalize();
/// ```
#[inline(always)]
pub fn raw_body<T: io::Read + 'r>(&mut self, body: Body<T>)
pub fn raw_body<T: AsyncRead + Send + Unpin + 'r>(&mut self, body: Body<T>)
-> &mut ResponseBuilder<'r>
{
self.response.set_raw_body(body);
@ -560,7 +568,7 @@ impl<'r> ResponseBuilder<'r> {
pub struct Response<'r> {
status: Option<Status>,
headers: HeaderMap<'r>,
body: Option<Body<Box<dyn io::Read + 'r>>>,
body: Option<Body<Pin<Box<dyn AsyncRead + Send + 'r>>>>,
}
impl<'r> Response<'r> {
@ -889,7 +897,7 @@ impl<'r> Response<'r> {
/// assert_eq!(response.body_string(), Some("Hello, world!".to_string()));
/// ```
#[inline(always)]
pub fn body(&mut self) -> Option<Body<&mut dyn io::Read>> {
pub fn body(&mut self) -> Option<Body<&mut (dyn AsyncRead + Unpin + Send)>> {
// Looks crazy, right? Needed so Rust infers lifetime correctly. Weird.
match self.body.as_mut() {
Some(body) => Some(match body.as_mut() {
@ -919,8 +927,14 @@ impl<'r> Response<'r> {
/// assert!(response.body().is_none());
/// ```
#[inline(always)]
pub fn body_string(&mut self) -> Option<String> {
self.take_body().and_then(Body::into_string)
pub fn body_string(&mut self) -> impl Future<Output = Option<String>> + 'r {
let body = self.take_body();
Box::pin(async move {
match body {
Some(body) => body.into_string().await,
None => None,
}
})
}
/// Consumes `self's` body and reads it into a `Vec` of `u8` bytes. If
@ -941,8 +955,14 @@ impl<'r> Response<'r> {
/// assert!(response.body().is_none());
/// ```
#[inline(always)]
pub fn body_bytes(&mut self) -> Option<Vec<u8>> {
self.take_body().and_then(Body::into_bytes)
pub fn body_bytes(&mut self) -> impl Future<Output = Option<Vec<u8>>> + 'r {
let body = self.take_body();
Box::pin(async move {
match body {
Some(body) => body.into_bytes().await,
None => None,
}
})
}
/// Moves the body of `self` out and returns it, if there is one, leaving no
@ -966,17 +986,17 @@ impl<'r> Response<'r> {
/// assert!(response.body().is_none());
/// ```
#[inline(always)]
pub fn take_body(&mut self) -> Option<Body<Box<dyn io::Read + 'r>>> {
pub fn take_body(&mut self) -> Option<Body<Pin<Box<dyn AsyncRead + Send + 'r>>>> {
self.body.take()
}
// Makes the `Read`er in the body empty but leaves the size of the body if
// Makes the `AsyncRead`er in the body empty but leaves the size of the body if
// it exists. Only meant to be used to handle HEAD requests automatically.
#[inline(always)]
pub(crate) fn strip_body(&mut self) {
if let Some(body) = self.take_body() {
self.body = match body {
Body::Sized(_, n) => Some(Body::Sized(Box::new(io::empty()), n)),
Body::Sized(_, n) => Some(Body::Sized(Box::pin(io::empty()), n)),
Body::Chunked(..) => None
};
}
@ -1004,13 +1024,13 @@ impl<'r> Response<'r> {
/// ```
#[inline]
pub fn set_sized_body<B>(&mut self, mut body: B)
where B: io::Read + io::Seek + 'r
where B: AsyncRead + io::Seek + Send + Unpin + 'r
{
let size = body.seek(io::SeekFrom::End(0))
.expect("Attempted to retrieve size by seeking, but failed.");
body.seek(io::SeekFrom::Start(0))
.expect("Attempted to reset body by seeking after getting size.");
self.body = Some(Body::Sized(Box::new(body.take(size)), size));
self.body = Some(Body::Sized(Box::pin(body.take(size)), size));
}
/// Sets the body of `self` to be `body`, which will be streamed. The chunk
@ -1021,7 +1041,7 @@ impl<'r> Response<'r> {
/// # Example
///
/// ```rust
/// use std::io::{Read, repeat};
/// use std::io::{AsyncRead, repeat};
/// use rocket::Response;
///
/// let mut response = Response::new();
@ -1029,7 +1049,7 @@ impl<'r> Response<'r> {
/// assert_eq!(response.body_string(), Some("aaaaa".to_string()));
/// ```
#[inline(always)]
pub fn set_streamed_body<B>(&mut self, body: B) where B: io::Read + 'r {
pub fn set_streamed_body<B>(&mut self, body: B) where B: AsyncRead + Send + 'r {
self.set_chunked_body(body, DEFAULT_CHUNK_SIZE);
}
@ -1039,7 +1059,7 @@ impl<'r> Response<'r> {
/// # Example
///
/// ```rust
/// use std::io::{Read, repeat};
/// use std::io::{AsyncRead, repeat};
/// use rocket::Response;
///
/// let mut response = Response::new();
@ -1048,8 +1068,8 @@ impl<'r> Response<'r> {
/// ```
#[inline(always)]
pub fn set_chunked_body<B>(&mut self, body: B, chunk_size: u64)
where B: io::Read + 'r {
self.body = Some(Body::Chunked(Box::new(body), chunk_size));
where B: AsyncRead + Send + 'r {
self.body = Some(Body::Chunked(Box::pin(body), chunk_size));
}
/// Sets the body of `self` to be `body`. This method should typically not
@ -1070,10 +1090,11 @@ impl<'r> Response<'r> {
/// assert_eq!(response.body_string(), Some("Hello!".to_string()));
/// ```
#[inline(always)]
pub fn set_raw_body<T: io::Read + 'r>(&mut self, body: Body<T>) {
pub fn set_raw_body<T>(&mut self, body: Body<T>)
where T: AsyncRead + Send + Unpin + 'r {
self.body = Some(match body {
Body::Sized(b, n) => Body::Sized(Box::new(b.take(n)), n),
Body::Chunked(b, n) => Body::Chunked(Box::new(b), n),
Body::Sized(b, n) => Body::Sized(Box::pin(b.take(n)), n),
Body::Chunked(b, n) => Body::Chunked(Box::pin(b), n),
});
}

View File

@ -1,19 +1,20 @@
use std::io::Read;
use std::fmt::{self, Debug};
use futures::io::AsyncRead;
use crate::request::Request;
use crate::response::{Response, Responder, DEFAULT_CHUNK_SIZE};
use crate::http::Status;
/// Streams a response to a client from an arbitrary `Read`er type.
/// Streams a response to a client from an arbitrary `AsyncRead`er type.
///
/// The client is sent a "chunked" response, where the chunk size is at most
/// 4KiB. This means that at most 4KiB are stored in memory while the response
/// is being sent. This type should be used when sending responses that are
/// arbitrarily large in size, such as when streaming from a local socket.
pub struct Stream<T: Read>(T, u64);
pub struct Stream<T: AsyncRead>(T, u64);
impl<T: Read> Stream<T> {
impl<T: AsyncRead> Stream<T> {
/// Create a new stream from the given `reader` and sets the chunk size for
/// each streamed chunk to `chunk_size` bytes.
///
@ -34,7 +35,7 @@ impl<T: Read> Stream<T> {
}
}
impl<T: Read + Debug> Debug for Stream<T> {
impl<T: AsyncRead + Debug> Debug for Stream<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Stream").field(&self.0).finish()
}
@ -54,7 +55,7 @@ impl<T: Read + Debug> Debug for Stream<T> {
/// # #[allow(unused_variables)]
/// let response = Stream::from(io::stdin());
/// ```
impl<T: Read> From<T> for Stream<T> {
impl<T: AsyncRead> From<T> for Stream<T> {
fn from(reader: T) -> Self {
Stream(reader, DEFAULT_CHUNK_SIZE)
}
@ -68,7 +69,7 @@ impl<T: Read> From<T> for Stream<T> {
/// If reading from the input stream fails at any point during the response, the
/// response is abandoned, and the response ends abruptly. An error is printed
/// to the console with an indication of what went wrong.
impl<'r, T: Read + 'r> Responder<'r> for Stream<T> {
impl<'r, T: AsyncRead + Send + 'r> Responder<'r> for Stream<T> {
fn respond_to(self, _: &Request<'_>) -> Result<Response<'r>, Status> {
Response::build().chunked_body(self.0, self.1).ok()
}

View File

@ -1,25 +1,25 @@
use std::collections::HashMap;
use std::convert::From;
use std::str::{from_utf8, FromStr};
use std::convert::{From, TryInto};
use std::cmp::min;
use std::io::{self, Write};
use std::time::Duration;
use std::io;
use std::mem;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;
use futures::{Future, Stream};
use futures::future::{self, FutureResult};
use futures::compat::{Compat, Executor01CompatExt, Sink01CompatExt};
use futures::future::{Future, FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::task::SpawnExt;
use yansi::Paint;
use state::Container;
use tokio::net::TcpListener;
use tokio::prelude::{Future as _, Stream as _};
#[cfg(feature = "tls")] use crate::http::tls::TlsAcceptor;
use crate::{logger, handler};
use crate::ext::ReadExt;
use crate::config::{Config, FullConfig, ConfigError, LoggedValue};
use crate::request::{Request, FormItems};
use crate::data::Data;
@ -30,6 +30,7 @@ use crate::outcome::Outcome;
use crate::error::{LaunchError, LaunchErrorKind};
use crate::fairing::{Fairing, Fairings};
use crate::logger::PaintExt;
use crate::ext::AsyncReadExt;
use crate::http::{Method, Status, Header};
use crate::http::hyper::{self, header};
@ -46,45 +47,26 @@ pub struct Rocket {
fairings: Fairings,
}
struct RocketArcs {
config: Arc<Config>,
router: Arc<Router>,
default_catchers: Arc<HashMap<u16, Catcher>>,
catchers: Arc<HashMap<u16, Catcher>>,
state: Arc<Container>,
fairings: Arc<Fairings>,
struct RocketHyperService {
rocket: Arc<Rocket>,
spawn: Box<dyn futures::task::Spawn + Send>,
remote_addr: std::net::SocketAddr,
}
impl<Ctx> hyper::MakeService<Ctx> for RocketArcs {
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = hyper::Error;
type Service = RocketHyperService;
type Future = FutureResult<Self::Service, Self::MakeError>;
type MakeError = Self::Error;
impl std::ops::Deref for RocketHyperService {
type Target = Rocket;
fn make_service(&mut self, _: Ctx) -> Self::Future {
future::ok(RocketHyperService::new(self))
fn deref(&self) -> &Self::Target {
&*self.rocket
}
}
#[derive(Clone)]
pub struct RocketHyperService {
config: Arc<Config>,
router: Arc<Router>,
default_catchers: Arc<HashMap<u16, Catcher>>,
catchers: Arc<HashMap<u16, Catcher>>,
state: Arc<Container>,
fairings: Arc<Fairings>,
}
#[doc(hidden)]
impl hyper::Service for RocketHyperService {
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = hyper::Error;
//type Future = FutureResult<hyper::Response<Self::ResBody>, Self::Error>;
type Future = Box<future::Future<Item = hyper::Response<Self::ResBody>, Error = Self::Error> + Send>;
type Error = io::Error;
type Future = Compat<Pin<Box<dyn Future<Output = Result<hyper::Response<Self::ResBody>, Self::Error>> + Send>>>;
// This function tries to hide all of the Hyper-ness from Rocket. It
// essentially converts Hyper types into Rocket types, then calls the
@ -95,57 +77,142 @@ impl hyper::Service for RocketHyperService {
&mut self,
hyp_req: hyper::Request<Self::ReqBody>,
) -> Self::Future {
let (parts, body) = hyp_req.into_parts();
let rocket = self.rocket.clone();
let h_addr = self.remote_addr;
// Convert the Hyper request into a Rocket request.
let req_res = Request::from_hyp(&self.config, &self.state, &parts);
let mut req = match req_res {
Ok(req) => req,
Err(e) => {
error!("Bad incoming request: {}", e);
// TODO: We don't have a request to pass in, so we just
// fabricate one. This is weird. We should let the user know
// that we failed to parse a request (by invoking some special
// handler) instead of doing this.
let dummy = Request::new(&self.config, &self.state, Method::Get, Origin::dummy());
let r = self.handle_error(Status::BadRequest, &dummy);
return Box::new(future::ok(hyper::Response::from(r)));
}
};
// This future must return a hyper::Response, but that's not easy
// because the response body might borrow from the request. Instead,
// we do the body writing in another future that will send us
// the response metadata (and a body channel) beforehand.
let (tx, rx) = futures::channel::oneshot::channel();
let this = self.clone();
self.spawn.spawn(async move {
// Get all of the information from Hyper.
let (h_parts, h_body) = hyp_req.into_parts();
let response = body.concat2()
.map(move |chunk| {
let body = chunk.iter().rev().cloned().collect::<Vec<u8>>();
let data = Data::new(body);
// Convert the Hyper request into a Rocket request.
let req_res = Request::from_hyp(&rocket, h_parts.method, h_parts.headers, h_parts.uri, h_addr);
let mut req = match req_res {
Ok(req) => req,
Err(e) => {
error!("Bad incoming request: {}", e);
// TODO: We don't have a request to pass in, so we just
// fabricate one. This is weird. We should let the user know
// that we failed to parse a request (by invoking some special
// handler) instead of doing this.
let dummy = Request::new(&rocket, Method::Get, Origin::dummy());
let r = rocket.handle_error(Status::BadRequest, &dummy).await;
return rocket.issue_response(r, tx).await;
}
};
// TODO: Due to life time constraints the clone of the service has been made.
// TODO: It should not be necessary but it is required to find a better solution
let mut req = Request::from_hyp(&this.config, &this.state, &parts).unwrap();
// Dispatch the request to get a response, then write that response out.
let response = this.dispatch(&mut req, data);
hyper::Response::from(response)
});
// Retrieve the data from the hyper body.
let data = Data::from_hyp(h_body).await;
Box::new(response)
// Dispatch the request to get a response, then write that response out.
let r = rocket.dispatch(&mut req, data).await;
rocket.issue_response(r, tx).await;
}).expect("failed to spawn handler");
async move {
Ok(rx.await.expect("TODO.async: sender was dropped, error instead"))
}.boxed().compat()
}
}
impl RocketHyperService {
impl Rocket {
// TODO.async: Reconsider io::Result
#[inline]
fn new(rocket: &RocketArcs) -> RocketHyperService {
RocketHyperService {
config: rocket.config.clone(),
router: rocket.router.clone(),
default_catchers: rocket.default_catchers.clone(),
catchers: rocket.catchers.clone(),
state: rocket.state.clone(),
fairings: rocket.fairings.clone(),
fn issue_response<'r>(
&self,
response: Response<'r>,
tx: futures::channel::oneshot::Sender<hyper::Response<hyper::Body>>,
) -> impl Future<Output = ()> + 'r {
let result = self.write_response(response, tx);
async move {
match result.await {
Ok(()) => {
info_!("{}", Paint::green("Response succeeded."));
}
Err(e) => {
error_!("Failed to write response: {:?}.", e);
}
}
}
}
#[inline]
fn write_response<'r>(
&self,
mut response: Response<'r>,
tx: futures::channel::oneshot::Sender<hyper::Response<hyper::Body>>,
) -> impl Future<Output = io::Result<()>> + 'r {
async move {
let mut hyp_res = hyper::Response::builder();
hyp_res.status(response.status().code);
for header in response.headers().iter() {
let name = header.name.as_str();
let value = header.value.as_bytes();
hyp_res.header(name, value);
}
let send_response = move |mut hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> {
let response = hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
tx.send(response).expect("channel receiver should not be dropped");
Ok(())
};
match response.body() {
None => {
hyp_res.header(header::CONTENT_LENGTH, "0");
send_response(hyp_res, hyper::Body::empty())?;
}
Some(Body::Sized(body, size)) => {
hyp_res.header(header::CONTENT_LENGTH, size.to_string());
let (sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;
let mut stream = body.into_chunk_stream(4096);
let mut sink = sender.sink_compat().sink_map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
});
while let Some(next) = stream.next().await {
sink.send(next?).await?;
}
// TODO.async: This should be better, but it creates an
// incomprehensible error messasge instead
// stream.forward(sink).await;
}
Some(Body::Chunked(body, chunk_size)) => {
// TODO.async: This is identical to Body::Sized except for the chunk size
let (sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;
let mut stream = body.into_chunk_stream(chunk_size.try_into().expect("u64 -> usize overflow"));
let mut sink = sender.sink_compat().sink_map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
});
while let Some(next) = stream.next().await {
sink.send(next?).await?;
}
// TODO.async: This should be better, but it creates an
// incomprehensible error messasge instead
// stream.forward(sink).await;
}
};
Ok(())
}
}
}
impl Rocket {
/// Preprocess the request for Rocket things. Currently, this means:
///
/// * Rewriting the method in the request if _method form field exists.
@ -159,7 +226,7 @@ impl RocketHyperService {
let is_form = req.content_type().map_or(false, |ct| ct.is_form());
if is_form && req.method() == Method::Post && data_len >= min_len {
if let Ok(form) = from_utf8(&data.peek()[..min(data_len, max_len)]) {
if let Ok(form) = std::str::from_utf8(&data.peek()[..min(data_len, max_len)]) {
let method: Option<Result<Method, _>> = FormItems::from(form)
.filter(|item| item.key.as_str() == "_method")
.map(|item| item.value.parse())
@ -173,75 +240,80 @@ impl RocketHyperService {
}
#[inline]
pub(crate) fn dispatch<'s, 'r>(
pub(crate) fn dispatch<'s, 'r: 's>(
&'s self,
request: &'r mut Request<'s>,
data: Data
) -> Response<'r> {
info!("{}:", request);
) -> impl Future<Output = Response<'r>> + 's {
async move {
info!("{}:", request);
// Do a bit of preprocessing before routing.
self.preprocess_request(request, &data);
// Do a bit of preprocessing before routing.
self.preprocess_request(request, &data);
// Run the request fairings.
self.fairings.handle_request(request, &data);
// Run the request fairings.
self.fairings.handle_request(request, &data);
// Remember if the request is a `HEAD` request for later body stripping.
let was_head_request = request.method() == Method::Head;
// Remember if the request is a `HEAD` request for later body stripping.
let was_head_request = request.method() == Method::Head;
// Route the request and run the user's handlers.
let mut response = self.route_and_process(request, data);
// Route the request and run the user's handlers.
let mut response = self.route_and_process(request, data).await;
// Add a default 'Server' header if it isn't already there.
// TODO: If removing Hyper, write out `Date` header too.
if !response.headers().contains("Server") {
response.set_header(Header::new("Server", "Rocket"));
// Add a default 'Server' header if it isn't already there.
// TODO: If removing Hyper, write out `Date` header too.
if !response.headers().contains("Server") {
response.set_header(Header::new("Server", "Rocket"));
}
// Run the response fairings.
self.fairings.handle_response(request, &mut response);
// Strip the body if this is a `HEAD` request.
if was_head_request {
response.strip_body();
}
response
}
// Run the response fairings.
self.fairings.handle_response(request, &mut response);
// Strip the body if this is a `HEAD` request.
if was_head_request {
response.strip_body();
}
response
}
/// Route the request and process the outcome to eventually get a response.
fn route_and_process<'s, 'r>(
fn route_and_process<'s, 'r: 's>(
&'s self,
request: &'r Request<'s>,
data: Data
) -> Response<'r> {
let mut response = match self.route(request, data) {
Outcome::Success(response) => response,
Outcome::Forward(data) => {
// There was no matching route. Autohandle `HEAD` requests.
if request.method() == Method::Head {
info_!("Autohandling {} request.", Paint::default("HEAD").bold());
) -> impl Future<Output = Response<'r>> + Send + 's {
async move {
let mut response = match self.route(request, data).await {
Outcome::Success(response) => response,
Outcome::Forward(data) => {
// There was no matching route. Autohandle `HEAD` requests.
if request.method() == Method::Head {
info_!("Autohandling {} request.", Paint::default("HEAD").bold());
// Dispatch the request again with Method `GET`.
request._set_method(Method::Get);
// Dispatch the request again with Method `GET`.
request._set_method(Method::Get);
// Return early so we don't set cookies twice.
return self.route_and_process(request, data);
} else {
// No match was found and it can't be autohandled. 404.
self.handle_error(Status::NotFound, request)
// Return early so we don't set cookies twice.
let try_next: Pin<Box<dyn Future<Output = _> + Send>> = Box::pin(self.route_and_process(request, data));
return try_next.await;
} else {
// No match was found and it can't be autohandled. 404.
self.handle_error(Status::NotFound, request).await
}
}
Outcome::Failure(status) => self.handle_error(status, request).await,
};
// Set the cookies. Note that error responses will only include
// cookies set by the error handler. See `handle_error` for more.
for cookie in request.cookies().delta() {
response.adjoin_header(cookie);
}
Outcome::Failure(status) => self.handle_error(status, request),
};
// Set the cookies. Note that error responses will only include cookies
// set by the error handler. See `handle_error` for more.
for cookie in request.cookies().delta() {
response.adjoin_header(cookie);
response
}
response
}
/// Tries to find a `Responder` for a given `request`. It does this by
@ -256,87 +328,75 @@ impl RocketHyperService {
// (ensuring `handler` takes an immutable borrow), any caller to `route`
// should be able to supply an `&mut` and retain an `&` after the call.
#[inline]
pub(crate) fn route<'s, 'r>(
pub(crate) fn route<'s, 'r: 's>(
&'s self,
request: &'r Request<'s>,
mut data: Data,
) -> handler::Outcome<'r> {
// Go through the list of matching routes until we fail or succeed.
let matches = self.router.route(request);
for route in matches {
// Retrieve and set the requests parameters.
info_!("Matched: {}", route);
request.set_route(route);
) -> impl Future<Output = handler::Outcome<'r>> + 's {
async move {
// Go through the list of matching routes until we fail or succeed.
let matches = self.router.route(request);
for route in matches {
// Retrieve and set the requests parameters.
info_!("Matched: {}", route);
request.set_route(route);
// Dispatch the request to the handler.
let outcome = route.handler.handle(request, data);
// Dispatch the request to the handler.
let outcome = route.handler.handle(request, data).await;
// Check if the request processing completed or if the request needs
// to be forwarded. If it does, continue the loop to try again.
info_!("{} {}", Paint::default("Outcome:").bold(), outcome);
match outcome {
o@Outcome::Success(_) | o@Outcome::Failure(_) => return o,
Outcome::Forward(unused_data) => data = unused_data,
};
// Check if the request processing completed (Some) or if the request needs
// to be forwarded. If it does, continue the loop (None) to try again.
info_!("{} {}", Paint::default("Outcome:").bold(), outcome);
match outcome {
o@Outcome::Success(_) | o@Outcome::Failure(_) => return o,
Outcome::Forward(unused_data) => data = unused_data,
}
}
error_!("No matching routes for {}.", request);
Outcome::Forward(data)
}
error_!("No matching routes for {}.", request);
Outcome::Forward(data)
}
// Finds the error catcher for the status `status` and executes it for the
// given request `req`; the cookies in `req` are reset to their original
// state before invoking the error handler. If a user has registered a
// catcher for `status`, the catcher is called. If the catcher fails to
// return a good response, the 500 catcher is executed. If there is no
// registered catcher for `status`, the default catcher is used.
pub(crate) fn handle_error<'r>(
&self,
// given request `req`. If a user has registered a catcher for `status`, the
// catcher is called. If the catcher fails to return a good response, the
// 500 catcher is executed. If there is no registered catcher for `status`,
// the default catcher is used.
pub(crate) fn handle_error<'s, 'r: 's>(
&'s self,
status: Status,
req: &'r Request<'_>
) -> Response<'r> {
warn_!("Responding with {} catcher.", Paint::red(&status));
req: &'r Request<'s>
) -> impl Future<Output = Response<'r>> + 's {
async move {
warn_!("Responding with {} catcher.", Paint::red(&status));
// For now, we reset the delta state to prevent any modifications from
// earlier, unsuccessful paths from being reflected in error response.
// We may wish to relax this in the future.
req.cookies().reset_delta();
// For now, we reset the delta state to prevent any modifications
// from earlier, unsuccessful paths from being reflected in error
// response. We may wish to relax this in the future.
req.cookies().reset_delta();
// Try to get the active catcher but fallback to user's 500 catcher.
let catcher = self.catchers.get(&status.code).unwrap_or_else(|| {
error_!("No catcher found for {}. Using 500 catcher.", status);
self.catchers.get(&500).expect("500 catcher.")
});
// Try to get the active catcher but fallback to user's 500 catcher.
let catcher = self.catchers.get(&status.code).unwrap_or_else(|| {
error_!("No catcher found for {}. Using 500 catcher.", status);
self.catchers.get(&500).expect("500 catcher.")
});
// Dispatch to the user's catcher. If it fails, use the default 500.
catcher.handle(req).unwrap_or_else(|err_status| {
error_!("Catcher failed with status: {}!", err_status);
warn_!("Using default 500 error catcher.");
let default = self.default_catchers.get(&500).expect("Default 500");
default.handle(req).expect("Default 500 response.")
})
// Dispatch to the user's catcher. If it fails, use the default 500.
match catcher.handle(req).await {
Ok(r) => return r,
Err(err_status) => {
error_!("Catcher failed with status: {}!", err_status);
warn_!("Using default 500 error catcher.");
let default = self.default_catchers.get(&500).expect("Default 500");
default.handle(req).await.expect("Default 500 response.")
}
}
}
}
}
impl Rocket {
#[inline]
pub(crate) fn dispatch<'s, 'r>(
&'s self,
request: &'r mut Request<'s>,
data: Data
) -> Response<'r> {
unimplemented!("TODO")
}
pub(crate) fn handle_error<'r>(
&self,
status: Status,
req: &'r Request
) -> Response<'r> {
unimplemented!("TODO")
}
/// Create a new `Rocket` application using the configuration information in
/// `Rocket.toml`. If the file does not exist or if there is an I/O error
/// reading the file, the defaults, overridden by any environment-based
@ -528,7 +588,6 @@ impl Rocket {
panic!("Invalid mount point.");
}
let mut router = self.router.clone();
for mut route in routes.into() {
let path = route.uri.clone();
if let Err(e) = route.set_uri(base_uri.clone(), path) {
@ -537,11 +596,9 @@ impl Rocket {
}
info_!("{}", route);
router.add(route);
self.router.add(route);
}
self.router = router;
self
}
@ -576,8 +633,6 @@ impl Rocket {
pub fn register(mut self, catchers: Vec<Catcher>) -> Self {
info!("{}{}", Paint::emoji("👾 "), Paint::magenta("Catchers:"));
let mut current_catchers = self.catchers.clone();
for c in catchers {
if self.catchers.get(&c.code).map_or(false, |e| !e.is_default) {
info_!("{} {}", c, Paint::yellow("(warning: duplicate catcher!)"));
@ -585,11 +640,9 @@ impl Rocket {
info_!("{}", c);
}
current_catchers.insert(c.code, c);
self.catchers.insert(c.code, c);
}
self.catchers = current_catchers;
self
}
@ -706,6 +759,8 @@ impl Rocket {
/// # }
/// ```
pub fn launch(mut self) -> LaunchError {
#[cfg(feature = "tls")] use crate::http::tls;
self = match self.prelaunch_check() {
Ok(rocket) => rocket,
Err(launch_error) => return launch_error
@ -720,56 +775,29 @@ impl Rocket {
.build()
.expect("Cannot build runtime!");
let threads = self.config.workers as usize;
let full_addr = format!("{}:{}", self.config.address, self.config.port);
let addrs = match full_addr.to_socket_addrs() {
Ok(a) => a.collect::<Vec<_>>(),
// TODO.async: Reconsider this error type
Err(e) => return From::from(io::Error::new(io::ErrorKind::Other, e)),
};
let full_addr = format!("{}:{}", self.config.address, self.config.port)
.to_socket_addrs()
.expect("A valid socket address")
.next()
.unwrap();
// TODO.async: support for TLS, unix sockets.
// Likely will be implemented with a custom "Incoming" type.
let listener = match TcpListener::bind(&full_addr) {
Ok(listener) => listener,
let mut incoming = match hyper::AddrIncoming::bind(&addrs[0]) {
Ok(incoming) => incoming,
Err(e) => return LaunchError::new(LaunchErrorKind::Bind(e)),
};
// Determine the address and port we actually binded to.
match listener.local_addr() {
Ok(server_addr) => /* TODO self.config.port = */ server_addr.port(),
Err(e) => return LaunchError::from(e),
};
self.config.port = incoming.local_addr().port();
let proto;
let incoming;
let proto = "http://";
#[cfg(feature = "tls")]
{
// TODO.async: Can/should we make the clone unnecessary (by reference, or by moving out?)
if let Some(tls) = self.config.tls.clone() {
proto = "https://";
let mut config = tls::rustls::ServerConfig::new(tls::rustls::NoClientAuth::new());
config.set_single_cert(tls.certs, tls.key).expect("invalid key or certificate");
// TODO.async: I once observed an unhandled AlertReceived(UnknownCA) but
// have no idea what happened and cannot reproduce.
let config = TlsAcceptor::from(Arc::new(config));
incoming = Box::new(listener.incoming().and_then(move |stream| {
config.accept(stream)
.map(|stream| Box::new(stream))
}));
}
else {
proto = "http://";
incoming = Box::new(listener.incoming().map(|stream| Box::new(stream)));
}
}
#[cfg(not(feature = "tls"))]
{
proto = "http://";
incoming = Box::new(listener.incoming().map(|stream| Box::new(stream)));
}
// Set the keep-alive.
let timeout = self.config.keep_alive.map(|s| Duration::from_secs(s as u64));
incoming.set_keepalive(timeout);
// Freeze managed state for synchronization-free accesses later.
self.state.freeze();
@ -786,17 +814,25 @@ impl Rocket {
// Restore the log level back to what it originally was.
logger::pop_max_level();
let arcs = RocketArcs::from(self);
let rocket = Arc::new(self);
let spawn = Box::new(runtime.executor().compat());
let service = hyper::make_service_fn(move |socket: &hyper::AddrStream| {
futures::future::ok::<_, Box<dyn std::error::Error + Send + Sync>>(RocketHyperService {
rocket: rocket.clone(),
spawn: spawn.clone(),
remote_addr: socket.remote_addr(),
}).compat()
});
// NB: executor must be passed manually here, see hyperium/hyper#1537
let server = hyper::Server::builder(incoming)
.executor(runtime.executor())
.serve(arcs);
.serve(service);
// TODO.async: Use with_graceful_shutdown, and let launch() return a Result<(), Error>
runtime.block_on(server).expect("TODO.async handle error");
unreachable!("the call to `handle_threads` should block on success")
unreachable!("the call to `block_on` should block on success")
}
/// Returns an iterator over all of the routes mounted on this instance of
@ -881,47 +917,3 @@ impl Rocket {
&self.config
}
}
impl From<Rocket> for RocketArcs {
fn from(mut rocket: Rocket) -> Self {
RocketArcs {
config: Arc::new(rocket.config),
router: Arc::new(rocket.router),
default_catchers: Arc::new(rocket.default_catchers),
catchers: Arc::new(rocket.catchers),
state: Arc::new(rocket.state),
fairings: Arc::new(rocket.fairings),
}
}
}
// TODO: consider try_from here?
impl<'a> From<Response<'a>> for hyper::Response<hyper::Body> {
fn from(mut response: Response) -> Self {
let mut builder = hyper::Response::builder();
builder.status(hyper::StatusCode::from_u16(response.status().code).expect(""));
for header in response.headers().iter() {
// FIXME: Using hyper here requires two allocations.
let name = hyper::HeaderName::from_str(&header.name.into_string()).unwrap();
let value = hyper::HeaderValue::from_bytes(header.value.as_bytes()).unwrap();
builder.header(name, value);
}
match response.body() {
None => {
builder.body(hyper::Body::empty())
},
Some(Body::Sized(body, size)) => {
let mut buffer = Vec::with_capacity(size as usize);
body.read_to_end(&mut buffer);
builder.header(header::CONTENT_LENGTH, hyper::HeaderValue::from(size));
builder.body(hyper::Body::from(buffer))
},
Some(Body::Chunked(mut body, chunk_size)) => {
unimplemented!()
}
}.unwrap()
}
}

View File

@ -3,6 +3,8 @@ mod route;
use std::collections::hash_map::HashMap;
use futures::future::{Future, FutureExt};
pub use self::route::Route;
use crate::request::Request;
@ -12,11 +14,11 @@ use crate::http::Method;
type Selector = Method;
// A handler to use when one is needed temporarily.
pub(crate) fn dummy_handler<'r>(r: &'r crate::Request<'_>, _: crate::Data) -> crate::handler::Outcome<'r> {
crate::Outcome::from(r, ())
pub(crate) fn dummy_handler<'r>(r: &'r Request<'_>, _: crate::Data) -> std::pin::Pin<Box<dyn Future<Output = crate::handler::Outcome<'r>> + Send + 'r>> {
futures::future::ready(crate::Outcome::from(r, ())).boxed()
}
#[derive(Default, Clone)]
#[derive(Default)]
pub struct Router {
routes: HashMap<Selector, Vec<Route>>,
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
use rocket;
@ -51,8 +51,8 @@ fn test_derive_reexports() {
let client = Client::new(rocket).unwrap();
let mut response = client.get("/").dispatch();
assert_eq!(response.body_string().unwrap(), "hello");
assert_eq!(response.body_string_wait().unwrap(), "hello");
let mut response = client.get("/?thing=b").dispatch();
assert_eq!(response.body_string().unwrap(), "b");
assert_eq!(response.body_string_wait().unwrap(), "b");
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -35,13 +35,15 @@ mod fairing_before_head_strip {
}))
.attach(AdHoc::on_response("Check HEAD 2", |req, res| {
assert_eq!(req.method(), Method::Head);
assert_eq!(res.body_string(), Some(RESPONSE_STRING.into()));
// TODO.async: Needs async on_response fairings
// assert_eq!(res.body_string().await, Some(RESPONSE_STRING.into()));
}));
let client = Client::new(rocket).unwrap();
let mut response = client.head("/").dispatch();
assert_eq!(response.status(), Status::Ok);
assert!(response.body().is_none());
// TODO.async: See above
// assert!(response.body().is_none());
}
#[test]
@ -62,12 +64,14 @@ mod fairing_before_head_strip {
}))
.attach(AdHoc::on_response("Check GET", |req, res| {
assert_eq!(req.method(), Method::Get);
assert_eq!(res.body_string(), Some(RESPONSE_STRING.into()));
// TODO.async: Needs async on_response fairings
// assert_eq!(res.body_string().await, Some(RESPONSE_STRING.into()));
}));
let client = Client::new(rocket).unwrap();
let mut response = client.head("/").dispatch();
assert_eq!(response.status(), Status::Ok);
assert!(response.body().is_none());
// TODO.async: See above
// assert!(response.body().is_none());
}
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -49,7 +49,7 @@ mod flash_lazy_remove_tests {
// Now use it.
let mut response = client.get("/use").dispatch();
assert_eq!(response.body_string(), Some(FLASH_MESSAGE.into()));
assert_eq!(response.body_string_wait(), Some(FLASH_MESSAGE.into()));
// Now it should be gone.
let response = client.get("/unused").dispatch();

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -28,7 +28,7 @@ mod tests {
.body("_method=patch&form_data=Form+data")
.dispatch();
assert_eq!(response.body_string(), Some("OK".into()));
assert_eq!(response.body_string_wait(), Some("OK".into()));
}
#[test]

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -28,7 +28,7 @@ mod tests {
.dispatch();
assert_eq!(response.status(), Status::Ok);
assert_eq!(Some(decoded.to_string()), response.body_string());
assert_eq!(Some(decoded.to_string()), response.body_string_wait());
}
#[test]

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -22,7 +22,7 @@ fn other() -> content::Json<&'static str> {
mod head_handling_tests {
use super::*;
use std::io::Read;
use futures::io::AsyncReadExt;
use rocket::Route;
use rocket::local::Client;
@ -33,13 +33,15 @@ mod head_handling_tests {
routes![index, empty, other]
}
fn assert_empty_sized_body<T: Read>(body: Body<T>, expected_size: u64) {
fn assert_empty_sized_body<T: futures::AsyncRead + Unpin>(body: Body<T>, expected_size: u64) {
match body {
Body::Sized(mut body, size) => {
let mut buffer = vec![];
let n = body.read_to_end(&mut buffer).unwrap();
futures::executor::block_on(async {
body.read_to_end(&mut buffer).await.unwrap();
});
assert_eq!(size, expected_size);
assert_eq!(n, 0);
assert_eq!(buffer.len(), 0);
}
_ => panic!("Expected a sized body.")
}
@ -57,7 +59,7 @@ mod head_handling_tests {
let mut response = client.head("/empty").dispatch();
assert_eq!(response.status(), Status::NoContent);
assert!(response.body_bytes().is_none());
assert!(response.body_bytes_wait().is_none());
}
#[test]

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -36,7 +36,7 @@ mod limits_tests {
.header(ContentType::Form)
.dispatch();
assert_eq!(response.body_string(), Some("Hello world".into()));
assert_eq!(response.body_string_wait(), Some("Hello world".into()));
}
#[test]
@ -47,7 +47,7 @@ mod limits_tests {
.header(ContentType::Form)
.dispatch();
assert_eq!(response.body_string(), Some("Hello world".into()));
assert_eq!(response.body_string_wait(), Some("Hello world".into()));
}
#[test]
@ -69,6 +69,6 @@ mod limits_tests {
.header(ContentType::Form)
.dispatch();
assert_eq!(response.body_string(), Some("Hell".into()));
assert_eq!(response.body_string_wait(), Some("Hell".into()));
}
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -25,12 +25,12 @@ use rocket::data::{self, FromDataSimple};
impl FromDataSimple for HasContentType {
type Error = ();
fn from_data(request: &Request, data: Data) -> data::Outcome<Self, ()> {
if request.content_type().is_some() {
fn from_data(request: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, Self::Error> {
Box::pin(futures::future::ready(if request.content_type().is_some() {
Success(HasContentType)
} else {
Forward(data)
}
}))
}
}
@ -65,14 +65,14 @@ mod local_request_content_type_tests {
let client = Client::new(rocket()).unwrap();
let mut req = client.post("/");
assert_eq!(req.clone().dispatch().body_string(), Some("Absent".to_string()));
assert_eq!(req.mut_dispatch().body_string(), Some("Absent".to_string()));
assert_eq!(req.dispatch().body_string(), Some("Absent".to_string()));
assert_eq!(req.clone().dispatch().body_string_wait(), Some("Absent".to_string()));
assert_eq!(req.mut_dispatch().body_string_wait(), Some("Absent".to_string()));
assert_eq!(req.dispatch().body_string_wait(), Some("Absent".to_string()));
let mut req = client.post("/data");
assert_eq!(req.clone().dispatch().body_string(), Some("Data Absent".to_string()));
assert_eq!(req.mut_dispatch().body_string(), Some("Data Absent".to_string()));
assert_eq!(req.dispatch().body_string(), Some("Data Absent".to_string()));
assert_eq!(req.clone().dispatch().body_string_wait(), Some("Data Absent".to_string()));
assert_eq!(req.mut_dispatch().body_string_wait(), Some("Data Absent".to_string()));
assert_eq!(req.dispatch().body_string_wait(), Some("Data Absent".to_string()));
}
#[test]
@ -80,13 +80,13 @@ mod local_request_content_type_tests {
let client = Client::new(rocket()).unwrap();
let mut req = client.post("/").header(ContentType::JSON);
assert_eq!(req.clone().dispatch().body_string(), Some("Present".to_string()));
assert_eq!(req.mut_dispatch().body_string(), Some("Present".to_string()));
assert_eq!(req.dispatch().body_string(), Some("Present".to_string()));
assert_eq!(req.clone().dispatch().body_string_wait(), Some("Present".to_string()));
assert_eq!(req.mut_dispatch().body_string_wait(), Some("Present".to_string()));
assert_eq!(req.dispatch().body_string_wait(), Some("Present".to_string()));
let mut req = client.post("/data").header(ContentType::JSON);
assert_eq!(req.clone().dispatch().body_string(), Some("Data Present".to_string()));
assert_eq!(req.mut_dispatch().body_string(), Some("Data Present".to_string()));
assert_eq!(req.dispatch().body_string(), Some("Data Present".to_string()));
assert_eq!(req.clone().dispatch().body_string_wait(), Some("Data Present".to_string()));
assert_eq!(req.mut_dispatch().body_string_wait(), Some("Data Present".to_string()));
assert_eq!(req.dispatch().body_string_wait(), Some("Data Present".to_string()));
}
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use]
#[cfg(feature = "private-cookies")]
@ -30,7 +30,7 @@ mod private_cookie_test {
let req = client.get("/").private_cookie(Cookie::new("cookie_name", "cookie_value"));
let mut response = req.dispatch();
assert_eq!(response.body_string(), Some("cookie_value".into()));
assert_eq!(response.body_string_wait(), Some("cookie_value".into()));
assert_eq!(response.headers().get_one("Set-Cookie"), None);
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -47,14 +47,14 @@ mod nested_fairing_attaches_tests {
fn test_counts() {
let client = Client::new(rocket()).unwrap();
let mut response = client.get("/").dispatch();
assert_eq!(response.body_string(), Some("1, 1".into()));
assert_eq!(response.body_string_wait(), Some("1, 1".into()));
let mut response = client.get("/").dispatch();
assert_eq!(response.body_string(), Some("1, 2".into()));
assert_eq!(response.body_string_wait(), Some("1, 2".into()));
client.get("/").dispatch();
client.get("/").dispatch();
let mut response = client.get("/").dispatch();
assert_eq!(response.body_string(), Some("1, 5".into()));
assert_eq!(response.body_string_wait(), Some("1, 5".into()));
}
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -45,7 +45,7 @@ mod tests {
}
let mut response = req.dispatch();
let body_str = response.body_string();
let body_str = response.body_string_wait();
let body: Option<&'static str> = $body;
match body {
Some(string) => assert_eq!(body_str, Some(string.to_string())),

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#![allow(dead_code)] // This test is only here so that we can ensure it compiles.
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -17,7 +17,7 @@ mod route_guard_tests {
fn assert_path(client: &Client, path: &str) {
let mut res = client.get(path).dispatch();
assert_eq!(res.body_string(), Some(path.into()));
assert_eq!(res.body_string_wait(), Some(path.into()));
}
#[test]

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -48,7 +48,7 @@ mod tests {
{
let path = "this/is/the/path/we/want";
let mut response = client.get(format!("{}/{}", prefix, path)).dispatch();
assert_eq!(response.body_string(), Some(path.into()));
assert_eq!(response.body_string_wait(), Some(path.into()));
}
}
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -40,7 +40,7 @@ mod strict_and_lenient_forms_tests {
.dispatch();
assert_eq!(response.status(), Status::Ok);
assert_eq!(response.body_string(), Some(FIELD_VALUE.into()));
assert_eq!(response.body_string_wait(), Some(FIELD_VALUE.into()));
let response = client.post("/strict")
.header(ContentType::Form)
@ -59,7 +59,7 @@ mod strict_and_lenient_forms_tests {
.dispatch();
assert_eq!(response.status(), Status::Ok);
assert_eq!(response.body_string(), Some(FIELD_VALUE.into()));
assert_eq!(response.body_string_wait(), Some(FIELD_VALUE.into()));
let mut response = client.post("/lenient")
.header(ContentType::Form)
@ -67,6 +67,6 @@ mod strict_and_lenient_forms_tests {
.dispatch();
assert_eq!(response.status(), Status::Ok);
assert_eq!(response.body_string(), Some(FIELD_VALUE.into()));
assert_eq!(response.body_string_wait(), Some(FIELD_VALUE.into()));
}
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
@ -54,6 +54,6 @@ mod tests {
let name = Uri::percent_encode(NAME);
let mut response = client.get(format!("/hello/{}", name)).dispatch();
assert_eq!(response.status(), Status::Ok);
assert_eq!(response.body_string().unwrap(), format!("Hello, {}!", NAME));
assert_eq!(response.body_string_wait().unwrap(), format!("Hello, {}!", NAME));
}
}

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
#[macro_use] extern crate serde_derive;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
#[macro_use] extern crate serde_derive;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[cfg(test)] mod tests;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
#[macro_use] extern crate rocket_contrib;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
#[macro_use] extern crate serde_derive;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
#[macro_use] extern crate serde_derive;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
#[macro_use] extern crate diesel;

View File

@ -1,4 +1,4 @@
#![feature(proc_macro_hygiene)]
#![feature(proc_macro_hygiene, async_await)]
#[macro_use] extern crate rocket;
#[macro_use] extern crate lazy_static;

View File

@ -1,3 +1,3 @@
#![feature(external_doc)]
// #![feature(external_doc)]
rocket::rocket_internal_guide_tests!("../guide/*.md");
// rocket::rocket_internal_guide_tests!("../guide/*.md");