diff --git a/contrib/codegen/src/database.rs b/contrib/codegen/src/database.rs index c1a599b2..3843837e 100644 --- a/contrib/codegen/src/database.rs +++ b/contrib/codegen/src/database.rs @@ -73,7 +73,7 @@ pub fn database_attr(attr: TokenStream, input: TokenStream) -> Result ::rocket_contrib::databases); let Poolable = quote_spanned!(span => #databases::Poolable); let r2d2 = quote_spanned!(span => #databases::r2d2); - let tokio_executor = quote_spanned!(span => #databases::tokio_executor); + let spawn_blocking = quote_spanned!(span => #databases::spawn_blocking); let request = quote!(::rocket::request); let generated_types = quote_spanned! { span => @@ -149,14 +149,16 @@ pub fn database_attr(attr: TokenStream, input: TokenStream) -> Result>()).0.clone(); - #tokio_executor::blocking::run(move || { + #spawn_blocking(move || { match pool.get() { Ok(conn) => Outcome::Success(#guard_type(conn)), Err(_) => Outcome::Failure((Status::ServiceUnavailable, ())), } - }).await + }).await.expect("failed to spawn a blocking task to get a pooled connection") }) } } + + // TODO.async: What about spawn_blocking on drop? }.into()) } diff --git a/contrib/codegen/tests/ui-fail/database-types.rs b/contrib/codegen/tests/ui-fail/database-types.rs index 34bd3e7a..1b676733 100644 --- a/contrib/codegen/tests/ui-fail/database-types.rs +++ b/contrib/codegen/tests/ui-fail/database-types.rs @@ -9,6 +9,7 @@ struct Unknown; //~^^^ ERROR no method named `get` struct A(Unknown); //~^ ERROR Unknown: rocket_contrib::databases::Poolable +//~^^ ERROR Unknown: rocket_contrib::databases::Poolable #[database("foo")] //~^ ERROR Vec: rocket_contrib::databases::Poolable @@ -16,5 +17,6 @@ struct A(Unknown); //~^^^ ERROR no method named `get` struct B(Vec); //~^ ERROR Vec: rocket_contrib::databases::Poolable +//~^^ ERROR Vec: rocket_contrib::databases::Poolable fn main() { } diff --git a/contrib/lib/Cargo.toml b/contrib/lib/Cargo.toml index 4c156623..5ee47a0c 100644 --- a/contrib/lib/Cargo.toml +++ b/contrib/lib/Cargo.toml @@ -14,12 +14,12 @@ edition = "2018" [features] # Internal use only. templates = ["serde", "serde_json", "glob", "notify"] -databases = ["r2d2", "tokio-executor", "rocket_contrib_codegen/database_attribute"] +databases = ["r2d2", "tokio/blocking", "tokio/rt-threaded", "rocket_contrib_codegen/database_attribute"] # User-facing features. default = ["json", "serve"] -json = ["serde", "serde_json"] -msgpack = ["serde", "rmp-serde"] +json = ["serde", "serde_json", "tokio/io-util"] +msgpack = ["serde", "rmp-serde", "tokio/io-util"] tera_templates = ["tera", "templates"] handlebars_templates = ["handlebars", "templates"] helmet = ["time"] @@ -42,8 +42,7 @@ memcache_pool = ["databases", "memcache", "r2d2-memcache"] [dependencies] # Global dependencies. -futures-util-preview = "0.3.0-alpha.19" -tokio-io = "=0.2.0-alpha.6" +tokio = { version = "0.2.0", optional = true } rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true } rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false } log = "0.4" @@ -66,7 +65,6 @@ uuid = { version = ">=0.7.0, <0.9.0", optional = true } diesel = { version = "1.0", default-features = false, optional = true } postgres = { version = "0.17", optional = true } r2d2 = { version = "0.8", optional = true } -tokio-executor = { version = "0.2.0-alpha.6", optional = true } r2d2_postgres = { version = "0.16", optional = true } mysql = { version = "17.0", optional = true } r2d2_mysql = { version = "17.0", optional = true } @@ -89,7 +87,7 @@ brotli = { version = "3.3", optional = true } flate2 = { version = "1.0", optional = true } [dev-dependencies] -tokio-timer = "=0.3.0-alpha.6" +tokio = { version = "0.2.0", features = ["time"] } [package.metadata.docs.rs] all-features = true diff --git a/contrib/lib/src/databases.rs b/contrib/lib/src/databases.rs index 2e0f9cdc..4f12b130 100644 --- a/contrib/lib/src/databases.rs +++ b/contrib/lib/src/databases.rs @@ -398,7 +398,7 @@ pub extern crate r2d2; #[doc(hidden)] -pub extern crate tokio_executor; +pub use tokio::task::spawn_blocking; #[cfg(any(feature = "diesel_sqlite_pool", feature = "diesel_postgres_pool", diff --git a/contrib/lib/src/json.rs b/contrib/lib/src/json.rs index 0962c449..55a50b5e 100644 --- a/contrib/lib/src/json.rs +++ b/contrib/lib/src/json.rs @@ -18,7 +18,7 @@ use std::ops::{Deref, DerefMut}; use std::io; use std::iter::FromIterator; -use tokio_io::AsyncReadExt; +use tokio::io::AsyncReadExt; use rocket::request::Request; use rocket::outcome::Outcome::*; diff --git a/contrib/lib/src/msgpack.rs b/contrib/lib/src/msgpack.rs index 3ed02629..855e1bc6 100644 --- a/contrib/lib/src/msgpack.rs +++ b/contrib/lib/src/msgpack.rs @@ -16,7 +16,7 @@ use std::ops::{Deref, DerefMut}; -use tokio_io::AsyncReadExt; +use tokio::io::AsyncReadExt; use rocket::request::Request; use rocket::outcome::Outcome::*; diff --git a/contrib/lib/tests/templates.rs b/contrib/lib/tests/templates.rs index 7250ab9c..2f9b029e 100644 --- a/contrib/lib/tests/templates.rs +++ b/contrib/lib/tests/templates.rs @@ -169,7 +169,7 @@ mod templates_tests { } // otherwise, retry a few times, waiting 250ms in between - tokio_timer::delay_for(Duration::from_millis(250)).await; + tokio::time::delay_for(Duration::from_millis(250)).await; } panic!("failed to reload modified template in 1.5s"); diff --git a/core/codegen/Cargo.toml b/core/codegen/Cargo.toml index 07237e73..67953ae9 100644 --- a/core/codegen/Cargo.toml +++ b/core/codegen/Cargo.toml @@ -28,6 +28,5 @@ version_check = "0.9.1" [dev-dependencies] rocket = { version = "0.5.0-dev", path = "../lib" } -futures-preview = "0.3.0-alpha.19" -tokio-io = "0.2.0-alpha.6" +tokio = { version = "0.2.0", features = ["io-util"] } compiletest_rs = { version = "0.3", features = ["stable"] } diff --git a/core/codegen/tests/route-data.rs b/core/codegen/tests/route-data.rs index 3a4f38b0..f5c92b7e 100644 --- a/core/codegen/tests/route-data.rs +++ b/core/codegen/tests/route-data.rs @@ -22,7 +22,7 @@ impl FromDataSimple for Simple { fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> { Box::pin(async { - use tokio_io::AsyncReadExt; + use tokio::io::AsyncReadExt; let mut string = String::new(); let mut stream = data.open().take(64); diff --git a/core/codegen/tests/route.rs b/core/codegen/tests/route.rs index d72c1fdc..eeb56494 100644 --- a/core/codegen/tests/route.rs +++ b/core/codegen/tests/route.rs @@ -30,7 +30,7 @@ impl FromDataSimple for Simple { fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> { Box::pin(async move { - use tokio_io::AsyncReadExt; + use tokio::io::AsyncReadExt; let mut string = String::new(); let mut stream = data.open().take(64); diff --git a/core/http/Cargo.toml b/core/http/Cargo.toml index fb166fd9..cf544858 100644 --- a/core/http/Cargo.toml +++ b/core/http/Cargo.toml @@ -22,21 +22,17 @@ private-cookies = ["cookie/private", "cookie/key-expansion"] [dependencies] smallvec = "1.0" percent-encoding = "1" -# TODO.async: stop using stream-unstable -hyper = { version = "=0.13.0-alpha.4", default-features = false, features = ["unstable-stream"] } -http = "0.1.17" +hyper = { version = "0.13.0", default-features = false } +http = "0.2" mime = "0.3.13" time = "0.2.11" indexmap = "1.0" state = "0.4" -tokio-rustls = { version = "0.12.0-alpha.4", optional = true } -tokio-io = "=0.2.0-alpha.6" -tokio-net = { version = "=0.2.0-alpha.6", features = ["tcp"] } -tokio-timer = "=0.3.0-alpha.6" +tokio-rustls = { version = "0.12.0", optional = true } +tokio = { version = "0.2.0", features = ["sync", "tcp", "time"] } cookie = { version = "0.14.0", features = ["percent-encode"] } pear = "0.1" unicode-xid = "0.2" -futures-core-preview = "0.3.0-alpha.19" log = "0.4" [dev-dependencies] diff --git a/core/http/src/hyper.rs b/core/http/src/hyper.rs index 36ba9b01..13725130 100644 --- a/core/http/src/hyper.rs +++ b/core/http/src/hyper.rs @@ -5,11 +5,11 @@ //! while necessary. #[doc(hidden)] pub use hyper::{Body, Request, Response, Server}; -#[doc(hidden)] pub use hyper::body::{Payload, Sender as BodySender}; +#[doc(hidden)] pub use hyper::body::{Bytes, HttpBody, Sender as BodySender}; #[doc(hidden)] pub use hyper::error::Error; -#[doc(hidden)] pub use hyper::service::{make_service_fn, service_fn, MakeService, Service}; +#[doc(hidden)] pub use hyper::rt::Executor; +#[doc(hidden)] pub use hyper::service::{make_service_fn, service_fn, Service}; -#[doc(hidden)] pub use hyper::Chunk; #[doc(hidden)] pub use http::header::HeaderMap; #[doc(hidden)] pub use http::header::HeaderName as HeaderName; #[doc(hidden)] pub use http::header::HeaderValue as HeaderValue; diff --git a/core/http/src/listener.rs b/core/http/src/listener.rs index 755e1267..637ec2e2 100644 --- a/core/http/src/listener.rs +++ b/core/http/src/listener.rs @@ -4,15 +4,15 @@ use std::io; use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::Duration; use hyper::server::accept::Accept; use log::{debug, error}; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::Delay; -use tokio_net::tcp::{TcpListener, TcpStream}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::time::Delay; +use tokio::net::{TcpListener, TcpStream}; // TODO.async: 'Listener' and 'Connection' provide common enough functionality // that they could be introduced in upstream libraries. @@ -100,8 +100,7 @@ impl Incoming { error!("accept error: {}", e); // Sleep for the specified duration - let delay = Instant::now() + duration; - let mut error_delay = tokio_timer::delay(delay); + let mut error_delay = tokio::time::delay_for(duration); match Pin::new(&mut error_delay).poll(cx) { Poll::Ready(()) => { @@ -127,8 +126,7 @@ impl Accept for Incoming { type Error = io::Error; fn poll_accept(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let result = futures_core::ready!(self.poll_next(cx)); - Poll::Ready(Some(result)) + self.poll_next(cx).map(Some) } } @@ -170,10 +168,7 @@ impl Listener for TcpListener { } fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { - // NB: This is only okay because TcpListener::accept() is stateless. - let mut accept = self.accept(); - let accept = unsafe { Pin::new_unchecked(&mut accept) }; - accept.poll(cx).map_ok(|(stream, _addr)| stream) + self.poll_accept(cx).map_ok(|(stream, _addr)| stream) } } diff --git a/core/http/src/tls.rs b/core/http/src/tls.rs index cf8b61b9..1d04c9f9 100644 --- a/core/http/src/tls.rs +++ b/core/http/src/tls.rs @@ -7,7 +7,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use tokio_net::tcp::{TcpListener, TcpStream}; +use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::{TlsAcceptor, server::TlsStream}; use tokio_rustls::rustls; @@ -91,7 +91,7 @@ impl Listener for TlsListener { TlsListenerState::Listening => { match self.listener.poll_accept(cx) { Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(stream)) => { + Poll::Ready(Ok((stream, _addr))) => { self.state = TlsListenerState::Accepting(Box::pin(self.acceptor.accept(stream))); } Poll::Ready(Err(e)) => { diff --git a/core/lib/Cargo.toml b/core/lib/Cargo.toml index 9dd9b107..bb988ea0 100644 --- a/core/lib/Cargo.toml +++ b/core/lib/Cargo.toml @@ -27,11 +27,8 @@ ctrl_c_shutdown = ["tokio/signal"] [dependencies] rocket_codegen = { version = "0.5.0-dev", path = "../codegen" } rocket_http = { version = "0.5.0-dev", path = "../http" } -futures-core-preview = "0.3.0-alpha.19" -futures-channel-preview = "0.3.0-alpha.19" -futures-util-preview = "0.3.0-alpha.19" -tokio = "=0.2.0-alpha.6" -tokio-io = "=0.2.0-alpha.6" +futures-util = "0.3.0" +tokio = { version = "0.2.0", features = ["fs", "io-std", "io-util", "rt-threaded", "sync"] } yansi = "0.5" log = { version = "0.4", features = ["std"] } toml = "0.4.7" @@ -48,7 +45,6 @@ yansi = "0.5" version_check = "0.9.1" [dev-dependencies] -futures-preview = "0.3.0-alpha.19" -futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" } # TODO: Find a way to not depend on this. lazy_static = "1.0" +tokio = { version = "0.2.0", features = ["macros"] } diff --git a/core/lib/src/catcher.rs b/core/lib/src/catcher.rs index e37a6ce0..5c011dbd 100644 --- a/core/lib/src/catcher.rs +++ b/core/lib/src/catcher.rs @@ -154,7 +154,7 @@ macro_rules! default_catchers { let mut map = HashMap::new(); $( - fn $fn_name<'r>(req: &'r Request<'_>) -> futures_core::future::BoxFuture<'r, response::Result<'r>> { + fn $fn_name<'r>(req: &'r Request<'_>) -> futures_util::future::BoxFuture<'r, response::Result<'r>> { status::Custom(Status::from_code($code).unwrap(), content::Html(error_page_template!($code, $name, $description)) ).respond_to(req) diff --git a/core/lib/src/codegen.rs b/core/lib/src/codegen.rs index c517c432..76142669 100644 --- a/core/lib/src/codegen.rs +++ b/core/lib/src/codegen.rs @@ -1,4 +1,4 @@ -use futures_core::future::BoxFuture; +use futures_util::future::BoxFuture; use crate::{Request, Data}; use crate::handler::{Outcome, ErrorHandler}; diff --git a/core/lib/src/data/data.rs b/core/lib/src/data/data.rs index b996510c..eeeebbaf 100644 --- a/core/lib/src/data/data.rs +++ b/core/lib/src/data/data.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::io; use std::path::Path; -use tokio_io::{AsyncRead, AsyncWrite, AsyncReadExt as _}; +use tokio::io::{AsyncRead, AsyncWrite}; use super::data_stream::DataStream; @@ -128,7 +128,7 @@ impl Data { /// A helper method to write the body of the request to any `Write` type. /// - /// This method is identical to `io::copy(&mut data.open(), writer)`. + /// This method is identical to `tokio::io::copy(&mut data.open(), &mut writer)`. /// /// # Example /// @@ -146,7 +146,7 @@ impl Data { pub fn stream_to<'w, W: AsyncWrite + Unpin + 'w>(self, mut writer: W) -> impl Future> + 'w { Box::pin(async move { let mut stream = self.open(); - stream.copy(&mut writer).await + tokio::io::copy(&mut stream, &mut writer).await }) } diff --git a/core/lib/src/data/data_stream.rs b/core/lib/src/data/data_stream.rs index dcc370b6..9e17da45 100644 --- a/core/lib/src/data/data_stream.rs +++ b/core/lib/src/data/data_stream.rs @@ -1,7 +1,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use tokio_io::AsyncRead; +use tokio::io::AsyncRead; // TODO.async: Consider storing the real type here instead of a Box to avoid // the dynamic dispatch diff --git a/core/lib/src/data/from_data.rs b/core/lib/src/data/from_data.rs index 9ebc93f4..cc179a0f 100644 --- a/core/lib/src/data/from_data.rs +++ b/core/lib/src/data/from_data.rs @@ -1,8 +1,8 @@ use std::borrow::Borrow; -use futures_core::future::BoxFuture; +use futures_util::future::BoxFuture; use futures_util::future::{ready, FutureExt}; -use tokio_io::AsyncReadExt; +use tokio::io::AsyncReadExt; use crate::outcome::{self, IntoOutcome}; use crate::outcome::Outcome::*; diff --git a/core/lib/src/ext.rs b/core/lib/src/ext.rs index 8c8e2da6..bf5a6264 100644 --- a/core/lib/src/ext.rs +++ b/core/lib/src/ext.rs @@ -2,23 +2,23 @@ use std::io::{self, Cursor}; use std::pin::Pin; use std::task::{Poll, Context}; -use futures_core::{ready, future::BoxFuture, stream::Stream}; -use tokio_io::{AsyncRead, AsyncReadExt as _}; +use futures_util::{ready, future::BoxFuture, stream::Stream}; +use tokio::io::{AsyncRead, AsyncReadExt as _}; use crate::http::hyper; -use hyper::{Chunk, Payload}; +use hyper::{Bytes, HttpBody}; -pub struct IntoChunkStream { +pub struct IntoBytesStream { inner: R, buf_size: usize, buffer: Vec, } // TODO.async: Verify correctness of this implementation. -impl Stream for IntoChunkStream +impl Stream for IntoBytesStream where R: AsyncRead + Unpin { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>{ assert!(self.buffer.len() == self.buf_size); @@ -32,15 +32,15 @@ impl Stream for IntoChunkStream Poll::Ready(Ok(n)) => { let mut next = std::mem::replace(buffer, vec![0; buf_size]); next.truncate(n); - Poll::Ready(Some(Ok(Chunk::from(next)))) + Poll::Ready(Some(Ok(Bytes::from(next)))) } } } } pub trait AsyncReadExt: AsyncRead { - fn into_chunk_stream(self, buf_size: usize) -> IntoChunkStream where Self: Sized { - IntoChunkStream { inner: self, buf_size, buffer: vec![0; buf_size] } + fn into_bytes_stream(self, buf_size: usize) -> IntoBytesStream where Self: Sized { + IntoBytesStream { inner: self, buf_size, buffer: vec![0; buf_size] } } // TODO.async: Verify correctness of this implementation. @@ -72,7 +72,7 @@ pub struct AsyncReadBody { enum AsyncReadBodyState { Pending, - Partial(Cursor), + Partial(Cursor), Done, } @@ -88,7 +88,7 @@ impl AsyncRead for AsyncReadBody { match self.state { AsyncReadBodyState::Pending => { match ready!(Pin::new(&mut self.inner).poll_data(cx)) { - Some(Ok(chunk)) => self.state = AsyncReadBodyState::Partial(Cursor::new(chunk)), + Some(Ok(bytes)) => self.state = AsyncReadBodyState::Partial(Cursor::new(bytes)), Some(Err(e)) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))), None => self.state = AsyncReadBodyState::Done, } diff --git a/core/lib/src/fairing/ad_hoc.rs b/core/lib/src/fairing/ad_hoc.rs index 8509c513..493dad81 100644 --- a/core/lib/src/fairing/ad_hoc.rs +++ b/core/lib/src/fairing/ad_hoc.rs @@ -1,6 +1,6 @@ use std::sync::Mutex; -use futures_core::future::BoxFuture; +use futures_util::future::BoxFuture; use crate::{Rocket, Request, Response, Data}; use crate::fairing::{Fairing, Kind, Info}; diff --git a/core/lib/src/fairing/mod.rs b/core/lib/src/fairing/mod.rs index f8fbc390..3f958f01 100644 --- a/core/lib/src/fairing/mod.rs +++ b/core/lib/src/fairing/mod.rs @@ -47,7 +47,7 @@ //! of other `Fairings` are not jeopardized. For instance, unless it is made //! abundantly clear, a fairing should not rewrite every request. -use futures_core::future::BoxFuture; +use futures_util::future::BoxFuture; use crate::{Rocket, Request, Response, Data}; diff --git a/core/lib/src/handler.rs b/core/lib/src/handler.rs index 7ce867b5..11ef07d7 100644 --- a/core/lib/src/handler.rs +++ b/core/lib/src/handler.rs @@ -1,6 +1,6 @@ //! Types and traits for request and error handlers and their return values. -use futures_core::future::BoxFuture; +use futures_util::future::BoxFuture; use crate::data::Data; use crate::request::Request; diff --git a/core/lib/src/lib.rs b/core/lib/src/lib.rs index 03573e44..bdddab5f 100644 --- a/core/lib/src/lib.rs +++ b/core/lib/src/lib.rs @@ -153,5 +153,10 @@ pub fn custom(config: config::Config) -> Rocket { /// WARNING: This is unstable! Do not use this method outside of Rocket! #[doc(hidden)] pub fn async_test(fut: impl std::future::Future + Send) -> R { - tokio::runtime::current_thread::Runtime::new().expect("create tokio runtime").block_on(fut) + tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .expect("create tokio runtime") + .block_on(fut) } diff --git a/core/lib/src/request/form/form.rs b/core/lib/src/request/form/form.rs index 56ca9d0d..241cdcf2 100644 --- a/core/lib/src/request/form/form.rs +++ b/core/lib/src/request/form/form.rs @@ -1,6 +1,6 @@ use std::ops::Deref; -use tokio_io::AsyncReadExt; +use tokio::io::AsyncReadExt; use crate::outcome::Outcome::*; use crate::request::{Request, form::{FromForm, FormItems, FormDataError}}; diff --git a/core/lib/src/request/from_request.rs b/core/lib/src/request/from_request.rs index 3bd8b204..898c3701 100644 --- a/core/lib/src/request/from_request.rs +++ b/core/lib/src/request/from_request.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use std::net::SocketAddr; -use futures_core::future::BoxFuture; +use futures_util::future::BoxFuture; use crate::router::Route; use crate::request::Request; diff --git a/core/lib/src/response/mod.rs b/core/lib/src/response/mod.rs index 3403ed2b..8d49b5e3 100644 --- a/core/lib/src/response/mod.rs +++ b/core/lib/src/response/mod.rs @@ -49,4 +49,4 @@ pub use self::debug::Debug; /// Type alias for the `Result` of a `Responder::respond` call. pub type Result<'r> = std::result::Result, crate::http::Status>; /// Type alias for the `Result` of a `Responder::respond` call. -pub type ResultFuture<'r> = futures_core::future::BoxFuture<'r, Result<'r>>; +pub type ResultFuture<'r> = futures_util::future::BoxFuture<'r, Result<'r>>; diff --git a/core/lib/src/response/responder.rs b/core/lib/src/response/responder.rs index 26da4e5d..62c95be1 100644 --- a/core/lib/src/response/responder.rs +++ b/core/lib/src/response/responder.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::io::Cursor; use futures_util::future::ready; -use tokio_io::BufReader; +use tokio::io::BufReader; use crate::http::{Status, ContentType, StatusClass}; use crate::response::{self, Response, Body}; diff --git a/core/lib/src/response/response.rs b/core/lib/src/response/response.rs index 48c5cedc..9b2c3495 100644 --- a/core/lib/src/response/response.rs +++ b/core/lib/src/response/response.rs @@ -3,7 +3,7 @@ use std::borrow::Cow; use std::future::Future; use std::pin::Pin; -use tokio_io::{AsyncRead, AsyncReadExt}; +use tokio::io::{AsyncRead, AsyncReadExt}; use futures_util::future::FutureExt; use crate::response::{Responder, ResultFuture}; @@ -1057,14 +1057,12 @@ impl<'r> Response<'r> { /// # Example /// /// ```rust - /// use futures::io::repeat; - /// use futures_tokio_compat::Compat; - /// use tokio::io::AsyncReadExt; + /// use tokio::io::{repeat, AsyncReadExt}; /// use rocket::Response; /// /// # rocket::async_test(async { /// let mut response = Response::new(); - /// response.set_streamed_body(Compat::new(repeat(97)).take(5)); + /// response.set_streamed_body(repeat(97).take(5)); /// assert_eq!(response.body_string().await, Some("aaaaa".to_string())); /// # }) /// ``` @@ -1079,14 +1077,12 @@ impl<'r> Response<'r> { /// # Example /// /// ```rust - /// use futures::io::repeat; - /// use futures_tokio_compat::Compat; - /// use tokio::io::AsyncReadExt; + /// use tokio::io::{repeat, AsyncReadExt}; /// use rocket::Response; /// /// # rocket::async_test(async { /// let mut response = Response::new(); - /// response.set_chunked_body(Compat::new(repeat(97)).take(5), 10); + /// response.set_chunked_body(repeat(97).take(5), 10); /// assert_eq!(response.body_string().await, Some("aaaaa".to_string())); /// # }) /// ``` diff --git a/core/lib/src/response/stream.rs b/core/lib/src/response/stream.rs index eadb6754..1661a384 100644 --- a/core/lib/src/response/stream.rs +++ b/core/lib/src/response/stream.rs @@ -1,6 +1,6 @@ use std::fmt::{self, Debug}; -use tokio_io::AsyncRead; +use tokio::io::AsyncRead; use crate::request::Request; use crate::response::{Response, Responder, ResultFuture, DEFAULT_CHUNK_SIZE}; diff --git a/core/lib/src/rocket.rs b/core/lib/src/rocket.rs index 49dfc60c..581384b1 100644 --- a/core/lib/src/rocket.rs +++ b/core/lib/src/rocket.rs @@ -5,9 +5,9 @@ use std::io; use std::mem; use std::sync::Arc; -use futures_core::future::{Future, BoxFuture}; -use futures_channel::{mpsc, oneshot}; -use futures_util::{future::FutureExt, stream::StreamExt}; +use futures_util::future::{Future, FutureExt, BoxFuture}; +use futures_util::stream::StreamExt; +use tokio::sync::{mpsc, oneshot}; use yansi::Paint; use state::Container; @@ -120,16 +120,16 @@ impl Rocket { tx: oneshot::Sender>, ) -> impl Future> + 'r { async move { - let mut hyp_res = hyper::Response::builder(); - hyp_res.status(response.status().code); + let mut hyp_res = hyper::Response::builder() + .status(response.status().code); for header in response.headers().iter() { let name = header.name.as_str(); let value = header.value.as_bytes(); - hyp_res.header(name, value); + hyp_res = hyp_res.header(name, value); } - let send_response = move |mut hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> { + let send_response = move |hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> { let response = hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; tx.send(response).expect("channel receiver should not be dropped"); Ok(()) @@ -137,15 +137,15 @@ impl Rocket { match response.body() { None => { - hyp_res.header(header::CONTENT_LENGTH, "0"); + hyp_res = hyp_res.header(header::CONTENT_LENGTH, "0"); send_response(hyp_res, hyper::Body::empty())?; } Some(Body::Sized(body, size)) => { - hyp_res.header(header::CONTENT_LENGTH, size.to_string()); + hyp_res = hyp_res.header(header::CONTENT_LENGTH, size.to_string()); let (mut sender, hyp_body) = hyper::Body::channel(); send_response(hyp_res, hyp_body)?; - let mut stream = body.into_chunk_stream(4096); + let mut stream = body.into_bytes_stream(4096); while let Some(next) = stream.next().await { sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; } @@ -156,7 +156,7 @@ impl Rocket { let (mut sender, hyp_body) = hyper::Body::channel(); send_response(hyp_res, hyp_body)?; - let mut stream = body.into_chunk_stream(chunk_size.try_into().expect("u64 -> usize overflow")); + let mut stream = body.into_bytes_stream(chunk_size.try_into().expect("u64 -> usize overflow")); while let Some(next) = stream.next().await { sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; } @@ -760,41 +760,39 @@ impl Rocket { } }); - // NB: executor must be passed manually here, see hyperium/hyper#1537 + #[derive(Clone)] + struct TokioExecutor; + + impl hyper::Executor for TokioExecutor where Fut: Future + Send + 'static, Fut::Output: Send { + fn execute(&self, fut: Fut) { + tokio::spawn(fut); + } + } + hyper::Server::builder(Incoming::from_listener(listener)) - .executor(tokio::executor::DefaultExecutor::current()) + .executor(TokioExecutor) .serve(service) - .with_graceful_shutdown(async move { shutdown_receiver.next().await; }) + .with_graceful_shutdown(async move { shutdown_receiver.recv().await; }) .await .map_err(crate::error::Error::Run) } - /// Similar to `launch()`, but using a custom Tokio runtime and returning - /// a `Future` that completes along with the server. The runtime has no - /// restrictions other than being Tokio-based, and can have other tasks - /// running on it. + /// Returns a `Future` that completes when the server is shut down or + /// errors. If the `ctrl_c_shutdown` feature is enabled, `Ctrl-C` will be + /// handled as a shutdown signal. /// /// # Example /// /// ```rust - /// use futures::future::FutureExt; - /// - /// // This gives us the default behavior. Alternatively, we could use a - /// // `tokio::runtime::Builder` to configure with greater detail. - /// let runtime = tokio::runtime::Runtime::new().expect("error creating runtime"); - /// + /// #[tokio::main] + /// async fn main() { /// # if false { - /// let server_done = rocket::ignite().spawn_on(&runtime); - /// runtime.block_on(async move { - /// let result = server_done.await; + /// let result = rocket::ignite().serve().await; /// assert!(result.is_ok()); - /// }); /// # } + /// } /// ``` - pub fn spawn_on( - self, - runtime: &tokio::runtime::Runtime, - ) -> impl Future> { + pub async fn serve(self) -> Result<(), crate::error::Error> { use std::net::ToSocketAddrs; use crate::error::Error::Launch; @@ -802,7 +800,7 @@ impl Rocket { let full_addr = format!("{}:{}", self.config.address, self.config.port); let addrs = match full_addr.to_socket_addrs() { Ok(a) => a.collect::>(), - Err(e) => return futures_util::future::ready(Err(Launch(From::from(e)))).boxed(), + Err(e) => return Err(Launch(From::from(e))), }; let addr = addrs[0]; @@ -812,26 +810,26 @@ impl Rocket { (cancel_ctrl_c_listener_sender, cancel_ctrl_c_listener_receiver) ) = ( self.get_shutdown_handle(), - oneshot::channel() + oneshot::channel(), ); - let server = async move { + let server = { macro_rules! listen_on { ($expr:expr) => {{ let listener = match $expr { Ok(ok) => ok, Err(err) => return Err(Launch(LaunchError::new(LaunchErrorKind::Bind(err)))), }; - self.listen_on(listener).await + self.listen_on(listener) }}; } #[cfg(feature = "tls")] { if let Some(tls) = self.config.tls.clone() { - listen_on!(crate::http::tls::bind_tls(addr, tls.certs, tls.key).await) + listen_on!(crate::http::tls::bind_tls(addr, tls.certs, tls.key).await).boxed() } else { - listen_on!(crate::http::private::bind_tcp(addr).await) + listen_on!(crate::http::private::bind_tcp(addr).await).boxed() } } #[cfg(not(feature = "tls"))] @@ -846,31 +844,30 @@ impl Rocket { }); #[cfg(feature = "ctrl_c_shutdown")] - match tokio::net::signal::ctrl_c() { - Ok(mut ctrl_c) => { - runtime.spawn(async move { - // Stop listening for `ctrl_c` if the server shuts down - // a different way to avoid waiting forever. - futures_util::future::select( - ctrl_c.next(), - cancel_ctrl_c_listener_receiver, - ).await; + { + tokio::spawn(async move { + use futures_util::future::{select, Either}; - // Request the server shutdown. - shutdown_handle.shutdown(); - }); - }, - Err(err) => { - // Signal handling isn't strictly necessary, so we can skip it - // if necessary. It's a good idea to let the user know we're - // doing so in case they are expecting certain behavior. - let message = "Not listening for shutdown keybinding."; - warn!("{}", Paint::yellow(message)); - info_!("Error: {}", err); - }, + let either = select( + tokio::signal::ctrl_c().boxed(), + cancel_ctrl_c_listener_receiver, + ).await; + + match either { + Either::Left((Ok(()), _)) | Either::Right((_, _)) => shutdown_handle.shutdown(), + Either::Left((Err(err), _)) => { + // Signal handling isn't strictly necessary, so we can skip it + // if necessary. It's a good idea to let the user know we're + // doing so in case they are expecting certain behavior. + let message = "Not listening for shutdown keybinding."; + warn!("{}", Paint::yellow(message)); + info_!("Error: {}", err); + } + } + }); } - server.boxed() + server.await } /// Starts the application server and begins listening for and dispatching @@ -893,14 +890,15 @@ impl Rocket { /// # } /// ``` pub fn launch(self) -> Result<(), crate::error::Error> { - // TODO.async What meaning should config.workers have now? // Initialize the tokio runtime - let runtime = tokio::runtime::Builder::new() - .core_threads(self.config.workers as usize) + let mut runtime = tokio::runtime::Builder::new() + .threaded_scheduler() + .num_threads(self.config.workers as usize) + .enable_all() .build() .expect("Cannot build runtime!"); - runtime.block_on(self.spawn_on(&runtime)) + runtime.block_on(async move { self.serve().await }) } /// Returns a [`ShutdownHandle`], which can be used to gracefully terminate diff --git a/core/lib/src/router/mod.rs b/core/lib/src/router/mod.rs index 2167c513..0ab30640 100644 --- a/core/lib/src/router/mod.rs +++ b/core/lib/src/router/mod.rs @@ -3,7 +3,7 @@ mod route; use std::collections::hash_map::HashMap; -use futures_core::future::BoxFuture; +use futures_util::future::BoxFuture; pub use self::route::Route; diff --git a/core/lib/src/shutdown.rs b/core/lib/src/shutdown.rs index 7eaab993..d7b7360a 100644 --- a/core/lib/src/shutdown.rs +++ b/core/lib/src/shutdown.rs @@ -1,5 +1,5 @@ use crate::request::{FromRequest, Outcome, Request}; -use futures_channel::mpsc; +use tokio::sync::mpsc; /// # Example /// @@ -34,6 +34,7 @@ impl ShutdownHandle { // Intentionally ignore any error, as the only scenarios this can happen // is sending too many shutdown requests or we're already shut down. let _ = self.0.try_send(()); + info!("Server shutdown requested, waiting for all pending requests to finish."); } } diff --git a/core/lib/tests/head_handling.rs b/core/lib/tests/head_handling.rs index d000cc06..026b6b67 100644 --- a/core/lib/tests/head_handling.rs +++ b/core/lib/tests/head_handling.rs @@ -22,7 +22,7 @@ fn other() -> content::Json<&'static str> { mod head_handling_tests { use super::*; - use tokio_io::{AsyncRead, AsyncReadExt}; + use tokio::io::{AsyncRead, AsyncReadExt}; use rocket::Route; use rocket::local::Client; diff --git a/examples/content_types/Cargo.toml b/examples/content_types/Cargo.toml index bb192c2d..70e6c88f 100644 --- a/examples/content_types/Cargo.toml +++ b/examples/content_types/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" publish = false [dependencies] -tokio = "0.2.0-alpha.6" +tokio = { version = "0.2.0", features = ["io-util"] } rocket = { path = "../../core/lib" } serde = "1.0" serde_json = "1.0" diff --git a/examples/manual_routes/Cargo.toml b/examples/manual_routes/Cargo.toml index 5676090d..7fe20bbb 100644 --- a/examples/manual_routes/Cargo.toml +++ b/examples/manual_routes/Cargo.toml @@ -7,4 +7,4 @@ publish = false [dependencies] rocket = { path = "../../core/lib" } -tokio = "=0.2.0-alpha.6" +tokio = { version = "0.2.0", features = ["io-util"] } diff --git a/examples/stream/Cargo.toml b/examples/stream/Cargo.toml index bd27fe4f..9b5728d2 100644 --- a/examples/stream/Cargo.toml +++ b/examples/stream/Cargo.toml @@ -7,6 +7,4 @@ publish = false [dependencies] rocket = { path = "../../core/lib" } -tokio = "0.2.0-alpha.6" -futures-preview = "0.3.0-alpha.19" -futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" } +tokio = { version = "0.2.0", features = ["fs", "io-util"] } diff --git a/examples/stream/src/main.rs b/examples/stream/src/main.rs index 8852a4c3..939b5a28 100644 --- a/examples/stream/src/main.rs +++ b/examples/stream/src/main.rs @@ -6,17 +6,15 @@ use rocket::response::{content, Stream}; -use futures::io::repeat; -use futures_tokio_compat::Compat; use tokio::fs::File; -use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::io::{repeat, AsyncRead, AsyncReadExt}; // Generate this file using: head -c BYTES /dev/random > big_file.dat const FILENAME: &str = "big_file.dat"; #[get("/")] fn root() -> content::Plain> { - content::Plain(Stream::from(Compat::new(repeat('a' as u8)).take(25000))) + content::Plain(Stream::from(repeat('a' as u8).take(25000))) } #[get("/big_file")]