diff --git a/contrib/lib/Cargo.toml b/contrib/lib/Cargo.toml index b34ddeea..f775af38 100644 --- a/contrib/lib/Cargo.toml +++ b/contrib/lib/Cargo.toml @@ -15,7 +15,7 @@ edition = "2018" # Internal use only. templates = ["serde", "serde_json", "glob", "notify"] databases = [ - "serde", "r2d2", "tokio/blocking", "tokio/rt-threaded", + "serde", "r2d2", "tokio/rt", "tokio/rt-multi-thread", "rocket_contrib_codegen/database_attribute" ] @@ -42,7 +42,7 @@ memcache_pool = ["databases", "memcache", "r2d2-memcache"] [dependencies] # Global dependencies. -tokio = { version = "0.2.0", optional = true } +tokio = { version = "1.0", optional = true } rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true } rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false } log = "0.4" diff --git a/contrib/lib/src/databases.rs b/contrib/lib/src/databases.rs index 0a1ac811..5a4d7ab2 100644 --- a/contrib/lib/src/databases.rs +++ b/contrib/lib/src/databases.rs @@ -782,7 +782,7 @@ impl ConnectionPool { async fn get(&self) -> Result, ()> { let duration = std::time::Duration::from_secs(self.config.timeout as u64); let permit = match timeout(duration, self.semaphore.clone().acquire_owned()).await { - Ok(p) => p, + Ok(p) => p.expect("internal invariant broken: semaphore should not be closed"), Err(_) => { error_!("database connection retrieval timed out"); return Err(()); diff --git a/core/http/Cargo.toml b/core/http/Cargo.toml index 9e0d4947..fad87ea4 100644 --- a/core/http/Cargo.toml +++ b/core/http/Cargo.toml @@ -22,14 +22,14 @@ private-cookies = ["cookie/private", "cookie/key-expansion"] [dependencies] smallvec = "1.0" percent-encoding = "2" -hyper = { version = "0.13.0", default-features = false, features = ["runtime"] } +hyper = { version = "0.14", default-features = false, features = ["http1", "http2", "runtime", "server", "stream"] } http = "0.2" mime = "0.3.13" time = "0.2.11" indexmap = { version = "1.5.2", features = ["std"] } state = "0.4" -tokio-rustls = { version = "0.14.0", optional = true } -tokio = { version = "0.2.9", features = ["sync", "tcp", "time"] } +tokio-rustls = { version = "0.22.0", optional = true } +tokio = { version = "1.0", features = ["net", "sync", "time"] } unicode-xid = "0.2" log = "0.4" ref-cast = "1.0" @@ -37,6 +37,7 @@ uncased = "0.9" parking_lot = "0.11" either = "1" pear = "0.2" +pin-project-lite = "0.2" [dependencies.cookie] git = "https://github.com/SergioBenitez/cookie-rs.git" diff --git a/core/http/src/hyper.rs b/core/http/src/hyper.rs index c84102e9..cb007e81 100644 --- a/core/http/src/hyper.rs +++ b/core/http/src/hyper.rs @@ -4,10 +4,10 @@ //! These types will, with certainty, be removed with time, but they reside here //! while necessary. -#[doc(hidden)] pub use hyper::{Body, Request, Response, Server}; +#[doc(hidden)] pub use hyper::{Body, Error, Request, Response}; #[doc(hidden)] pub use hyper::body::{Bytes, HttpBody, Sender as BodySender}; -#[doc(hidden)] pub use hyper::error::Error; #[doc(hidden)] pub use hyper::rt::Executor; +#[doc(hidden)] pub use hyper::server::Server; #[doc(hidden)] pub use hyper::service::{make_service_fn, service_fn, Service}; #[doc(hidden)] pub use http::header::HeaderMap; diff --git a/core/http/src/listener.rs b/core/http/src/listener.rs index fe998abe..f3265d0a 100644 --- a/core/http/src/listener.rs +++ b/core/http/src/listener.rs @@ -10,7 +10,7 @@ use hyper::server::accept::Accept; use log::{debug, error}; -use tokio::time::Delay; +use tokio::time::Sleep; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; @@ -32,15 +32,18 @@ pub trait Connection: AsyncRead + AsyncWrite { fn remote_addr(&self) -> Option; } -/// This is a generic version of hyper's AddrIncoming that is intended to be -/// usable with listeners other than a plain TCP stream, e.g. TLS and/or Unix -/// sockets. It does so by bridging the `Listener` trait to what hyper wants (an -/// Accept). This type is internal to Rocket. -#[must_use = "streams do nothing unless polled"] -pub struct Incoming { - listener: L, - sleep_on_errors: Option, - pending_error_delay: Option, +pin_project_lite::pin_project! { + /// This is a generic version of hyper's AddrIncoming that is intended to be + /// usable with listeners other than a plain TCP stream, e.g. TLS and/or Unix + /// sockets. It does so by bridging the `Listener` trait to what hyper wants (an + /// Accept). This type is internal to Rocket. + #[must_use = "streams do nothing unless polled"] + pub struct Incoming { + listener: L, + sleep_on_errors: Option, + #[pin] + pending_error_delay: Option, + } } impl Incoming { @@ -72,18 +75,19 @@ impl Incoming { self.sleep_on_errors = val; } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { - // Check if a previous delay is active that was set by IO errors. - if let Some(ref mut delay) = self.pending_error_delay { - match Pin::new(delay).poll(cx) { - Poll::Ready(()) => {} - Poll::Pending => return Poll::Pending, - } - } - self.pending_error_delay = None; - + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); loop { - match self.listener.poll_accept(cx) { + // Check if a previous sleep timer is active that was set by IO errors. + if let Some(delay) = me.pending_error_delay.as_mut().as_pin_mut() { + match delay.poll(cx) { + Poll::Ready(()) => {} + Poll::Pending => return Poll::Pending, + } + } + me.pending_error_delay.set(None); + + match me.listener.poll_accept(cx) { Poll::Ready(Ok(stream)) => { return Poll::Ready(Ok(stream)); }, @@ -96,22 +100,11 @@ impl Incoming { continue; } - if let Some(duration) = self.sleep_on_errors { + if let Some(duration) = me.sleep_on_errors { error!("accept error: {}", e); // Sleep for the specified duration - let mut error_delay = tokio::time::delay_for(duration); - - match Pin::new(&mut error_delay).poll(cx) { - Poll::Ready(()) => { - // Wow, it's been a second already? Ok then... - continue - }, - Poll::Pending => { - self.pending_error_delay = Some(error_delay); - return Poll::Pending; - }, - } + me.pending_error_delay.set(Some(tokio::time::sleep(*duration))); } else { return Poll::Ready(Err(e)); } @@ -126,7 +119,7 @@ impl Accept for Incoming { type Error = io::Error; fn poll_accept( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll>> { self.poll_next(cx).map(Some) @@ -169,7 +162,7 @@ impl Listener for TcpListener { } fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { - self.poll_accept(cx).map_ok(|(stream, _addr)| stream) + (*self).poll_accept(cx).map_ok(|(stream, _addr)| stream) } } diff --git a/core/lib/Cargo.toml b/core/lib/Cargo.toml index c99ebcec..de6cced3 100644 --- a/core/lib/Cargo.toml +++ b/core/lib/Cargo.toml @@ -46,8 +46,8 @@ rand = "0.7" either = "1" [dependencies.tokio] -version = "0.2.9" -features = ["fs", "io-std", "io-util", "rt-threaded", "sync", "signal", "macros"] +version = "1.0" +features = ["fs", "io-std", "io-util", "rt-multi-thread", "sync", "signal", "macros"] [build-dependencies] yansi = "0.5" diff --git a/core/lib/src/data/data_stream.rs b/core/lib/src/data/data_stream.rs index b1e6b776..a968ec42 100644 --- a/core/lib/src/data/data_stream.rs +++ b/core/lib/src/data/data_stream.rs @@ -3,7 +3,7 @@ use std::task::{Context, Poll}; use std::path::Path; use std::io::{self, Cursor}; -use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, Take}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, ReadBuf, Take}; use crate::ext::AsyncReadBody; @@ -116,12 +116,12 @@ impl AsyncRead for DataStream { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8] - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { if self.buffer.limit() > 0 { trace_!("DataStream::buffer_read()"); match Pin::new(&mut self.buffer).poll_read(cx, buf) { - Poll::Ready(Ok(0)) => { /* fall through */ }, + Poll::Ready(Ok(())) if buf.filled().is_empty() => { /* fall through */ }, poll => return poll, } } diff --git a/core/lib/src/ext.rs b/core/lib/src/ext.rs index 97bfa760..d851a507 100644 --- a/core/lib/src/ext.rs +++ b/core/lib/src/ext.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::task::{Poll, Context}; use futures::{ready, stream::Stream}; -use tokio::io::AsyncRead; +use tokio::io::{AsyncRead, ReadBuf}; use crate::http::hyper::{self, Bytes, HttpBody}; @@ -23,11 +23,13 @@ impl Stream for IntoBytesStream let Self { ref mut inner, ref mut buffer, buf_size } = *self; - match Pin::new(inner).poll_read(cx, &mut buffer[..]) { + let mut buf = ReadBuf::new(&mut buffer[..]); + match Pin::new(inner).poll_read(cx, &mut buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), - Poll::Ready(Ok(n)) if n == 0 => Poll::Ready(None), - Poll::Ready(Ok(n)) => { + Poll::Ready(Ok(())) if buf.filled().is_empty() => Poll::Ready(None), + Poll::Ready(Ok(())) => { + let n = buf.filled().len(); // FIXME(perf). let mut next = std::mem::replace(buffer, vec![0; buf_size]); next.truncate(n); @@ -72,8 +74,8 @@ impl AsyncRead for AsyncReadBody { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8] - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { loop { match self.state { State::Pending => { @@ -90,11 +92,11 @@ impl AsyncRead for AsyncReadBody { }, State::Partial(ref mut cursor) => { match ready!(Pin::new(cursor).poll_read(cx, buf)) { - Ok(n) if n == 0 => self.state = State::Pending, + Ok(()) if buf.filled().is_empty() => self.state = State::Pending, result => return Poll::Ready(result), } } - State::Done => return Poll::Ready(Ok(0)), + State::Done => return Poll::Ready(Ok(())), } } } diff --git a/core/lib/src/lib.rs b/core/lib/src/lib.rs index 6d98739e..e9cd26a9 100644 --- a/core/lib/src/lib.rs +++ b/core/lib/src/lib.rs @@ -164,10 +164,9 @@ pub fn custom(provider: T) -> Rocket { /// WARNING: This is unstable! Do not use this method outside of Rocket! #[doc(hidden)] pub fn async_test(fut: impl std::future::Future + Send) -> R { - tokio::runtime::Builder::new() - .threaded_scheduler() + tokio::runtime::Builder::new_multi_thread() .thread_name("rocket-test-worker-thread") - .core_threads(1) + .worker_threads(1) .enable_all() .build() .expect("create tokio runtime") @@ -180,9 +179,8 @@ pub fn async_main(fut: impl std::future::Future + Send) -> R { // FIXME: The `workers` value won't reflect swaps of `Rocket` in attach // fairings with different config values, or values from non-Rocket configs. // See tokio-rs/tokio#3329 for a necessary solution in `tokio`. - tokio::runtime::Builder::new() - .threaded_scheduler() - .core_threads(Config::from(Config::figment()).workers) + tokio::runtime::Builder::new_multi_thread() + .worker_threads(Config::from(Config::figment()).workers) .thread_name("rocket-worker-thread") .enable_all() .build() diff --git a/core/lib/src/local/asynchronous/response.rs b/core/lib/src/local/asynchronous/response.rs index 29b19bc5..57547b15 100644 --- a/core/lib/src/local/asynchronous/response.rs +++ b/core/lib/src/local/asynchronous/response.rs @@ -2,7 +2,7 @@ use std::io; use std::future::Future; use std::{pin::Pin, task::{Context, Poll}}; -use tokio::io::AsyncRead; +use tokio::io::{AsyncRead, ReadBuf}; use crate::http::CookieJar; use crate::{Request, Response}; @@ -125,11 +125,11 @@ impl AsyncRead for LocalResponse<'_> { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8] - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { let body = match self.response.body_mut() { Some(body) => body, - _ => return Poll::Ready(Ok(0)) + _ => return Poll::Ready(Ok(())) }; Pin::new(body.as_reader()).poll_read(cx, buf) diff --git a/core/lib/src/local/blocking/client.rs b/core/lib/src/local/blocking/client.rs index e4f4e5d4..108b907e 100644 --- a/core/lib/src/local/blocking/client.rs +++ b/core/lib/src/local/blocking/client.rs @@ -32,8 +32,7 @@ pub struct Client { impl Client { fn _new(rocket: Rocket, tracked: bool) -> Result { - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .expect("create tokio runtime"); diff --git a/core/lib/src/rocket.rs b/core/lib/src/rocket.rs index 03752759..5b2655d8 100644 --- a/core/lib/src/rocket.rs +++ b/core/lib/src/rocket.rs @@ -305,7 +305,8 @@ impl Rocket { let (rocket, mut fairings) = match tokio::runtime::Handle::try_current() { Ok(handle) => { std::thread::spawn(move || { - handle.block_on(future) + let _e = handle.enter(); + futures::executor::block_on(future) }).join().unwrap() } Err(_) => { diff --git a/core/lib/src/shutdown.rs b/core/lib/src/shutdown.rs index 6f34fd6c..68f9e6cb 100644 --- a/core/lib/src/shutdown.rs +++ b/core/lib/src/shutdown.rs @@ -41,7 +41,7 @@ impl Shutdown { /// immediately; pending requests will continue to run until completion /// before the actual shutdown occurs. #[inline] - pub fn shutdown(mut self) { + pub fn shutdown(self) { // Intentionally ignore any error, as the only scenarios this can happen // is sending too many shutdown requests or we're already shut down. let _ = self.0.try_send(()); diff --git a/core/lib/tests/timer-on-attach.rs b/core/lib/tests/timer-on-attach.rs index 700e11de..35af26b2 100644 --- a/core/lib/tests/timer-on-attach.rs +++ b/core/lib/tests/timer-on-attach.rs @@ -4,7 +4,7 @@ async fn test_await_timer_inside_attach() { async fn do_async_setup() { // By using a timer or I/O resource, we ensure that do_async_setup will // deadlock if no thread is able to tick the time or I/O drivers. - rocket::tokio::time::delay_for(std::time::Duration::from_millis(100)).await; + rocket::tokio::time::sleep(std::time::Duration::from_millis(100)).await; } rocket::ignite() diff --git a/examples/manual_routes/Cargo.toml b/examples/manual_routes/Cargo.toml index 7fe20bbb..8519ae94 100644 --- a/examples/manual_routes/Cargo.toml +++ b/examples/manual_routes/Cargo.toml @@ -7,4 +7,3 @@ publish = false [dependencies] rocket = { path = "../../core/lib" } -tokio = { version = "0.2.0", features = ["io-util"] } diff --git a/examples/stream/Cargo.toml b/examples/stream/Cargo.toml index 9b5728d2..96792d90 100644 --- a/examples/stream/Cargo.toml +++ b/examples/stream/Cargo.toml @@ -7,4 +7,3 @@ publish = false [dependencies] rocket = { path = "../../core/lib" } -tokio = { version = "0.2.0", features = ["fs", "io-util"] } diff --git a/examples/stream/src/main.rs b/examples/stream/src/main.rs index a733698b..a0b15432 100644 --- a/examples/stream/src/main.rs +++ b/examples/stream/src/main.rs @@ -4,8 +4,8 @@ use rocket::response::{content, Stream}; -use tokio::fs::File; -use tokio::io::{repeat, AsyncRead, AsyncReadExt}; +use rocket::tokio::fs::File; +use rocket::tokio::io::{repeat, AsyncRead, AsyncReadExt}; // Generate this file using: head -c BYTES /dev/random > big_file.dat const FILENAME: &str = "big_file.dat"; diff --git a/site/guide/4-requests.md b/site/guide/4-requests.md index bb63a749..c8bfbae7 100644 --- a/site/guide/4-requests.md +++ b/site/guide/4-requests.md @@ -999,16 +999,16 @@ Rocket makes it easy to use `async/await` in routes. ```rust # #[macro_use] extern crate rocket; -use rocket::tokio::time::{delay_for, Duration}; +use rocket::tokio::time::{sleep, Duration}; #[get("/delay/")] async fn delay(seconds: u64) -> String { - delay_for(Duration::from_secs(seconds)).await; + sleep(Duration::from_secs(seconds)).await; format!("Waited for {} seconds", seconds) } ``` First, notice that the route function is an `async fn`. This enables -the use of `await` inside the handler. `delay_for` is an asynchronous +the use of `await` inside the handler. `sleep` is an asynchronous function, so we must `await` it. ## Error Catchers