diff --git a/contrib/lib/Cargo.toml b/contrib/lib/Cargo.toml index 40ef57aa..15513a60 100644 --- a/contrib/lib/Cargo.toml +++ b/contrib/lib/Cargo.toml @@ -42,7 +42,8 @@ memcache_pool = ["databases", "memcache", "r2d2-memcache"] [dependencies] # Global dependencies. -futures-preview = { version = "0.3.0-alpha.18" } +futures-util-preview = "0.3.0-alpha.19" +tokio-io = "=0.2.0-alpha.6" rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true } rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false } log = "0.4" @@ -87,7 +88,7 @@ brotli = { version = "3.3", optional = true } flate2 = { version = "1.0", optional = true } [dev-dependencies] -tokio-timer = "=0.3.0-alpha.5" +tokio-timer = "=0.3.0-alpha.6" [package.metadata.docs.rs] all-features = true diff --git a/contrib/lib/src/json.rs b/contrib/lib/src/json.rs index 1ca40c92..0962c449 100644 --- a/contrib/lib/src/json.rs +++ b/contrib/lib/src/json.rs @@ -18,14 +18,13 @@ use std::ops::{Deref, DerefMut}; use std::io; use std::iter::FromIterator; -use futures::io::AsyncReadExt; +use tokio_io::AsyncReadExt; use rocket::request::Request; use rocket::outcome::Outcome::*; use rocket::data::{Transform::*, Transformed, Data, FromData, TransformFuture, FromDataFuture}; use rocket::response::{self, Responder, content}; use rocket::http::Status; -use rocket::AsyncReadExt as _; use serde::{Serialize, Serializer}; use serde::de::{Deserialize, Deserializer}; diff --git a/contrib/lib/src/msgpack.rs b/contrib/lib/src/msgpack.rs index d8a02639..3ed02629 100644 --- a/contrib/lib/src/msgpack.rs +++ b/contrib/lib/src/msgpack.rs @@ -16,14 +16,13 @@ use std::ops::{Deref, DerefMut}; -use futures::io::AsyncReadExt; +use tokio_io::AsyncReadExt; use rocket::request::Request; use rocket::outcome::Outcome::*; use rocket::data::{Data, FromData, FromDataFuture, Transform::*, TransformFuture, Transformed}; use rocket::http::Status; use rocket::response::{self, content, Responder}; -use rocket::AsyncReadExt as _; use serde::Serialize; use serde::de::Deserialize; diff --git a/core/codegen/Cargo.toml b/core/codegen/Cargo.toml index 04c2eb0b..07237e73 100644 --- a/core/codegen/Cargo.toml +++ b/core/codegen/Cargo.toml @@ -28,5 +28,6 @@ version_check = "0.9.1" [dev-dependencies] rocket = { version = "0.5.0-dev", path = "../lib" } -futures-preview = "0.3.0-alpha.18" +futures-preview = "0.3.0-alpha.19" +tokio-io = "0.2.0-alpha.6" compiletest_rs = { version = "0.3", features = ["stable"] } diff --git a/core/codegen/tests/route-data.rs b/core/codegen/tests/route-data.rs index 912fb09a..3a4f38b0 100644 --- a/core/codegen/tests/route-data.rs +++ b/core/codegen/tests/route-data.rs @@ -22,8 +22,7 @@ impl FromDataSimple for Simple { fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> { Box::pin(async { - use futures::io::AsyncReadExt as _; - use rocket::AsyncReadExt as _; + use tokio_io::AsyncReadExt; let mut string = String::new(); let mut stream = data.open().take(64); diff --git a/core/codegen/tests/route.rs b/core/codegen/tests/route.rs index 89c53d12..0cd6d9d7 100644 --- a/core/codegen/tests/route.rs +++ b/core/codegen/tests/route.rs @@ -30,8 +30,7 @@ impl FromDataSimple for Simple { fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> { Box::pin(async move { - use futures::io::AsyncReadExt as _; - use rocket::AsyncReadExt as _; + use tokio_io::AsyncReadExt; let mut string = String::new(); let mut stream = data.open().take(64); diff --git a/core/http/Cargo.toml b/core/http/Cargo.toml index 39b09f86..fb166fd9 100644 --- a/core/http/Cargo.toml +++ b/core/http/Cargo.toml @@ -23,20 +23,20 @@ private-cookies = ["cookie/private", "cookie/key-expansion"] smallvec = "1.0" percent-encoding = "1" # TODO.async: stop using stream-unstable -hyper = { version = "=0.13.0-alpha.2", default-features = false, features = ["unstable-stream"] } +hyper = { version = "=0.13.0-alpha.4", default-features = false, features = ["unstable-stream"] } http = "0.1.17" mime = "0.3.13" time = "0.2.11" indexmap = "1.0" state = "0.4" -tokio-rustls = { version = "0.12.0-alpha.3", optional = true } -tokio-io = "=0.2.0-alpha.5" -tokio-net = "=0.2.0-alpha.5" -tokio-timer = "=0.3.0-alpha.5" +tokio-rustls = { version = "0.12.0-alpha.4", optional = true } +tokio-io = "=0.2.0-alpha.6" +tokio-net = { version = "=0.2.0-alpha.6", features = ["tcp"] } +tokio-timer = "=0.3.0-alpha.6" cookie = { version = "0.14.0", features = ["percent-encode"] } pear = "0.1" unicode-xid = "0.2" -futures-preview = "0.3.0-alpha.18" +futures-core-preview = "0.3.0-alpha.19" log = "0.4" [dev-dependencies] diff --git a/core/http/src/listener.rs b/core/http/src/listener.rs index 758b4218..755e1267 100644 --- a/core/http/src/listener.rs +++ b/core/http/src/listener.rs @@ -101,7 +101,6 @@ impl Incoming { // Sleep for the specified duration let delay = Instant::now() + duration; - // TODO.async: This depends on a tokio Timer being set in the environment let mut error_delay = tokio_timer::delay(delay); match Pin::new(&mut error_delay).poll(cx) { @@ -128,7 +127,7 @@ impl Accept for Incoming { type Error = io::Error; fn poll_accept(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let result = futures::ready!(self.poll_next(cx)); + let result = futures_core::ready!(self.poll_next(cx)); Poll::Ready(Some(result)) } } @@ -157,8 +156,6 @@ impl fmt::Debug for Incoming { } } -// TODO.async: Put these under a feature such as #[cfg(feature = "tokio-runtime")] - pub fn bind_tcp(address: SocketAddr) -> Pin> + Send>> { Box::pin(async move { Ok(TcpListener::bind(address).await?) @@ -174,8 +171,8 @@ impl Listener for TcpListener { fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { // NB: This is only okay because TcpListener::accept() is stateless. - let accept = self.accept(); - futures::pin_mut!(accept); + let mut accept = self.accept(); + let accept = unsafe { Pin::new_unchecked(&mut accept) }; accept.poll(cx).map_ok(|(stream, _addr)| stream) } } diff --git a/core/http/src/tls.rs b/core/http/src/tls.rs index 9dd31f0a..cf8b61b9 100644 --- a/core/http/src/tls.rs +++ b/core/http/src/tls.rs @@ -67,8 +67,6 @@ pub fn load_private_key>(path: P) -> Result(req: &'r Request<'_>) -> futures::future::BoxFuture<'r, response::Result<'r>> { + fn $fn_name<'r>(req: &'r Request<'_>) -> futures_core::future::BoxFuture<'r, response::Result<'r>> { status::Custom(Status::from_code($code).unwrap(), content::Html(error_page_template!($code, $name, $description)) ).respond_to(req) diff --git a/core/lib/src/codegen.rs b/core/lib/src/codegen.rs index e61798e3..c517c432 100644 --- a/core/lib/src/codegen.rs +++ b/core/lib/src/codegen.rs @@ -1,4 +1,4 @@ -use futures::future::BoxFuture; +use futures_core::future::BoxFuture; use crate::{Request, Data}; use crate::handler::{Outcome, ErrorHandler}; diff --git a/core/lib/src/data/data.rs b/core/lib/src/data/data.rs index ead5c874..5749e487 100644 --- a/core/lib/src/data/data.rs +++ b/core/lib/src/data/data.rs @@ -1,14 +1,13 @@ +use std::future::Future; +use std::io; use std::path::Path; -use futures::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite}; -use futures::future::Future; -use futures::stream::TryStreamExt; +use tokio_io::{AsyncRead, AsyncWrite, AsyncReadExt as _}; use super::data_stream::DataStream; use crate::http::hyper; - -use crate::ext::AsyncReadExt; +use crate::ext::{AsyncReadExt, AsyncReadBody}; /// The number of bytes to read into the "peek" buffer. const PEEK_BYTES: usize = 512; @@ -135,20 +134,19 @@ impl Data { /// /// ```rust /// use std::io; - /// use futures::io::AllowStdIo; /// use rocket::Data; /// /// async fn handler(mut data: Data) -> io::Result { /// // write all of the data to stdout - /// let written = data.stream_to(AllowStdIo::new(io::stdout())).await?; + /// let written = data.stream_to(tokio::io::stdout()).await?; /// Ok(format!("Wrote {} bytes.", written)) /// } /// ``` #[inline(always)] pub fn stream_to<'w, W: AsyncWrite + Unpin + 'w>(self, mut writer: W) -> impl Future> + 'w { Box::pin(async move { - let stream = self.open(); - stream.copy_into(&mut writer).await + let mut stream = self.open(); + stream.copy(&mut writer).await }) } @@ -172,7 +170,7 @@ impl Data { #[inline(always)] pub fn stream_to_file + Send + Unpin + 'static>(self, path: P) -> impl Future> { Box::pin(async move { - let mut file = async_std::fs::File::create(path).await?; + let mut file = tokio::fs::File::create(path).await?; let streaming = self.stream_to(&mut file); streaming.await }) @@ -186,9 +184,7 @@ impl Data { pub(crate) async fn new(body: hyper::Body) -> Data { trace_!("Data::new({:?})", body); - let mut stream = body.map_err(|e| { - io::Error::new(io::ErrorKind::Other, e) - }).into_async_read(); + let mut stream = AsyncReadBody::from(body); let mut peek_buf = vec![0; PEEK_BYTES]; diff --git a/core/lib/src/data/data_stream.rs b/core/lib/src/data/data_stream.rs index 721079bd..dcc370b6 100644 --- a/core/lib/src/data/data_stream.rs +++ b/core/lib/src/data/data_stream.rs @@ -1,7 +1,7 @@ use std::pin::Pin; +use std::task::{Context, Poll}; -use futures::io::{AsyncRead, Error as IoError}; -use futures::task::{Poll, Context}; +use tokio_io::AsyncRead; // TODO.async: Consider storing the real type here instead of a Box to avoid // the dynamic dispatch @@ -19,7 +19,7 @@ pub struct DataStream(pub(crate) Vec, pub(crate) Box, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { trace_!("DataStream::poll_read()"); if self.0.len() > 0 { let count = std::cmp::min(buf.len(), self.0.len()); diff --git a/core/lib/src/data/from_data.rs b/core/lib/src/data/from_data.rs index f9df6b99..9ebc93f4 100644 --- a/core/lib/src/data/from_data.rs +++ b/core/lib/src/data/from_data.rs @@ -1,7 +1,8 @@ use std::borrow::Borrow; -use futures::future::{ready, FutureExt, BoxFuture}; -use futures::io::AsyncReadExt; +use futures_core::future::BoxFuture; +use futures_util::future::{ready, FutureExt}; +use tokio_io::AsyncReadExt; use crate::outcome::{self, IntoOutcome}; use crate::outcome::Outcome::*; @@ -193,14 +194,12 @@ pub type FromDataFuture<'a, T, E> = BoxFuture<'a, Outcome>; /// # struct Name<'a> { first: &'a str, last: &'a str, } /// use std::io::{self, Read}; /// -/// use futures::io::AsyncReadExt; +/// use tokio::io::AsyncReadExt; /// /// use rocket::{Request, Data, Outcome::*}; /// use rocket::data::{FromData, Outcome, Transform, Transformed, TransformFuture, FromDataFuture}; /// use rocket::http::Status; /// -/// use rocket::AsyncReadExt as _; -/// /// const NAME_LIMIT: u64 = 256; /// /// enum NameError { @@ -462,14 +461,12 @@ impl<'a> FromData<'a> for Data { /// # /// use std::io::Read; /// -/// use futures::io::AsyncReadExt; +/// use tokio::io::AsyncReadExt; /// /// use rocket::{Request, Data, Outcome, Outcome::*}; /// use rocket::data::{self, FromDataSimple, FromDataFuture}; /// use rocket::http::{Status, ContentType}; /// -/// use rocket::AsyncReadExt as _; -/// /// // Always use a limit to prevent DoS attacks. /// const LIMIT: u64 = 256; /// diff --git a/core/lib/src/ext.rs b/core/lib/src/ext.rs index d9da2f64..8c8e2da6 100644 --- a/core/lib/src/ext.rs +++ b/core/lib/src/ext.rs @@ -1,37 +1,12 @@ -use std::io; +use std::io::{self, Cursor}; use std::pin::Pin; +use std::task::{Poll, Context}; -use futures::io::{AsyncRead, AsyncReadExt as _}; -use futures::future::BoxFuture; -use futures::stream::Stream; -use futures::task::{Poll, Context}; +use futures_core::{ready, future::BoxFuture, stream::Stream}; +use tokio_io::{AsyncRead, AsyncReadExt as _}; -use crate::http::hyper::Chunk; - -// Based on std::io::Take, but for AsyncRead instead of Read -pub struct Take{ - inner: R, - limit: u64, -} - -// TODO.async: Verify correctness of this implementation. -impl AsyncRead for Take where R: AsyncRead + Unpin { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - if self.limit == 0 { - return Poll::Ready(Ok(0)); - } - - let max = std::cmp::min(buf.len() as u64, self.limit) as usize; - match Pin::new(&mut self.inner).poll_read(cx, &mut buf[..max]) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(n)) => { - self.limit -= n as u64; - Poll::Ready(Ok(n)) - }, - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - } - } -} +use crate::http::hyper; +use hyper::{Chunk, Payload}; pub struct IntoChunkStream { inner: R, @@ -64,10 +39,6 @@ impl Stream for IntoChunkStream } pub trait AsyncReadExt: AsyncRead { - fn take(self, limit: u64) -> Take where Self: Sized { - Take { inner: self, limit } - } - fn into_chunk_stream(self, buf_size: usize) -> IntoChunkStream where Self: Sized { IntoChunkStream { inner: self, buf_size, buffer: vec![0; buf_size] } } @@ -93,3 +64,46 @@ pub trait AsyncReadExt: AsyncRead { } impl AsyncReadExt for T { } + +pub struct AsyncReadBody { + inner: hyper::Body, + state: AsyncReadBodyState, +} + +enum AsyncReadBodyState { + Pending, + Partial(Cursor), + Done, +} + +impl From for AsyncReadBody { + fn from(body: hyper::Body) -> Self { + Self { inner: body, state: AsyncReadBodyState::Pending } + } +} + +impl AsyncRead for AsyncReadBody { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + loop { + match self.state { + AsyncReadBodyState::Pending => { + match ready!(Pin::new(&mut self.inner).poll_data(cx)) { + Some(Ok(chunk)) => self.state = AsyncReadBodyState::Partial(Cursor::new(chunk)), + Some(Err(e)) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))), + None => self.state = AsyncReadBodyState::Done, + } + }, + AsyncReadBodyState::Partial(ref mut cursor) => { + match ready!(Pin::new(cursor).poll_read(cx, buf)) { + Ok(n) if n == 0 => { + self.state = AsyncReadBodyState::Pending; + } + Ok(n) => return Poll::Ready(Ok(n)), + Err(e) => return Poll::Ready(Err(e)), + } + } + AsyncReadBodyState::Done => return Poll::Ready(Ok(0)), + } + } + } +} diff --git a/core/lib/src/fairing/ad_hoc.rs b/core/lib/src/fairing/ad_hoc.rs index bb666e52..8509c513 100644 --- a/core/lib/src/fairing/ad_hoc.rs +++ b/core/lib/src/fairing/ad_hoc.rs @@ -1,6 +1,6 @@ use std::sync::Mutex; -use futures::future::BoxFuture; +use futures_core::future::BoxFuture; use crate::{Rocket, Request, Response, Data}; use crate::fairing::{Fairing, Kind, Info}; diff --git a/core/lib/src/fairing/mod.rs b/core/lib/src/fairing/mod.rs index ddc1030f..f8fbc390 100644 --- a/core/lib/src/fairing/mod.rs +++ b/core/lib/src/fairing/mod.rs @@ -47,7 +47,7 @@ //! of other `Fairings` are not jeopardized. For instance, unless it is made //! abundantly clear, a fairing should not rewrite every request. -use futures::future::BoxFuture; +use futures_core::future::BoxFuture; use crate::{Rocket, Request, Response, Data}; diff --git a/core/lib/src/handler.rs b/core/lib/src/handler.rs index 96791d7a..7ce867b5 100644 --- a/core/lib/src/handler.rs +++ b/core/lib/src/handler.rs @@ -1,6 +1,6 @@ //! Types and traits for request and error handlers and their return values. -use futures::future::BoxFuture; +use futures_core::future::BoxFuture; use crate::data::Data; use crate::request::Request; diff --git a/core/lib/src/lib.rs b/core/lib/src/lib.rs index f343856b..03573e44 100644 --- a/core/lib/src/lib.rs +++ b/core/lib/src/lib.rs @@ -137,7 +137,6 @@ pub use crate::router::Route; pub use crate::request::{Request, State}; pub use crate::catcher::Catcher; pub use crate::rocket::Rocket; -pub use ext::AsyncReadExt; /// Alias to [`Rocket::ignite()`] Creates a new instance of `Rocket`. pub fn ignite() -> Rocket { diff --git a/core/lib/src/request/form/form.rs b/core/lib/src/request/form/form.rs index 7f488d09..56ca9d0d 100644 --- a/core/lib/src/request/form/form.rs +++ b/core/lib/src/request/form/form.rs @@ -1,12 +1,11 @@ use std::ops::Deref; -use futures::io::AsyncReadExt; +use tokio_io::AsyncReadExt; use crate::outcome::Outcome::*; use crate::request::{Request, form::{FromForm, FormItems, FormDataError}}; use crate::data::{Outcome, Transform, Transformed, Data, FromData, TransformFuture, FromDataFuture}; use crate::http::{Status, uri::{Query, FromUriParam}}; -use crate::ext::AsyncReadExt as _; /// A data guard for parsing [`FromForm`] types strictly. /// @@ -200,7 +199,7 @@ impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for Form { if !request.content_type().map_or(false, |ct| ct.is_form()) { warn_!("Form data does not have form content type."); - return Box::pin(futures::future::ready(Transform::Borrowed(Forward(data)))); + return Box::pin(futures_util::future::ready(Transform::Borrowed(Forward(data)))); } let limit = request.limits().forms; @@ -216,7 +215,7 @@ impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for Form { } fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> { - Box::pin(futures::future::ready(o.borrowed().and_then(|data| { + Box::pin(futures_util::future::ready(o.borrowed().and_then(|data| { >::from_data(data, true).map(Form) }))) } diff --git a/core/lib/src/request/form/lenient.rs b/core/lib/src/request/form/lenient.rs index d25b3f1c..f075341a 100644 --- a/core/lib/src/request/form/lenient.rs +++ b/core/lib/src/request/form/lenient.rs @@ -105,7 +105,7 @@ impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for LenientForm { } fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> { - Box::pin(futures::future::ready(o.borrowed().and_then(|form| { + Box::pin(futures_util::future::ready(o.borrowed().and_then(|form| { >::from_data(form, false).map(LenientForm) }))) } diff --git a/core/lib/src/response/debug.rs b/core/lib/src/response/debug.rs index 0bba0549..a11d9541 100644 --- a/core/lib/src/response/debug.rs +++ b/core/lib/src/response/debug.rs @@ -20,14 +20,12 @@ use yansi::Paint; /// # #![feature(proc_macro_hygiene)] /// use std::io; /// -/// use futures::io::AsyncReadExt; +/// use tokio::io::AsyncReadExt; /// /// # use rocket::post; /// use rocket::Data; /// use rocket::response::Debug; /// -/// use rocket::AsyncReadExt as _; -/// /// #[post("/", format = "plain", data = "")] /// async fn post(data: Data) -> Result> { /// let mut name = String::with_capacity(32); diff --git a/core/lib/src/response/mod.rs b/core/lib/src/response/mod.rs index d179a09c..3403ed2b 100644 --- a/core/lib/src/response/mod.rs +++ b/core/lib/src/response/mod.rs @@ -49,4 +49,4 @@ pub use self::debug::Debug; /// Type alias for the `Result` of a `Responder::respond` call. pub type Result<'r> = std::result::Result, crate::http::Status>; /// Type alias for the `Result` of a `Responder::respond` call. -pub type ResultFuture<'r> = futures::future::BoxFuture<'r, Result<'r>>; +pub type ResultFuture<'r> = futures_core::future::BoxFuture<'r, Result<'r>>; diff --git a/core/lib/src/response/responder.rs b/core/lib/src/response/responder.rs index bea1f944..26da4e5d 100644 --- a/core/lib/src/response/responder.rs +++ b/core/lib/src/response/responder.rs @@ -1,8 +1,8 @@ use std::fs::File; use std::io::Cursor; -use futures::io::BufReader; -use futures::future; +use futures_util::future::ready; +use tokio_io::BufReader; use crate::http::{Status, ContentType, StatusClass}; use crate::response::{self, Response, Body}; @@ -255,7 +255,7 @@ impl Responder<'_> for Vec { impl Responder<'_> for File { fn respond_to(self, _: &Request<'_>) -> response::ResultFuture<'static> { Box::pin(async move { - let file = async_std::fs::File::from(self); + let file = tokio::fs::File::from(self); let metadata = file.metadata().await; let stream = BufReader::new(file); match metadata { @@ -283,7 +283,7 @@ impl<'r, R: Responder<'r> + Send + 'r> Responder<'r> for Option { Some(r) => r.respond_to(req), None => { warn_!("Response was `None`."); - Box::pin(future::err(Status::NotFound)) + Box::pin(ready(Err(Status::NotFound))) }, } } diff --git a/core/lib/src/response/response.rs b/core/lib/src/response/response.rs index cc9ec2d4..48c5cedc 100644 --- a/core/lib/src/response/response.rs +++ b/core/lib/src/response/response.rs @@ -1,13 +1,13 @@ use std::{io, fmt, str}; use std::borrow::Cow; +use std::future::Future; use std::pin::Pin; -use futures::future::{Future, FutureExt}; -use futures::io::{AsyncRead, AsyncReadExt}; +use tokio_io::{AsyncRead, AsyncReadExt}; +use futures_util::future::FutureExt; use crate::response::{Responder, ResultFuture}; use crate::http::{Header, HeaderMap, Status, ContentType, Cookie}; -use crate::ext::AsyncReadExt as _; /// The default size, in bytes, of a chunk for streamed responses. pub const DEFAULT_CHUNK_SIZE: u64 = 4096; @@ -346,7 +346,7 @@ impl<'r> ResponseBuilder<'r> { /// /// ```rust,ignore /// use rocket::Response; - /// use async_std::fs::File; + /// use tokio::fs::File; /// # use std::io; /// /// # #[allow(dead_code)] @@ -372,7 +372,7 @@ impl<'r> ResponseBuilder<'r> { /// /// ```rust /// use rocket::Response; - /// use async_std::fs::File; + /// use tokio::fs::File; /// # use std::io; /// /// # #[allow(dead_code)] @@ -399,7 +399,7 @@ impl<'r> ResponseBuilder<'r> { /// /// ```rust /// use rocket::Response; - /// use async_std::fs::File; + /// use tokio::fs::File; /// # use std::io; /// /// # #[allow(dead_code)] @@ -1010,7 +1010,7 @@ impl<'r> Response<'r> { pub(crate) fn strip_body(&mut self) { if let Some(body) = self.take_body() { self.body = match body { - Body::Sized(_, n) => Some(Body::Sized(Box::pin(io::empty()), n)), + Body::Sized(_, n) => Some(Body::Sized(Box::pin(io::Cursor::new(&[])), n)), Body::Chunked(..) => None }; } @@ -1057,14 +1057,14 @@ impl<'r> Response<'r> { /// # Example /// /// ```rust - /// use std::io::repeat; - /// use futures::io::AsyncReadExt; + /// use futures::io::repeat; + /// use futures_tokio_compat::Compat; + /// use tokio::io::AsyncReadExt; /// use rocket::Response; - /// use rocket::AsyncReadExt as _; /// /// # rocket::async_test(async { /// let mut response = Response::new(); - /// response.set_streamed_body(repeat(97).take(5)); + /// response.set_streamed_body(Compat::new(repeat(97)).take(5)); /// assert_eq!(response.body_string().await, Some("aaaaa".to_string())); /// # }) /// ``` @@ -1079,14 +1079,14 @@ impl<'r> Response<'r> { /// # Example /// /// ```rust - /// use std::io::repeat; - /// use futures::io::AsyncReadExt; + /// use futures::io::repeat; + /// use futures_tokio_compat::Compat; + /// use tokio::io::AsyncReadExt; /// use rocket::Response; - /// use rocket::AsyncReadExt as _; /// /// # rocket::async_test(async { /// let mut response = Response::new(); - /// response.set_chunked_body(repeat(97).take(5), 10); + /// response.set_chunked_body(Compat::new(repeat(97)).take(5), 10); /// assert_eq!(response.body_string().await, Some("aaaaa".to_string())); /// # }) /// ``` diff --git a/core/lib/src/response/stream.rs b/core/lib/src/response/stream.rs index 74fdcd4f..eadb6754 100644 --- a/core/lib/src/response/stream.rs +++ b/core/lib/src/response/stream.rs @@ -1,6 +1,6 @@ use std::fmt::{self, Debug}; -use futures::io::AsyncRead; +use tokio_io::AsyncRead; use crate::request::Request; use crate::response::{Response, Responder, ResultFuture, DEFAULT_CHUNK_SIZE}; @@ -23,12 +23,10 @@ impl Stream { /// bytes. Note: you probably shouldn't do this. /// /// ```rust - /// use std::io; - /// use futures::io::AllowStdIo; /// use rocket::response::Stream; /// /// # #[allow(unused_variables)] - /// let response = Stream::chunked(AllowStdIo::new(io::stdin()), 10); + /// let response = Stream::chunked(tokio::io::stdin(), 10); /// ``` pub fn chunked(reader: T, chunk_size: u64) -> Stream { Stream(reader, chunk_size) @@ -49,12 +47,10 @@ impl Debug for Stream { /// shouldn't do this. /// /// ```rust -/// use std::io; -/// use futures::io::AllowStdIo; /// use rocket::response::Stream; /// /// # #[allow(unused_variables)] -/// let response = Stream::from(AllowStdIo::new(io::stdin())); +/// let response = Stream::from(tokio::io::stdin()); /// ``` impl From for Stream { fn from(reader: T) -> Self { diff --git a/core/lib/src/rocket.rs b/core/lib/src/rocket.rs index a2f5603b..49dfc60c 100644 --- a/core/lib/src/rocket.rs +++ b/core/lib/src/rocket.rs @@ -5,11 +5,9 @@ use std::io; use std::mem; use std::sync::Arc; -use futures::future::{Future, FutureExt, BoxFuture}; -use futures::channel::{mpsc, oneshot}; -use futures::stream::StreamExt; -use futures::task::{Spawn, SpawnExt}; -use futures_tokio_compat::Compat as TokioCompat; +use futures_core::future::{Future, BoxFuture}; +use futures_channel::{mpsc, oneshot}; +use futures_util::{future::FutureExt, stream::StreamExt}; use yansi::Paint; use state::Container; @@ -54,7 +52,6 @@ pub struct Rocket { fn hyper_service_fn( rocket: Arc, h_addr: std::net::SocketAddr, - mut spawn: impl futures::task::Spawn, hyp_req: hyper::Request, ) -> impl Future, io::Error>> { // This future must return a hyper::Response, but that's not easy @@ -63,7 +60,7 @@ fn hyper_service_fn( // the response metadata (and a body channel) beforehand. let (tx, rx) = oneshot::channel(); - spawn.spawn(async move { + tokio::spawn(async move { // Get all of the information from Hyper. let (h_parts, h_body) = hyp_req.into_parts(); @@ -89,7 +86,7 @@ fn hyper_service_fn( // Dispatch the request to get a response, then write that response out. let r = rocket.dispatch(&mut req, data).await; rocket.issue_response(r, tx).await; - }).expect("failed to spawn handler"); + }); async move { rx.await.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) @@ -708,11 +705,10 @@ impl Rocket { } // TODO.async: Solidify the Listener APIs and make this function public - async fn listen_on(mut self, listener: L, spawn: S) -> Result<(), crate::error::Error> + async fn listen_on(mut self, listener: L) -> Result<(), crate::error::Error> where L: Listener + Send + Unpin + 'static, ::Connection: Send + Unpin + 'static, - S: Spawn + Clone + Send + 'static, { self = self.prelaunch_check().map_err(crate::error::Error::Launch)?; @@ -754,21 +750,19 @@ impl Rocket { .take().expect("shutdown receiver has already been used"); let rocket = Arc::new(self); - let spawn_makeservice = spawn.clone(); let service = hyper::make_service_fn(move |connection: &::Connection| { let rocket = rocket.clone(); let remote_addr = connection.remote_addr().unwrap_or_else(|| "0.0.0.0".parse().unwrap()); - let spawn_service = spawn_makeservice.clone(); async move { Ok::<_, std::convert::Infallible>(hyper::service_fn(move |req| { - hyper_service_fn(rocket.clone(), remote_addr, spawn_service.clone(), req) + hyper_service_fn(rocket.clone(), remote_addr, req) })) } }); // NB: executor must be passed manually here, see hyperium/hyper#1537 hyper::Server::builder(Incoming::from_listener(listener)) - .executor(TokioCompat::new(spawn)) + .executor(tokio::executor::DefaultExecutor::current()) .serve(service) .with_graceful_shutdown(async move { shutdown_receiver.next().await; }) .await @@ -808,10 +802,9 @@ impl Rocket { let full_addr = format!("{}:{}", self.config.address, self.config.port); let addrs = match full_addr.to_socket_addrs() { Ok(a) => a.collect::>(), - Err(e) => return futures::future::err(Launch(From::from(e))).boxed(), + Err(e) => return futures_util::future::ready(Err(Launch(From::from(e)))).boxed(), }; let addr = addrs[0]; - let spawn = TokioCompat::new(runtime.executor()); #[cfg(feature = "ctrl_c_shutdown")] let ( @@ -824,26 +817,26 @@ impl Rocket { let server = async move { macro_rules! listen_on { - ($spawn:expr, $expr:expr) => {{ + ($expr:expr) => {{ let listener = match $expr { Ok(ok) => ok, Err(err) => return Err(Launch(LaunchError::new(LaunchErrorKind::Bind(err)))), }; - self.listen_on(listener, spawn).await + self.listen_on(listener).await }}; } #[cfg(feature = "tls")] { if let Some(tls) = self.config.tls.clone() { - listen_on!(spawn, crate::http::tls::bind_tls(addr, tls.certs, tls.key).await) + listen_on!(crate::http::tls::bind_tls(addr, tls.certs, tls.key).await) } else { - listen_on!(spawn, crate::http::private::bind_tcp(addr).await) + listen_on!(crate::http::private::bind_tcp(addr).await) } } #[cfg(not(feature = "tls"))] { - listen_on!(spawn, crate::http::private::bind_tcp(addr).await) + listen_on!(crate::http::private::bind_tcp(addr).await) } }; @@ -858,7 +851,7 @@ impl Rocket { runtime.spawn(async move { // Stop listening for `ctrl_c` if the server shuts down // a different way to avoid waiting forever. - futures::future::select( + futures_util::future::select( ctrl_c.next(), cancel_ctrl_c_listener_receiver, ).await; diff --git a/core/lib/src/router/mod.rs b/core/lib/src/router/mod.rs index 6ad00a4e..2167c513 100644 --- a/core/lib/src/router/mod.rs +++ b/core/lib/src/router/mod.rs @@ -3,7 +3,7 @@ mod route; use std::collections::hash_map::HashMap; -use futures::future::BoxFuture; +use futures_core::future::BoxFuture; pub use self::route::Route; diff --git a/core/lib/src/shutdown.rs b/core/lib/src/shutdown.rs index 0a3dc440..7eaab993 100644 --- a/core/lib/src/shutdown.rs +++ b/core/lib/src/shutdown.rs @@ -1,5 +1,5 @@ use crate::request::{FromRequest, Outcome, Request}; -use futures::channel::mpsc; +use futures_channel::mpsc; /// # Example /// diff --git a/core/lib/tests/head_handling.rs b/core/lib/tests/head_handling.rs index 45dc9d47..d000cc06 100644 --- a/core/lib/tests/head_handling.rs +++ b/core/lib/tests/head_handling.rs @@ -22,7 +22,7 @@ fn other() -> content::Json<&'static str> { mod head_handling_tests { use super::*; - use futures::io::AsyncReadExt; + use tokio_io::{AsyncRead, AsyncReadExt}; use rocket::Route; use rocket::local::Client; @@ -33,7 +33,7 @@ mod head_handling_tests { routes![index, empty, other] } - async fn assert_empty_sized_body(body: Body, expected_size: u64) { + async fn assert_empty_sized_body(body: Body, expected_size: u64) { match body { Body::Sized(mut body, size) => { let mut buffer = vec![]; diff --git a/core/lib/tests/local-request-content-type-issue-505.rs b/core/lib/tests/local-request-content-type-issue-505.rs index 69b9842a..9a0a172d 100644 --- a/core/lib/tests/local-request-content-type-issue-505.rs +++ b/core/lib/tests/local-request-content-type-issue-505.rs @@ -26,7 +26,7 @@ impl FromDataSimple for HasContentType { type Error = (); fn from_data(request: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, Self::Error> { - Box::pin(futures::future::ready(if request.content_type().is_some() { + Box::pin(futures_util::future::ready(if request.content_type().is_some() { Success(HasContentType) } else { Forward(data) diff --git a/examples/content_types/Cargo.toml b/examples/content_types/Cargo.toml index 385def3b..bb192c2d 100644 --- a/examples/content_types/Cargo.toml +++ b/examples/content_types/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" publish = false [dependencies] -futures-preview = "0.3.0-alpha.18" +tokio = "0.2.0-alpha.6" rocket = { path = "../../core/lib" } serde = "1.0" serde_json = "1.0" diff --git a/examples/content_types/src/main.rs b/examples/content_types/src/main.rs index d31648be..d0acebf9 100644 --- a/examples/content_types/src/main.rs +++ b/examples/content_types/src/main.rs @@ -7,11 +7,10 @@ use std::io; -use futures::io::AsyncReadExt as _; +use tokio::io::AsyncReadExt; use rocket::{Request, data::Data}; use rocket::response::{Debug, content::{Json, Html}}; -use rocket::AsyncReadExt as _; // NOTE: This example explicitly uses the `Json` type from `response::content` // for demonstration purposes. In a real application, _always_ prefer to use diff --git a/examples/manual_routes/Cargo.toml b/examples/manual_routes/Cargo.toml index 2ad42de7..5676090d 100644 --- a/examples/manual_routes/Cargo.toml +++ b/examples/manual_routes/Cargo.toml @@ -7,4 +7,4 @@ publish = false [dependencies] rocket = { path = "../../core/lib" } -async-std = "0.99.4" +tokio = "=0.2.0-alpha.6" diff --git a/examples/manual_routes/src/main.rs b/examples/manual_routes/src/main.rs index d2443668..bff7c115 100644 --- a/examples/manual_routes/src/main.rs +++ b/examples/manual_routes/src/main.rs @@ -4,7 +4,8 @@ extern crate rocket; mod tests; use std::env; -use async_std::fs::File; + +use tokio::fs::File; use rocket::{Request, Handler, Route, Data, Catcher, try_outcome}; use rocket::http::{Status, RawStr}; diff --git a/examples/stream/Cargo.toml b/examples/stream/Cargo.toml index 174a13eb..bd27fe4f 100644 --- a/examples/stream/Cargo.toml +++ b/examples/stream/Cargo.toml @@ -7,5 +7,6 @@ publish = false [dependencies] rocket = { path = "../../core/lib" } -futures-preview = "0.3.0-alpha.18" -async-std = "0.99.4" +tokio = "0.2.0-alpha.6" +futures-preview = "0.3.0-alpha.19" +futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" } diff --git a/examples/stream/src/main.rs b/examples/stream/src/main.rs index eba6bb82..8852a4c3 100644 --- a/examples/stream/src/main.rs +++ b/examples/stream/src/main.rs @@ -6,20 +6,17 @@ use rocket::response::{content, Stream}; -use std::io::repeat; -use async_std::fs::File; - -use rocket::AsyncReadExt as _; - -//type LimitedRepeat = Take; -type LimitedRepeat = Box; +use futures::io::repeat; +use futures_tokio_compat::Compat; +use tokio::fs::File; +use tokio::io::{AsyncRead, AsyncReadExt}; // Generate this file using: head -c BYTES /dev/random > big_file.dat const FILENAME: &str = "big_file.dat"; #[get("/")] -fn root() -> content::Plain> { - content::Plain(Stream::from(Box::new(repeat('a' as u8).take(25000)) as Box<_>)) +fn root() -> content::Plain> { + content::Plain(Stream::from(Compat::new(repeat('a' as u8)).take(25000))) } #[get("/big_file")]