Update to 'tokio' 1.0, 'hyper' 0.14.

This commit is contained in:
Jeb Rosen 2020-12-23 17:02:40 -08:00 committed by Sergio Benitez
parent 031948c1da
commit 92af8fca72
18 changed files with 72 additions and 80 deletions

View File

@ -15,7 +15,7 @@ edition = "2018"
# Internal use only.
templates = ["serde", "serde_json", "glob", "notify"]
databases = [
"serde", "r2d2", "tokio/blocking", "tokio/rt-threaded",
"serde", "r2d2", "tokio/rt", "tokio/rt-multi-thread",
"rocket_contrib_codegen/database_attribute"
]
@ -42,7 +42,7 @@ memcache_pool = ["databases", "memcache", "r2d2-memcache"]
[dependencies]
# Global dependencies.
tokio = { version = "0.2.0", optional = true }
tokio = { version = "1.0", optional = true }
rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true }
rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false }
log = "0.4"

View File

@ -782,7 +782,7 @@ impl<K: 'static, C: Poolable> ConnectionPool<K, C> {
async fn get(&self) -> Result<Connection<K, C>, ()> {
let duration = std::time::Duration::from_secs(self.config.timeout as u64);
let permit = match timeout(duration, self.semaphore.clone().acquire_owned()).await {
Ok(p) => p,
Ok(p) => p.expect("internal invariant broken: semaphore should not be closed"),
Err(_) => {
error_!("database connection retrieval timed out");
return Err(());

View File

@ -22,14 +22,14 @@ private-cookies = ["cookie/private", "cookie/key-expansion"]
[dependencies]
smallvec = "1.0"
percent-encoding = "2"
hyper = { version = "0.13.0", default-features = false, features = ["runtime"] }
hyper = { version = "0.14", default-features = false, features = ["http1", "http2", "runtime", "server", "stream"] }
http = "0.2"
mime = "0.3.13"
time = "0.2.11"
indexmap = { version = "1.5.2", features = ["std"] }
state = "0.4"
tokio-rustls = { version = "0.14.0", optional = true }
tokio = { version = "0.2.9", features = ["sync", "tcp", "time"] }
tokio-rustls = { version = "0.22.0", optional = true }
tokio = { version = "1.0", features = ["net", "sync", "time"] }
unicode-xid = "0.2"
log = "0.4"
ref-cast = "1.0"
@ -37,6 +37,7 @@ uncased = "0.9"
parking_lot = "0.11"
either = "1"
pear = "0.2"
pin-project-lite = "0.2"
[dependencies.cookie]
git = "https://github.com/SergioBenitez/cookie-rs.git"

View File

@ -4,10 +4,10 @@
//! These types will, with certainty, be removed with time, but they reside here
//! while necessary.
#[doc(hidden)] pub use hyper::{Body, Request, Response, Server};
#[doc(hidden)] pub use hyper::{Body, Error, Request, Response};
#[doc(hidden)] pub use hyper::body::{Bytes, HttpBody, Sender as BodySender};
#[doc(hidden)] pub use hyper::error::Error;
#[doc(hidden)] pub use hyper::rt::Executor;
#[doc(hidden)] pub use hyper::server::Server;
#[doc(hidden)] pub use hyper::service::{make_service_fn, service_fn, Service};
#[doc(hidden)] pub use http::header::HeaderMap;

View File

@ -10,7 +10,7 @@ use hyper::server::accept::Accept;
use log::{debug, error};
use tokio::time::Delay;
use tokio::time::Sleep;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
@ -32,15 +32,18 @@ pub trait Connection: AsyncRead + AsyncWrite {
fn remote_addr(&self) -> Option<SocketAddr>;
}
/// This is a generic version of hyper's AddrIncoming that is intended to be
/// usable with listeners other than a plain TCP stream, e.g. TLS and/or Unix
/// sockets. It does so by bridging the `Listener` trait to what hyper wants (an
/// Accept). This type is internal to Rocket.
#[must_use = "streams do nothing unless polled"]
pub struct Incoming<L> {
pin_project_lite::pin_project! {
/// This is a generic version of hyper's AddrIncoming that is intended to be
/// usable with listeners other than a plain TCP stream, e.g. TLS and/or Unix
/// sockets. It does so by bridging the `Listener` trait to what hyper wants (an
/// Accept). This type is internal to Rocket.
#[must_use = "streams do nothing unless polled"]
pub struct Incoming<L> {
listener: L,
sleep_on_errors: Option<Duration>,
pending_error_delay: Option<Delay>,
#[pin]
pending_error_delay: Option<Sleep>,
}
}
impl<L: Listener> Incoming<L> {
@ -72,18 +75,19 @@ impl<L: Listener> Incoming<L> {
self.sleep_on_errors = val;
}
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<L::Connection>> {
// Check if a previous delay is active that was set by IO errors.
if let Some(ref mut delay) = self.pending_error_delay {
match Pin::new(delay).poll(cx) {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<L::Connection>> {
let mut me = self.project();
loop {
// Check if a previous sleep timer is active that was set by IO errors.
if let Some(delay) = me.pending_error_delay.as_mut().as_pin_mut() {
match delay.poll(cx) {
Poll::Ready(()) => {}
Poll::Pending => return Poll::Pending,
}
}
self.pending_error_delay = None;
me.pending_error_delay.set(None);
loop {
match self.listener.poll_accept(cx) {
match me.listener.poll_accept(cx) {
Poll::Ready(Ok(stream)) => {
return Poll::Ready(Ok(stream));
},
@ -96,22 +100,11 @@ impl<L: Listener> Incoming<L> {
continue;
}
if let Some(duration) = self.sleep_on_errors {
if let Some(duration) = me.sleep_on_errors {
error!("accept error: {}", e);
// Sleep for the specified duration
let mut error_delay = tokio::time::delay_for(duration);
match Pin::new(&mut error_delay).poll(cx) {
Poll::Ready(()) => {
// Wow, it's been a second already? Ok then...
continue
},
Poll::Pending => {
self.pending_error_delay = Some(error_delay);
return Poll::Pending;
},
}
me.pending_error_delay.set(Some(tokio::time::sleep(*duration)));
} else {
return Poll::Ready(Err(e));
}
@ -126,7 +119,7 @@ impl<L: Listener + Unpin> Accept for Incoming<L> {
type Error = io::Error;
fn poll_accept(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<io::Result<Self::Conn>>> {
self.poll_next(cx).map(Some)
@ -169,7 +162,7 @@ impl Listener for TcpListener {
}
fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Self::Connection>> {
self.poll_accept(cx).map_ok(|(stream, _addr)| stream)
(*self).poll_accept(cx).map_ok(|(stream, _addr)| stream)
}
}

View File

@ -46,8 +46,8 @@ rand = "0.7"
either = "1"
[dependencies.tokio]
version = "0.2.9"
features = ["fs", "io-std", "io-util", "rt-threaded", "sync", "signal", "macros"]
version = "1.0"
features = ["fs", "io-std", "io-util", "rt-multi-thread", "sync", "signal", "macros"]
[build-dependencies]
yansi = "0.5"

View File

@ -3,7 +3,7 @@ use std::task::{Context, Poll};
use std::path::Path;
use std::io::{self, Cursor};
use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, Take};
use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, ReadBuf, Take};
use crate::ext::AsyncReadBody;
@ -116,12 +116,12 @@ impl AsyncRead for DataStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8]
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if self.buffer.limit() > 0 {
trace_!("DataStream::buffer_read()");
match Pin::new(&mut self.buffer).poll_read(cx, buf) {
Poll::Ready(Ok(0)) => { /* fall through */ },
Poll::Ready(Ok(())) if buf.filled().is_empty() => { /* fall through */ },
poll => return poll,
}
}

View File

@ -3,7 +3,7 @@ use std::pin::Pin;
use std::task::{Poll, Context};
use futures::{ready, stream::Stream};
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, ReadBuf};
use crate::http::hyper::{self, Bytes, HttpBody};
@ -23,11 +23,13 @@ impl<R> Stream for IntoBytesStream<R>
let Self { ref mut inner, ref mut buffer, buf_size } = *self;
match Pin::new(inner).poll_read(cx, &mut buffer[..]) {
let mut buf = ReadBuf::new(&mut buffer[..]);
match Pin::new(inner).poll_read(cx, &mut buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
Poll::Ready(Ok(n)) if n == 0 => Poll::Ready(None),
Poll::Ready(Ok(n)) => {
Poll::Ready(Ok(())) if buf.filled().is_empty() => Poll::Ready(None),
Poll::Ready(Ok(())) => {
let n = buf.filled().len();
// FIXME(perf).
let mut next = std::mem::replace(buffer, vec![0; buf_size]);
next.truncate(n);
@ -72,8 +74,8 @@ impl AsyncRead for AsyncReadBody {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8]
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
loop {
match self.state {
State::Pending => {
@ -90,11 +92,11 @@ impl AsyncRead for AsyncReadBody {
},
State::Partial(ref mut cursor) => {
match ready!(Pin::new(cursor).poll_read(cx, buf)) {
Ok(n) if n == 0 => self.state = State::Pending,
Ok(()) if buf.filled().is_empty() => self.state = State::Pending,
result => return Poll::Ready(result),
}
}
State::Done => return Poll::Ready(Ok(0)),
State::Done => return Poll::Ready(Ok(())),
}
}
}

View File

@ -164,10 +164,9 @@ pub fn custom<T: figment::Provider>(provider: T) -> Rocket {
/// WARNING: This is unstable! Do not use this method outside of Rocket!
#[doc(hidden)]
pub fn async_test<R>(fut: impl std::future::Future<Output = R> + Send) -> R {
tokio::runtime::Builder::new()
.threaded_scheduler()
tokio::runtime::Builder::new_multi_thread()
.thread_name("rocket-test-worker-thread")
.core_threads(1)
.worker_threads(1)
.enable_all()
.build()
.expect("create tokio runtime")
@ -180,9 +179,8 @@ pub fn async_main<R>(fut: impl std::future::Future<Output = R> + Send) -> R {
// FIXME: The `workers` value won't reflect swaps of `Rocket` in attach
// fairings with different config values, or values from non-Rocket configs.
// See tokio-rs/tokio#3329 for a necessary solution in `tokio`.
tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(Config::from(Config::figment()).workers)
tokio::runtime::Builder::new_multi_thread()
.worker_threads(Config::from(Config::figment()).workers)
.thread_name("rocket-worker-thread")
.enable_all()
.build()

View File

@ -2,7 +2,7 @@ use std::io;
use std::future::Future;
use std::{pin::Pin, task::{Context, Poll}};
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, ReadBuf};
use crate::http::CookieJar;
use crate::{Request, Response};
@ -125,11 +125,11 @@ impl AsyncRead for LocalResponse<'_> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8]
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let body = match self.response.body_mut() {
Some(body) => body,
_ => return Poll::Ready(Ok(0))
_ => return Poll::Ready(Ok(()))
};
Pin::new(body.as_reader()).poll_read(cx, buf)

View File

@ -32,8 +32,7 @@ pub struct Client {
impl Client {
fn _new(rocket: Rocket, tracked: bool) -> Result<Client, Error> {
let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("create tokio runtime");

View File

@ -305,7 +305,8 @@ impl Rocket {
let (rocket, mut fairings) = match tokio::runtime::Handle::try_current() {
Ok(handle) => {
std::thread::spawn(move || {
handle.block_on(future)
let _e = handle.enter();
futures::executor::block_on(future)
}).join().unwrap()
}
Err(_) => {

View File

@ -41,7 +41,7 @@ impl Shutdown {
/// immediately; pending requests will continue to run until completion
/// before the actual shutdown occurs.
#[inline]
pub fn shutdown(mut self) {
pub fn shutdown(self) {
// Intentionally ignore any error, as the only scenarios this can happen
// is sending too many shutdown requests or we're already shut down.
let _ = self.0.try_send(());

View File

@ -4,7 +4,7 @@ async fn test_await_timer_inside_attach() {
async fn do_async_setup() {
// By using a timer or I/O resource, we ensure that do_async_setup will
// deadlock if no thread is able to tick the time or I/O drivers.
rocket::tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
rocket::tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
rocket::ignite()

View File

@ -7,4 +7,3 @@ publish = false
[dependencies]
rocket = { path = "../../core/lib" }
tokio = { version = "0.2.0", features = ["io-util"] }

View File

@ -7,4 +7,3 @@ publish = false
[dependencies]
rocket = { path = "../../core/lib" }
tokio = { version = "0.2.0", features = ["fs", "io-util"] }

View File

@ -4,8 +4,8 @@
use rocket::response::{content, Stream};
use tokio::fs::File;
use tokio::io::{repeat, AsyncRead, AsyncReadExt};
use rocket::tokio::fs::File;
use rocket::tokio::io::{repeat, AsyncRead, AsyncReadExt};
// Generate this file using: head -c BYTES /dev/random > big_file.dat
const FILENAME: &str = "big_file.dat";

View File

@ -999,16 +999,16 @@ Rocket makes it easy to use `async/await` in routes.
```rust
# #[macro_use] extern crate rocket;
use rocket::tokio::time::{delay_for, Duration};
use rocket::tokio::time::{sleep, Duration};
#[get("/delay/<seconds>")]
async fn delay(seconds: u64) -> String {
delay_for(Duration::from_secs(seconds)).await;
sleep(Duration::from_secs(seconds)).await;
format!("Waited for {} seconds", seconds)
}
```
First, notice that the route function is an `async fn`. This enables
the use of `await` inside the handler. `delay_for` is an asynchronous
the use of `await` inside the handler. `sleep` is an asynchronous
function, so we must `await` it.
## Error Catchers