Upgrade to tokio 0.2.0.

* Update 'tokio', 'tokio-rustls', and 'hyper'.
* Remove unused dependencies on some `futures-*` crates.
* Rework 'spawn_on', which is now 'serve'.
* Simplify Ctrl-C handling.
This commit is contained in:
Paolo Barbolini 2019-12-10 16:34:23 -08:00 committed by Sergio Benitez
parent 7c4cd068d1
commit 003bf77c29
39 changed files with 147 additions and 163 deletions

View File

@ -73,7 +73,7 @@ pub fn database_attr(attr: TokenStream, input: TokenStream) -> Result<TokenStrea
let databases = quote_spanned!(span => ::rocket_contrib::databases); let databases = quote_spanned!(span => ::rocket_contrib::databases);
let Poolable = quote_spanned!(span => #databases::Poolable); let Poolable = quote_spanned!(span => #databases::Poolable);
let r2d2 = quote_spanned!(span => #databases::r2d2); let r2d2 = quote_spanned!(span => #databases::r2d2);
let tokio_executor = quote_spanned!(span => #databases::tokio_executor); let spawn_blocking = quote_spanned!(span => #databases::spawn_blocking);
let request = quote!(::rocket::request); let request = quote!(::rocket::request);
let generated_types = quote_spanned! { span => let generated_types = quote_spanned! { span =>
@ -149,14 +149,16 @@ pub fn database_attr(attr: TokenStream, input: TokenStream) -> Result<TokenStrea
Box::pin(async move { Box::pin(async move {
let pool = ::rocket::try_outcome!(request.guard::<::rocket::State<'_, #pool_type>>()).0.clone(); let pool = ::rocket::try_outcome!(request.guard::<::rocket::State<'_, #pool_type>>()).0.clone();
#tokio_executor::blocking::run(move || { #spawn_blocking(move || {
match pool.get() { match pool.get() {
Ok(conn) => Outcome::Success(#guard_type(conn)), Ok(conn) => Outcome::Success(#guard_type(conn)),
Err(_) => Outcome::Failure((Status::ServiceUnavailable, ())), Err(_) => Outcome::Failure((Status::ServiceUnavailable, ())),
} }
}).await }).await.expect("failed to spawn a blocking task to get a pooled connection")
}) })
} }
} }
// TODO.async: What about spawn_blocking on drop?
}.into()) }.into())
} }

View File

@ -9,6 +9,7 @@ struct Unknown;
//~^^^ ERROR no method named `get` //~^^^ ERROR no method named `get`
struct A(Unknown); struct A(Unknown);
//~^ ERROR Unknown: rocket_contrib::databases::Poolable //~^ ERROR Unknown: rocket_contrib::databases::Poolable
//~^^ ERROR Unknown: rocket_contrib::databases::Poolable
#[database("foo")] #[database("foo")]
//~^ ERROR Vec<i32>: rocket_contrib::databases::Poolable //~^ ERROR Vec<i32>: rocket_contrib::databases::Poolable
@ -16,5 +17,6 @@ struct A(Unknown);
//~^^^ ERROR no method named `get` //~^^^ ERROR no method named `get`
struct B(Vec<i32>); struct B(Vec<i32>);
//~^ ERROR Vec<i32>: rocket_contrib::databases::Poolable //~^ ERROR Vec<i32>: rocket_contrib::databases::Poolable
//~^^ ERROR Vec<i32>: rocket_contrib::databases::Poolable
fn main() { } fn main() { }

View File

@ -14,12 +14,12 @@ edition = "2018"
[features] [features]
# Internal use only. # Internal use only.
templates = ["serde", "serde_json", "glob", "notify"] templates = ["serde", "serde_json", "glob", "notify"]
databases = ["r2d2", "tokio-executor", "rocket_contrib_codegen/database_attribute"] databases = ["r2d2", "tokio/blocking", "tokio/rt-threaded", "rocket_contrib_codegen/database_attribute"]
# User-facing features. # User-facing features.
default = ["json", "serve"] default = ["json", "serve"]
json = ["serde", "serde_json"] json = ["serde", "serde_json", "tokio/io-util"]
msgpack = ["serde", "rmp-serde"] msgpack = ["serde", "rmp-serde", "tokio/io-util"]
tera_templates = ["tera", "templates"] tera_templates = ["tera", "templates"]
handlebars_templates = ["handlebars", "templates"] handlebars_templates = ["handlebars", "templates"]
helmet = ["time"] helmet = ["time"]
@ -42,8 +42,7 @@ memcache_pool = ["databases", "memcache", "r2d2-memcache"]
[dependencies] [dependencies]
# Global dependencies. # Global dependencies.
futures-util-preview = "0.3.0-alpha.19" tokio = { version = "0.2.0", optional = true }
tokio-io = "=0.2.0-alpha.6"
rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true } rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true }
rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false } rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false }
log = "0.4" log = "0.4"
@ -66,7 +65,6 @@ uuid = { version = ">=0.7.0, <0.9.0", optional = true }
diesel = { version = "1.0", default-features = false, optional = true } diesel = { version = "1.0", default-features = false, optional = true }
postgres = { version = "0.17", optional = true } postgres = { version = "0.17", optional = true }
r2d2 = { version = "0.8", optional = true } r2d2 = { version = "0.8", optional = true }
tokio-executor = { version = "0.2.0-alpha.6", optional = true }
r2d2_postgres = { version = "0.16", optional = true } r2d2_postgres = { version = "0.16", optional = true }
mysql = { version = "17.0", optional = true } mysql = { version = "17.0", optional = true }
r2d2_mysql = { version = "17.0", optional = true } r2d2_mysql = { version = "17.0", optional = true }
@ -89,7 +87,7 @@ brotli = { version = "3.3", optional = true }
flate2 = { version = "1.0", optional = true } flate2 = { version = "1.0", optional = true }
[dev-dependencies] [dev-dependencies]
tokio-timer = "=0.3.0-alpha.6" tokio = { version = "0.2.0", features = ["time"] }
[package.metadata.docs.rs] [package.metadata.docs.rs]
all-features = true all-features = true

View File

@ -398,7 +398,7 @@
pub extern crate r2d2; pub extern crate r2d2;
#[doc(hidden)] #[doc(hidden)]
pub extern crate tokio_executor; pub use tokio::task::spawn_blocking;
#[cfg(any(feature = "diesel_sqlite_pool", #[cfg(any(feature = "diesel_sqlite_pool",
feature = "diesel_postgres_pool", feature = "diesel_postgres_pool",

View File

@ -18,7 +18,7 @@ use std::ops::{Deref, DerefMut};
use std::io; use std::io;
use std::iter::FromIterator; use std::iter::FromIterator;
use tokio_io::AsyncReadExt; use tokio::io::AsyncReadExt;
use rocket::request::Request; use rocket::request::Request;
use rocket::outcome::Outcome::*; use rocket::outcome::Outcome::*;

View File

@ -16,7 +16,7 @@
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use tokio_io::AsyncReadExt; use tokio::io::AsyncReadExt;
use rocket::request::Request; use rocket::request::Request;
use rocket::outcome::Outcome::*; use rocket::outcome::Outcome::*;

View File

@ -169,7 +169,7 @@ mod templates_tests {
} }
// otherwise, retry a few times, waiting 250ms in between // otherwise, retry a few times, waiting 250ms in between
tokio_timer::delay_for(Duration::from_millis(250)).await; tokio::time::delay_for(Duration::from_millis(250)).await;
} }
panic!("failed to reload modified template in 1.5s"); panic!("failed to reload modified template in 1.5s");

View File

@ -28,6 +28,5 @@ version_check = "0.9.1"
[dev-dependencies] [dev-dependencies]
rocket = { version = "0.5.0-dev", path = "../lib" } rocket = { version = "0.5.0-dev", path = "../lib" }
futures-preview = "0.3.0-alpha.19" tokio = { version = "0.2.0", features = ["io-util"] }
tokio-io = "0.2.0-alpha.6"
compiletest_rs = { version = "0.3", features = ["stable"] } compiletest_rs = { version = "0.3", features = ["stable"] }

View File

@ -22,7 +22,7 @@ impl FromDataSimple for Simple {
fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> { fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> {
Box::pin(async { Box::pin(async {
use tokio_io::AsyncReadExt; use tokio::io::AsyncReadExt;
let mut string = String::new(); let mut string = String::new();
let mut stream = data.open().take(64); let mut stream = data.open().take(64);

View File

@ -30,7 +30,7 @@ impl FromDataSimple for Simple {
fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> { fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> {
Box::pin(async move { Box::pin(async move {
use tokio_io::AsyncReadExt; use tokio::io::AsyncReadExt;
let mut string = String::new(); let mut string = String::new();
let mut stream = data.open().take(64); let mut stream = data.open().take(64);

View File

@ -22,21 +22,17 @@ private-cookies = ["cookie/private", "cookie/key-expansion"]
[dependencies] [dependencies]
smallvec = "1.0" smallvec = "1.0"
percent-encoding = "1" percent-encoding = "1"
# TODO.async: stop using stream-unstable hyper = { version = "0.13.0", default-features = false }
hyper = { version = "=0.13.0-alpha.4", default-features = false, features = ["unstable-stream"] } http = "0.2"
http = "0.1.17"
mime = "0.3.13" mime = "0.3.13"
time = "0.2.11" time = "0.2.11"
indexmap = "1.0" indexmap = "1.0"
state = "0.4" state = "0.4"
tokio-rustls = { version = "0.12.0-alpha.4", optional = true } tokio-rustls = { version = "0.12.0", optional = true }
tokio-io = "=0.2.0-alpha.6" tokio = { version = "0.2.0", features = ["sync", "tcp", "time"] }
tokio-net = { version = "=0.2.0-alpha.6", features = ["tcp"] }
tokio-timer = "=0.3.0-alpha.6"
cookie = { version = "0.14.0", features = ["percent-encode"] } cookie = { version = "0.14.0", features = ["percent-encode"] }
pear = "0.1" pear = "0.1"
unicode-xid = "0.2" unicode-xid = "0.2"
futures-core-preview = "0.3.0-alpha.19"
log = "0.4" log = "0.4"
[dev-dependencies] [dev-dependencies]

View File

@ -5,11 +5,11 @@
//! while necessary. //! while necessary.
#[doc(hidden)] pub use hyper::{Body, Request, Response, Server}; #[doc(hidden)] pub use hyper::{Body, Request, Response, Server};
#[doc(hidden)] pub use hyper::body::{Payload, Sender as BodySender}; #[doc(hidden)] pub use hyper::body::{Bytes, HttpBody, Sender as BodySender};
#[doc(hidden)] pub use hyper::error::Error; #[doc(hidden)] pub use hyper::error::Error;
#[doc(hidden)] pub use hyper::service::{make_service_fn, service_fn, MakeService, Service}; #[doc(hidden)] pub use hyper::rt::Executor;
#[doc(hidden)] pub use hyper::service::{make_service_fn, service_fn, Service};
#[doc(hidden)] pub use hyper::Chunk;
#[doc(hidden)] pub use http::header::HeaderMap; #[doc(hidden)] pub use http::header::HeaderMap;
#[doc(hidden)] pub use http::header::HeaderName as HeaderName; #[doc(hidden)] pub use http::header::HeaderName as HeaderName;
#[doc(hidden)] pub use http::header::HeaderValue as HeaderValue; #[doc(hidden)] pub use http::header::HeaderValue as HeaderValue;

View File

@ -4,15 +4,15 @@ use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::Duration;
use hyper::server::accept::Accept; use hyper::server::accept::Accept;
use log::{debug, error}; use log::{debug, error};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay; use tokio::time::Delay;
use tokio_net::tcp::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
// TODO.async: 'Listener' and 'Connection' provide common enough functionality // TODO.async: 'Listener' and 'Connection' provide common enough functionality
// that they could be introduced in upstream libraries. // that they could be introduced in upstream libraries.
@ -100,8 +100,7 @@ impl<L: Listener> Incoming<L> {
error!("accept error: {}", e); error!("accept error: {}", e);
// Sleep for the specified duration // Sleep for the specified duration
let delay = Instant::now() + duration; let mut error_delay = tokio::time::delay_for(duration);
let mut error_delay = tokio_timer::delay(delay);
match Pin::new(&mut error_delay).poll(cx) { match Pin::new(&mut error_delay).poll(cx) {
Poll::Ready(()) => { Poll::Ready(()) => {
@ -127,8 +126,7 @@ impl<L: Listener + Unpin> Accept for Incoming<L> {
type Error = io::Error; type Error = io::Error;
fn poll_accept(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Self::Conn, Self::Error>>> { fn poll_accept(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let result = futures_core::ready!(self.poll_next(cx)); self.poll_next(cx).map(Some)
Poll::Ready(Some(result))
} }
} }
@ -170,10 +168,7 @@ impl Listener for TcpListener {
} }
fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Connection, io::Error>> { fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Connection, io::Error>> {
// NB: This is only okay because TcpListener::accept() is stateless. self.poll_accept(cx).map_ok(|(stream, _addr)| stream)
let mut accept = self.accept();
let accept = unsafe { Pin::new_unchecked(&mut accept) };
accept.poll(cx).map_ok(|(stream, _addr)| stream)
} }
} }

View File

@ -7,7 +7,7 @@ use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio_net::tcp::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::{TlsAcceptor, server::TlsStream}; use tokio_rustls::{TlsAcceptor, server::TlsStream};
use tokio_rustls::rustls; use tokio_rustls::rustls;
@ -91,7 +91,7 @@ impl Listener for TlsListener {
TlsListenerState::Listening => { TlsListenerState::Listening => {
match self.listener.poll_accept(cx) { match self.listener.poll_accept(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(stream)) => { Poll::Ready(Ok((stream, _addr))) => {
self.state = TlsListenerState::Accepting(Box::pin(self.acceptor.accept(stream))); self.state = TlsListenerState::Accepting(Box::pin(self.acceptor.accept(stream)));
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {

View File

@ -27,11 +27,8 @@ ctrl_c_shutdown = ["tokio/signal"]
[dependencies] [dependencies]
rocket_codegen = { version = "0.5.0-dev", path = "../codegen" } rocket_codegen = { version = "0.5.0-dev", path = "../codegen" }
rocket_http = { version = "0.5.0-dev", path = "../http" } rocket_http = { version = "0.5.0-dev", path = "../http" }
futures-core-preview = "0.3.0-alpha.19" futures-util = "0.3.0"
futures-channel-preview = "0.3.0-alpha.19" tokio = { version = "0.2.0", features = ["fs", "io-std", "io-util", "rt-threaded", "sync"] }
futures-util-preview = "0.3.0-alpha.19"
tokio = "=0.2.0-alpha.6"
tokio-io = "=0.2.0-alpha.6"
yansi = "0.5" yansi = "0.5"
log = { version = "0.4", features = ["std"] } log = { version = "0.4", features = ["std"] }
toml = "0.4.7" toml = "0.4.7"
@ -48,7 +45,6 @@ yansi = "0.5"
version_check = "0.9.1" version_check = "0.9.1"
[dev-dependencies] [dev-dependencies]
futures-preview = "0.3.0-alpha.19"
futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" }
# TODO: Find a way to not depend on this. # TODO: Find a way to not depend on this.
lazy_static = "1.0" lazy_static = "1.0"
tokio = { version = "0.2.0", features = ["macros"] }

View File

@ -154,7 +154,7 @@ macro_rules! default_catchers {
let mut map = HashMap::new(); let mut map = HashMap::new();
$( $(
fn $fn_name<'r>(req: &'r Request<'_>) -> futures_core::future::BoxFuture<'r, response::Result<'r>> { fn $fn_name<'r>(req: &'r Request<'_>) -> futures_util::future::BoxFuture<'r, response::Result<'r>> {
status::Custom(Status::from_code($code).unwrap(), status::Custom(Status::from_code($code).unwrap(),
content::Html(error_page_template!($code, $name, $description)) content::Html(error_page_template!($code, $name, $description))
).respond_to(req) ).respond_to(req)

View File

@ -1,4 +1,4 @@
use futures_core::future::BoxFuture; use futures_util::future::BoxFuture;
use crate::{Request, Data}; use crate::{Request, Data};
use crate::handler::{Outcome, ErrorHandler}; use crate::handler::{Outcome, ErrorHandler};

View File

@ -2,7 +2,7 @@ use std::future::Future;
use std::io; use std::io;
use std::path::Path; use std::path::Path;
use tokio_io::{AsyncRead, AsyncWrite, AsyncReadExt as _}; use tokio::io::{AsyncRead, AsyncWrite};
use super::data_stream::DataStream; use super::data_stream::DataStream;
@ -128,7 +128,7 @@ impl Data {
/// A helper method to write the body of the request to any `Write` type. /// A helper method to write the body of the request to any `Write` type.
/// ///
/// This method is identical to `io::copy(&mut data.open(), writer)`. /// This method is identical to `tokio::io::copy(&mut data.open(), &mut writer)`.
/// ///
/// # Example /// # Example
/// ///
@ -146,7 +146,7 @@ impl Data {
pub fn stream_to<'w, W: AsyncWrite + Unpin + 'w>(self, mut writer: W) -> impl Future<Output = io::Result<u64>> + 'w { pub fn stream_to<'w, W: AsyncWrite + Unpin + 'w>(self, mut writer: W) -> impl Future<Output = io::Result<u64>> + 'w {
Box::pin(async move { Box::pin(async move {
let mut stream = self.open(); let mut stream = self.open();
stream.copy(&mut writer).await tokio::io::copy(&mut stream, &mut writer).await
}) })
} }

View File

@ -1,7 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio_io::AsyncRead; use tokio::io::AsyncRead;
// TODO.async: Consider storing the real type here instead of a Box to avoid // TODO.async: Consider storing the real type here instead of a Box to avoid
// the dynamic dispatch // the dynamic dispatch

View File

@ -1,8 +1,8 @@
use std::borrow::Borrow; use std::borrow::Borrow;
use futures_core::future::BoxFuture; use futures_util::future::BoxFuture;
use futures_util::future::{ready, FutureExt}; use futures_util::future::{ready, FutureExt};
use tokio_io::AsyncReadExt; use tokio::io::AsyncReadExt;
use crate::outcome::{self, IntoOutcome}; use crate::outcome::{self, IntoOutcome};
use crate::outcome::Outcome::*; use crate::outcome::Outcome::*;

View File

@ -2,23 +2,23 @@ use std::io::{self, Cursor};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Poll, Context}; use std::task::{Poll, Context};
use futures_core::{ready, future::BoxFuture, stream::Stream}; use futures_util::{ready, future::BoxFuture, stream::Stream};
use tokio_io::{AsyncRead, AsyncReadExt as _}; use tokio::io::{AsyncRead, AsyncReadExt as _};
use crate::http::hyper; use crate::http::hyper;
use hyper::{Chunk, Payload}; use hyper::{Bytes, HttpBody};
pub struct IntoChunkStream<R> { pub struct IntoBytesStream<R> {
inner: R, inner: R,
buf_size: usize, buf_size: usize,
buffer: Vec<u8>, buffer: Vec<u8>,
} }
// TODO.async: Verify correctness of this implementation. // TODO.async: Verify correctness of this implementation.
impl<R> Stream for IntoChunkStream<R> impl<R> Stream for IntoBytesStream<R>
where R: AsyncRead + Unpin where R: AsyncRead + Unpin
{ {
type Item = Result<Chunk, io::Error>; type Item = Result<Bytes, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>{ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>{
assert!(self.buffer.len() == self.buf_size); assert!(self.buffer.len() == self.buf_size);
@ -32,15 +32,15 @@ impl<R> Stream for IntoChunkStream<R>
Poll::Ready(Ok(n)) => { Poll::Ready(Ok(n)) => {
let mut next = std::mem::replace(buffer, vec![0; buf_size]); let mut next = std::mem::replace(buffer, vec![0; buf_size]);
next.truncate(n); next.truncate(n);
Poll::Ready(Some(Ok(Chunk::from(next)))) Poll::Ready(Some(Ok(Bytes::from(next))))
} }
} }
} }
} }
pub trait AsyncReadExt: AsyncRead { pub trait AsyncReadExt: AsyncRead {
fn into_chunk_stream(self, buf_size: usize) -> IntoChunkStream<Self> where Self: Sized { fn into_bytes_stream(self, buf_size: usize) -> IntoBytesStream<Self> where Self: Sized {
IntoChunkStream { inner: self, buf_size, buffer: vec![0; buf_size] } IntoBytesStream { inner: self, buf_size, buffer: vec![0; buf_size] }
} }
// TODO.async: Verify correctness of this implementation. // TODO.async: Verify correctness of this implementation.
@ -72,7 +72,7 @@ pub struct AsyncReadBody {
enum AsyncReadBodyState { enum AsyncReadBodyState {
Pending, Pending,
Partial(Cursor<Chunk>), Partial(Cursor<Bytes>),
Done, Done,
} }
@ -88,7 +88,7 @@ impl AsyncRead for AsyncReadBody {
match self.state { match self.state {
AsyncReadBodyState::Pending => { AsyncReadBodyState::Pending => {
match ready!(Pin::new(&mut self.inner).poll_data(cx)) { match ready!(Pin::new(&mut self.inner).poll_data(cx)) {
Some(Ok(chunk)) => self.state = AsyncReadBodyState::Partial(Cursor::new(chunk)), Some(Ok(bytes)) => self.state = AsyncReadBodyState::Partial(Cursor::new(bytes)),
Some(Err(e)) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))), Some(Err(e)) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
None => self.state = AsyncReadBodyState::Done, None => self.state = AsyncReadBodyState::Done,
} }

View File

@ -1,6 +1,6 @@
use std::sync::Mutex; use std::sync::Mutex;
use futures_core::future::BoxFuture; use futures_util::future::BoxFuture;
use crate::{Rocket, Request, Response, Data}; use crate::{Rocket, Request, Response, Data};
use crate::fairing::{Fairing, Kind, Info}; use crate::fairing::{Fairing, Kind, Info};

View File

@ -47,7 +47,7 @@
//! of other `Fairings` are not jeopardized. For instance, unless it is made //! of other `Fairings` are not jeopardized. For instance, unless it is made
//! abundantly clear, a fairing should not rewrite every request. //! abundantly clear, a fairing should not rewrite every request.
use futures_core::future::BoxFuture; use futures_util::future::BoxFuture;
use crate::{Rocket, Request, Response, Data}; use crate::{Rocket, Request, Response, Data};

View File

@ -1,6 +1,6 @@
//! Types and traits for request and error handlers and their return values. //! Types and traits for request and error handlers and their return values.
use futures_core::future::BoxFuture; use futures_util::future::BoxFuture;
use crate::data::Data; use crate::data::Data;
use crate::request::Request; use crate::request::Request;

View File

@ -153,5 +153,10 @@ pub fn custom(config: config::Config) -> Rocket {
/// WARNING: This is unstable! Do not use this method outside of Rocket! /// WARNING: This is unstable! Do not use this method outside of Rocket!
#[doc(hidden)] #[doc(hidden)]
pub fn async_test<R>(fut: impl std::future::Future<Output = R> + Send) -> R { pub fn async_test<R>(fut: impl std::future::Future<Output = R> + Send) -> R {
tokio::runtime::current_thread::Runtime::new().expect("create tokio runtime").block_on(fut) tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.expect("create tokio runtime")
.block_on(fut)
} }

View File

@ -1,6 +1,6 @@
use std::ops::Deref; use std::ops::Deref;
use tokio_io::AsyncReadExt; use tokio::io::AsyncReadExt;
use crate::outcome::Outcome::*; use crate::outcome::Outcome::*;
use crate::request::{Request, form::{FromForm, FormItems, FormDataError}}; use crate::request::{Request, form::{FromForm, FormItems, FormDataError}};

View File

@ -1,7 +1,7 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures_core::future::BoxFuture; use futures_util::future::BoxFuture;
use crate::router::Route; use crate::router::Route;
use crate::request::Request; use crate::request::Request;

View File

@ -49,4 +49,4 @@ pub use self::debug::Debug;
/// Type alias for the `Result` of a `Responder::respond` call. /// Type alias for the `Result` of a `Responder::respond` call.
pub type Result<'r> = std::result::Result<self::Response<'r>, crate::http::Status>; pub type Result<'r> = std::result::Result<self::Response<'r>, crate::http::Status>;
/// Type alias for the `Result` of a `Responder::respond` call. /// Type alias for the `Result` of a `Responder::respond` call.
pub type ResultFuture<'r> = futures_core::future::BoxFuture<'r, Result<'r>>; pub type ResultFuture<'r> = futures_util::future::BoxFuture<'r, Result<'r>>;

View File

@ -2,7 +2,7 @@ use std::fs::File;
use std::io::Cursor; use std::io::Cursor;
use futures_util::future::ready; use futures_util::future::ready;
use tokio_io::BufReader; use tokio::io::BufReader;
use crate::http::{Status, ContentType, StatusClass}; use crate::http::{Status, ContentType, StatusClass};
use crate::response::{self, Response, Body}; use crate::response::{self, Response, Body};

View File

@ -3,7 +3,7 @@ use std::borrow::Cow;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use tokio_io::{AsyncRead, AsyncReadExt}; use tokio::io::{AsyncRead, AsyncReadExt};
use futures_util::future::FutureExt; use futures_util::future::FutureExt;
use crate::response::{Responder, ResultFuture}; use crate::response::{Responder, ResultFuture};
@ -1057,14 +1057,12 @@ impl<'r> Response<'r> {
/// # Example /// # Example
/// ///
/// ```rust /// ```rust
/// use futures::io::repeat; /// use tokio::io::{repeat, AsyncReadExt};
/// use futures_tokio_compat::Compat;
/// use tokio::io::AsyncReadExt;
/// use rocket::Response; /// use rocket::Response;
/// ///
/// # rocket::async_test(async { /// # rocket::async_test(async {
/// let mut response = Response::new(); /// let mut response = Response::new();
/// response.set_streamed_body(Compat::new(repeat(97)).take(5)); /// response.set_streamed_body(repeat(97).take(5));
/// assert_eq!(response.body_string().await, Some("aaaaa".to_string())); /// assert_eq!(response.body_string().await, Some("aaaaa".to_string()));
/// # }) /// # })
/// ``` /// ```
@ -1079,14 +1077,12 @@ impl<'r> Response<'r> {
/// # Example /// # Example
/// ///
/// ```rust /// ```rust
/// use futures::io::repeat; /// use tokio::io::{repeat, AsyncReadExt};
/// use futures_tokio_compat::Compat;
/// use tokio::io::AsyncReadExt;
/// use rocket::Response; /// use rocket::Response;
/// ///
/// # rocket::async_test(async { /// # rocket::async_test(async {
/// let mut response = Response::new(); /// let mut response = Response::new();
/// response.set_chunked_body(Compat::new(repeat(97)).take(5), 10); /// response.set_chunked_body(repeat(97).take(5), 10);
/// assert_eq!(response.body_string().await, Some("aaaaa".to_string())); /// assert_eq!(response.body_string().await, Some("aaaaa".to_string()));
/// # }) /// # })
/// ``` /// ```

View File

@ -1,6 +1,6 @@
use std::fmt::{self, Debug}; use std::fmt::{self, Debug};
use tokio_io::AsyncRead; use tokio::io::AsyncRead;
use crate::request::Request; use crate::request::Request;
use crate::response::{Response, Responder, ResultFuture, DEFAULT_CHUNK_SIZE}; use crate::response::{Response, Responder, ResultFuture, DEFAULT_CHUNK_SIZE};

View File

@ -5,9 +5,9 @@ use std::io;
use std::mem; use std::mem;
use std::sync::Arc; use std::sync::Arc;
use futures_core::future::{Future, BoxFuture}; use futures_util::future::{Future, FutureExt, BoxFuture};
use futures_channel::{mpsc, oneshot}; use futures_util::stream::StreamExt;
use futures_util::{future::FutureExt, stream::StreamExt}; use tokio::sync::{mpsc, oneshot};
use yansi::Paint; use yansi::Paint;
use state::Container; use state::Container;
@ -120,16 +120,16 @@ impl Rocket {
tx: oneshot::Sender<hyper::Response<hyper::Body>>, tx: oneshot::Sender<hyper::Response<hyper::Body>>,
) -> impl Future<Output = io::Result<()>> + 'r { ) -> impl Future<Output = io::Result<()>> + 'r {
async move { async move {
let mut hyp_res = hyper::Response::builder(); let mut hyp_res = hyper::Response::builder()
hyp_res.status(response.status().code); .status(response.status().code);
for header in response.headers().iter() { for header in response.headers().iter() {
let name = header.name.as_str(); let name = header.name.as_str();
let value = header.value.as_bytes(); let value = header.value.as_bytes();
hyp_res.header(name, value); hyp_res = hyp_res.header(name, value);
} }
let send_response = move |mut hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> { let send_response = move |hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> {
let response = hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; let response = hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
tx.send(response).expect("channel receiver should not be dropped"); tx.send(response).expect("channel receiver should not be dropped");
Ok(()) Ok(())
@ -137,15 +137,15 @@ impl Rocket {
match response.body() { match response.body() {
None => { None => {
hyp_res.header(header::CONTENT_LENGTH, "0"); hyp_res = hyp_res.header(header::CONTENT_LENGTH, "0");
send_response(hyp_res, hyper::Body::empty())?; send_response(hyp_res, hyper::Body::empty())?;
} }
Some(Body::Sized(body, size)) => { Some(Body::Sized(body, size)) => {
hyp_res.header(header::CONTENT_LENGTH, size.to_string()); hyp_res = hyp_res.header(header::CONTENT_LENGTH, size.to_string());
let (mut sender, hyp_body) = hyper::Body::channel(); let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?; send_response(hyp_res, hyp_body)?;
let mut stream = body.into_chunk_stream(4096); let mut stream = body.into_bytes_stream(4096);
while let Some(next) = stream.next().await { while let Some(next) = stream.next().await {
sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
} }
@ -156,7 +156,7 @@ impl Rocket {
let (mut sender, hyp_body) = hyper::Body::channel(); let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?; send_response(hyp_res, hyp_body)?;
let mut stream = body.into_chunk_stream(chunk_size.try_into().expect("u64 -> usize overflow")); let mut stream = body.into_bytes_stream(chunk_size.try_into().expect("u64 -> usize overflow"));
while let Some(next) = stream.next().await { while let Some(next) = stream.next().await {
sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
} }
@ -760,41 +760,39 @@ impl Rocket {
} }
}); });
// NB: executor must be passed manually here, see hyperium/hyper#1537 #[derive(Clone)]
struct TokioExecutor;
impl<Fut> hyper::Executor<Fut> for TokioExecutor where Fut: Future + Send + 'static, Fut::Output: Send {
fn execute(&self, fut: Fut) {
tokio::spawn(fut);
}
}
hyper::Server::builder(Incoming::from_listener(listener)) hyper::Server::builder(Incoming::from_listener(listener))
.executor(tokio::executor::DefaultExecutor::current()) .executor(TokioExecutor)
.serve(service) .serve(service)
.with_graceful_shutdown(async move { shutdown_receiver.next().await; }) .with_graceful_shutdown(async move { shutdown_receiver.recv().await; })
.await .await
.map_err(crate::error::Error::Run) .map_err(crate::error::Error::Run)
} }
/// Similar to `launch()`, but using a custom Tokio runtime and returning /// Returns a `Future` that completes when the server is shut down or
/// a `Future` that completes along with the server. The runtime has no /// errors. If the `ctrl_c_shutdown` feature is enabled, `Ctrl-C` will be
/// restrictions other than being Tokio-based, and can have other tasks /// handled as a shutdown signal.
/// running on it.
/// ///
/// # Example /// # Example
/// ///
/// ```rust /// ```rust
/// use futures::future::FutureExt; /// #[tokio::main]
/// /// async fn main() {
/// // This gives us the default behavior. Alternatively, we could use a
/// // `tokio::runtime::Builder` to configure with greater detail.
/// let runtime = tokio::runtime::Runtime::new().expect("error creating runtime");
///
/// # if false { /// # if false {
/// let server_done = rocket::ignite().spawn_on(&runtime); /// let result = rocket::ignite().serve().await;
/// runtime.block_on(async move {
/// let result = server_done.await;
/// assert!(result.is_ok()); /// assert!(result.is_ok());
/// });
/// # } /// # }
/// }
/// ``` /// ```
pub fn spawn_on( pub async fn serve(self) -> Result<(), crate::error::Error> {
self,
runtime: &tokio::runtime::Runtime,
) -> impl Future<Output = Result<(), crate::error::Error>> {
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use crate::error::Error::Launch; use crate::error::Error::Launch;
@ -802,7 +800,7 @@ impl Rocket {
let full_addr = format!("{}:{}", self.config.address, self.config.port); let full_addr = format!("{}:{}", self.config.address, self.config.port);
let addrs = match full_addr.to_socket_addrs() { let addrs = match full_addr.to_socket_addrs() {
Ok(a) => a.collect::<Vec<_>>(), Ok(a) => a.collect::<Vec<_>>(),
Err(e) => return futures_util::future::ready(Err(Launch(From::from(e)))).boxed(), Err(e) => return Err(Launch(From::from(e))),
}; };
let addr = addrs[0]; let addr = addrs[0];
@ -812,26 +810,26 @@ impl Rocket {
(cancel_ctrl_c_listener_sender, cancel_ctrl_c_listener_receiver) (cancel_ctrl_c_listener_sender, cancel_ctrl_c_listener_receiver)
) = ( ) = (
self.get_shutdown_handle(), self.get_shutdown_handle(),
oneshot::channel() oneshot::channel(),
); );
let server = async move { let server = {
macro_rules! listen_on { macro_rules! listen_on {
($expr:expr) => {{ ($expr:expr) => {{
let listener = match $expr { let listener = match $expr {
Ok(ok) => ok, Ok(ok) => ok,
Err(err) => return Err(Launch(LaunchError::new(LaunchErrorKind::Bind(err)))), Err(err) => return Err(Launch(LaunchError::new(LaunchErrorKind::Bind(err)))),
}; };
self.listen_on(listener).await self.listen_on(listener)
}}; }};
} }
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
{ {
if let Some(tls) = self.config.tls.clone() { if let Some(tls) = self.config.tls.clone() {
listen_on!(crate::http::tls::bind_tls(addr, tls.certs, tls.key).await) listen_on!(crate::http::tls::bind_tls(addr, tls.certs, tls.key).await).boxed()
} else { } else {
listen_on!(crate::http::private::bind_tcp(addr).await) listen_on!(crate::http::private::bind_tcp(addr).await).boxed()
} }
} }
#[cfg(not(feature = "tls"))] #[cfg(not(feature = "tls"))]
@ -846,31 +844,30 @@ impl Rocket {
}); });
#[cfg(feature = "ctrl_c_shutdown")] #[cfg(feature = "ctrl_c_shutdown")]
match tokio::net::signal::ctrl_c() { {
Ok(mut ctrl_c) => { tokio::spawn(async move {
runtime.spawn(async move { use futures_util::future::{select, Either};
// Stop listening for `ctrl_c` if the server shuts down
// a different way to avoid waiting forever.
futures_util::future::select(
ctrl_c.next(),
cancel_ctrl_c_listener_receiver,
).await;
// Request the server shutdown. let either = select(
shutdown_handle.shutdown(); tokio::signal::ctrl_c().boxed(),
}); cancel_ctrl_c_listener_receiver,
}, ).await;
Err(err) => {
// Signal handling isn't strictly necessary, so we can skip it match either {
// if necessary. It's a good idea to let the user know we're Either::Left((Ok(()), _)) | Either::Right((_, _)) => shutdown_handle.shutdown(),
// doing so in case they are expecting certain behavior. Either::Left((Err(err), _)) => {
let message = "Not listening for shutdown keybinding."; // Signal handling isn't strictly necessary, so we can skip it
warn!("{}", Paint::yellow(message)); // if necessary. It's a good idea to let the user know we're
info_!("Error: {}", err); // doing so in case they are expecting certain behavior.
}, let message = "Not listening for shutdown keybinding.";
warn!("{}", Paint::yellow(message));
info_!("Error: {}", err);
}
}
});
} }
server.boxed() server.await
} }
/// Starts the application server and begins listening for and dispatching /// Starts the application server and begins listening for and dispatching
@ -893,14 +890,15 @@ impl Rocket {
/// # } /// # }
/// ``` /// ```
pub fn launch(self) -> Result<(), crate::error::Error> { pub fn launch(self) -> Result<(), crate::error::Error> {
// TODO.async What meaning should config.workers have now?
// Initialize the tokio runtime // Initialize the tokio runtime
let runtime = tokio::runtime::Builder::new() let mut runtime = tokio::runtime::Builder::new()
.core_threads(self.config.workers as usize) .threaded_scheduler()
.num_threads(self.config.workers as usize)
.enable_all()
.build() .build()
.expect("Cannot build runtime!"); .expect("Cannot build runtime!");
runtime.block_on(self.spawn_on(&runtime)) runtime.block_on(async move { self.serve().await })
} }
/// Returns a [`ShutdownHandle`], which can be used to gracefully terminate /// Returns a [`ShutdownHandle`], which can be used to gracefully terminate

View File

@ -3,7 +3,7 @@ mod route;
use std::collections::hash_map::HashMap; use std::collections::hash_map::HashMap;
use futures_core::future::BoxFuture; use futures_util::future::BoxFuture;
pub use self::route::Route; pub use self::route::Route;

View File

@ -1,5 +1,5 @@
use crate::request::{FromRequest, Outcome, Request}; use crate::request::{FromRequest, Outcome, Request};
use futures_channel::mpsc; use tokio::sync::mpsc;
/// # Example /// # Example
/// ///
@ -34,6 +34,7 @@ impl ShutdownHandle {
// Intentionally ignore any error, as the only scenarios this can happen // Intentionally ignore any error, as the only scenarios this can happen
// is sending too many shutdown requests or we're already shut down. // is sending too many shutdown requests or we're already shut down.
let _ = self.0.try_send(()); let _ = self.0.try_send(());
info!("Server shutdown requested, waiting for all pending requests to finish.");
} }
} }

View File

@ -22,7 +22,7 @@ fn other() -> content::Json<&'static str> {
mod head_handling_tests { mod head_handling_tests {
use super::*; use super::*;
use tokio_io::{AsyncRead, AsyncReadExt}; use tokio::io::{AsyncRead, AsyncReadExt};
use rocket::Route; use rocket::Route;
use rocket::local::Client; use rocket::local::Client;

View File

@ -6,7 +6,7 @@ edition = "2018"
publish = false publish = false
[dependencies] [dependencies]
tokio = "0.2.0-alpha.6" tokio = { version = "0.2.0", features = ["io-util"] }
rocket = { path = "../../core/lib" } rocket = { path = "../../core/lib" }
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"

View File

@ -7,4 +7,4 @@ publish = false
[dependencies] [dependencies]
rocket = { path = "../../core/lib" } rocket = { path = "../../core/lib" }
tokio = "=0.2.0-alpha.6" tokio = { version = "0.2.0", features = ["io-util"] }

View File

@ -7,6 +7,4 @@ publish = false
[dependencies] [dependencies]
rocket = { path = "../../core/lib" } rocket = { path = "../../core/lib" }
tokio = "0.2.0-alpha.6" tokio = { version = "0.2.0", features = ["fs", "io-util"] }
futures-preview = "0.3.0-alpha.19"
futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" }

View File

@ -6,17 +6,15 @@
use rocket::response::{content, Stream}; use rocket::response::{content, Stream};
use futures::io::repeat;
use futures_tokio_compat::Compat;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt}; use tokio::io::{repeat, AsyncRead, AsyncReadExt};
// Generate this file using: head -c BYTES /dev/random > big_file.dat // Generate this file using: head -c BYTES /dev/random > big_file.dat
const FILENAME: &str = "big_file.dat"; const FILENAME: &str = "big_file.dat";
#[get("/")] #[get("/")]
fn root() -> content::Plain<Stream<impl AsyncRead>> { fn root() -> content::Plain<Stream<impl AsyncRead>> {
content::Plain(Stream::from(Compat::new(repeat('a' as u8)).take(25000))) content::Plain(Stream::from(repeat('a' as u8).take(25000)))
} }
#[get("/big_file")] #[get("/big_file")]