Update 'hyper', 'futures-*-preview', and 'tokio-*' dependencies.

Use I/O traits and types from 'tokio-io' as much as possible.

A few adapters only exist in futures-io-preview and use
futures-tokio-compat as a bridge for now.
This commit is contained in:
Jeb Rosen 2019-09-22 15:48:08 -07:00 committed by Sergio Benitez
parent 0d89637e8b
commit ea06878581
38 changed files with 159 additions and 173 deletions

View File

@ -42,7 +42,8 @@ memcache_pool = ["databases", "memcache", "r2d2-memcache"]
[dependencies]
# Global dependencies.
futures-preview = { version = "0.3.0-alpha.18" }
futures-util-preview = "0.3.0-alpha.19"
tokio-io = "=0.2.0-alpha.6"
rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true }
rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false }
log = "0.4"
@ -87,7 +88,7 @@ brotli = { version = "3.3", optional = true }
flate2 = { version = "1.0", optional = true }
[dev-dependencies]
tokio-timer = "=0.3.0-alpha.5"
tokio-timer = "=0.3.0-alpha.6"
[package.metadata.docs.rs]
all-features = true

View File

@ -18,14 +18,13 @@ use std::ops::{Deref, DerefMut};
use std::io;
use std::iter::FromIterator;
use futures::io::AsyncReadExt;
use tokio_io::AsyncReadExt;
use rocket::request::Request;
use rocket::outcome::Outcome::*;
use rocket::data::{Transform::*, Transformed, Data, FromData, TransformFuture, FromDataFuture};
use rocket::response::{self, Responder, content};
use rocket::http::Status;
use rocket::AsyncReadExt as _;
use serde::{Serialize, Serializer};
use serde::de::{Deserialize, Deserializer};

View File

@ -16,14 +16,13 @@
use std::ops::{Deref, DerefMut};
use futures::io::AsyncReadExt;
use tokio_io::AsyncReadExt;
use rocket::request::Request;
use rocket::outcome::Outcome::*;
use rocket::data::{Data, FromData, FromDataFuture, Transform::*, TransformFuture, Transformed};
use rocket::http::Status;
use rocket::response::{self, content, Responder};
use rocket::AsyncReadExt as _;
use serde::Serialize;
use serde::de::Deserialize;

View File

@ -28,5 +28,6 @@ version_check = "0.9.1"
[dev-dependencies]
rocket = { version = "0.5.0-dev", path = "../lib" }
futures-preview = "0.3.0-alpha.18"
futures-preview = "0.3.0-alpha.19"
tokio-io = "0.2.0-alpha.6"
compiletest_rs = { version = "0.3", features = ["stable"] }

View File

@ -22,8 +22,7 @@ impl FromDataSimple for Simple {
fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> {
Box::pin(async {
use futures::io::AsyncReadExt as _;
use rocket::AsyncReadExt as _;
use tokio_io::AsyncReadExt;
let mut string = String::new();
let mut stream = data.open().take(64);

View File

@ -30,8 +30,7 @@ impl FromDataSimple for Simple {
fn from_data(_: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, ()> {
Box::pin(async move {
use futures::io::AsyncReadExt as _;
use rocket::AsyncReadExt as _;
use tokio_io::AsyncReadExt;
let mut string = String::new();
let mut stream = data.open().take(64);

View File

@ -23,20 +23,20 @@ private-cookies = ["cookie/private", "cookie/key-expansion"]
smallvec = "1.0"
percent-encoding = "1"
# TODO.async: stop using stream-unstable
hyper = { version = "=0.13.0-alpha.2", default-features = false, features = ["unstable-stream"] }
hyper = { version = "=0.13.0-alpha.4", default-features = false, features = ["unstable-stream"] }
http = "0.1.17"
mime = "0.3.13"
time = "0.2.11"
indexmap = "1.0"
state = "0.4"
tokio-rustls = { version = "0.12.0-alpha.3", optional = true }
tokio-io = "=0.2.0-alpha.5"
tokio-net = "=0.2.0-alpha.5"
tokio-timer = "=0.3.0-alpha.5"
tokio-rustls = { version = "0.12.0-alpha.4", optional = true }
tokio-io = "=0.2.0-alpha.6"
tokio-net = { version = "=0.2.0-alpha.6", features = ["tcp"] }
tokio-timer = "=0.3.0-alpha.6"
cookie = { version = "0.14.0", features = ["percent-encode"] }
pear = "0.1"
unicode-xid = "0.2"
futures-preview = "0.3.0-alpha.18"
futures-core-preview = "0.3.0-alpha.19"
log = "0.4"
[dev-dependencies]

View File

@ -101,7 +101,6 @@ impl<L: Listener> Incoming<L> {
// Sleep for the specified duration
let delay = Instant::now() + duration;
// TODO.async: This depends on a tokio Timer being set in the environment
let mut error_delay = tokio_timer::delay(delay);
match Pin::new(&mut error_delay).poll(cx) {
@ -128,7 +127,7 @@ impl<L: Listener + Unpin> Accept for Incoming<L> {
type Error = io::Error;
fn poll_accept(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let result = futures::ready!(self.poll_next(cx));
let result = futures_core::ready!(self.poll_next(cx));
Poll::Ready(Some(result))
}
}
@ -157,8 +156,6 @@ impl<L: fmt::Debug> fmt::Debug for Incoming<L> {
}
}
// TODO.async: Put these under a feature such as #[cfg(feature = "tokio-runtime")]
pub fn bind_tcp(address: SocketAddr) -> Pin<Box<dyn Future<Output=Result<TcpListener, io::Error>> + Send>> {
Box::pin(async move {
Ok(TcpListener::bind(address).await?)
@ -174,8 +171,8 @@ impl Listener for TcpListener {
fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Connection, io::Error>> {
// NB: This is only okay because TcpListener::accept() is stateless.
let accept = self.accept();
futures::pin_mut!(accept);
let mut accept = self.accept();
let accept = unsafe { Pin::new_unchecked(&mut accept) };
accept.poll(cx).map_ok(|(stream, _addr)| stream)
}
}

View File

@ -67,8 +67,6 @@ pub fn load_private_key<P: AsRef<Path>>(path: P) -> Result<rustls::PrivateKey, E
}
}
// TODO.async: Put these under a feature such as #[cfg(feature = "tokio-runtime")]
pub struct TlsListener {
listener: TcpListener,
acceptor: TlsAcceptor,

View File

@ -27,9 +27,11 @@ ctrl_c_shutdown = ["tokio/signal"]
[dependencies]
rocket_codegen = { version = "0.5.0-dev", path = "../codegen" }
rocket_http = { version = "0.5.0-dev", path = "../http" }
futures-preview = "0.3.0-alpha.18"
futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" }
tokio = "=0.2.0-alpha.5"
futures-core-preview = "0.3.0-alpha.19"
futures-channel-preview = "0.3.0-alpha.19"
futures-util-preview = "0.3.0-alpha.19"
tokio = "=0.2.0-alpha.6"
tokio-io = "=0.2.0-alpha.6"
yansi = "0.5"
log = { version = "0.4", features = ["std"] }
toml = "0.4.7"
@ -40,12 +42,13 @@ memchr = "2" # TODO: Use pear instead.
binascii = "0.1"
pear = "0.1"
atty = "0.2"
async-std = "0.99.4"
[build-dependencies]
yansi = "0.5"
version_check = "0.9.1"
[dev-dependencies]
futures-preview = "0.3.0-alpha.19"
futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" }
# TODO: Find a way to not depend on this.
lazy_static = "1.0"

View File

@ -1,4 +1,4 @@
use futures::future::Future;
use std::future::Future;
use crate::response;
use crate::handler::ErrorHandler;
@ -154,7 +154,7 @@ macro_rules! default_catchers {
let mut map = HashMap::new();
$(
fn $fn_name<'r>(req: &'r Request<'_>) -> futures::future::BoxFuture<'r, response::Result<'r>> {
fn $fn_name<'r>(req: &'r Request<'_>) -> futures_core::future::BoxFuture<'r, response::Result<'r>> {
status::Custom(Status::from_code($code).unwrap(),
content::Html(error_page_template!($code, $name, $description))
).respond_to(req)

View File

@ -1,4 +1,4 @@
use futures::future::BoxFuture;
use futures_core::future::BoxFuture;
use crate::{Request, Data};
use crate::handler::{Outcome, ErrorHandler};

View File

@ -1,14 +1,13 @@
use std::future::Future;
use std::io;
use std::path::Path;
use futures::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite};
use futures::future::Future;
use futures::stream::TryStreamExt;
use tokio_io::{AsyncRead, AsyncWrite, AsyncReadExt as _};
use super::data_stream::DataStream;
use crate::http::hyper;
use crate::ext::AsyncReadExt;
use crate::ext::{AsyncReadExt, AsyncReadBody};
/// The number of bytes to read into the "peek" buffer.
const PEEK_BYTES: usize = 512;
@ -135,20 +134,19 @@ impl Data {
///
/// ```rust
/// use std::io;
/// use futures::io::AllowStdIo;
/// use rocket::Data;
///
/// async fn handler(mut data: Data) -> io::Result<String> {
/// // write all of the data to stdout
/// let written = data.stream_to(AllowStdIo::new(io::stdout())).await?;
/// let written = data.stream_to(tokio::io::stdout()).await?;
/// Ok(format!("Wrote {} bytes.", written))
/// }
/// ```
#[inline(always)]
pub fn stream_to<'w, W: AsyncWrite + Unpin + 'w>(self, mut writer: W) -> impl Future<Output = io::Result<u64>> + 'w {
Box::pin(async move {
let stream = self.open();
stream.copy_into(&mut writer).await
let mut stream = self.open();
stream.copy(&mut writer).await
})
}
@ -172,7 +170,7 @@ impl Data {
#[inline(always)]
pub fn stream_to_file<P: AsRef<Path> + Send + Unpin + 'static>(self, path: P) -> impl Future<Output = io::Result<u64>> {
Box::pin(async move {
let mut file = async_std::fs::File::create(path).await?;
let mut file = tokio::fs::File::create(path).await?;
let streaming = self.stream_to(&mut file);
streaming.await
})
@ -186,9 +184,7 @@ impl Data {
pub(crate) async fn new(body: hyper::Body) -> Data {
trace_!("Data::new({:?})", body);
let mut stream = body.map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
}).into_async_read();
let mut stream = AsyncReadBody::from(body);
let mut peek_buf = vec![0; PEEK_BYTES];

View File

@ -1,7 +1,7 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::io::{AsyncRead, Error as IoError};
use futures::task::{Poll, Context};
use tokio_io::AsyncRead;
// TODO.async: Consider storing the real type here instead of a Box to avoid
// the dynamic dispatch
@ -19,7 +19,7 @@ pub struct DataStream(pub(crate) Vec<u8>, pub(crate) Box<dyn AsyncRead + Unpin +
// possible since Hyper's `HttpReader` doesn't implement `BufRead`.
impl AsyncRead for DataStream {
#[inline(always)]
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, std::io::Error>> {
trace_!("DataStream::poll_read()");
if self.0.len() > 0 {
let count = std::cmp::min(buf.len(), self.0.len());

View File

@ -1,7 +1,8 @@
use std::borrow::Borrow;
use futures::future::{ready, FutureExt, BoxFuture};
use futures::io::AsyncReadExt;
use futures_core::future::BoxFuture;
use futures_util::future::{ready, FutureExt};
use tokio_io::AsyncReadExt;
use crate::outcome::{self, IntoOutcome};
use crate::outcome::Outcome::*;
@ -193,14 +194,12 @@ pub type FromDataFuture<'a, T, E> = BoxFuture<'a, Outcome<T, E>>;
/// # struct Name<'a> { first: &'a str, last: &'a str, }
/// use std::io::{self, Read};
///
/// use futures::io::AsyncReadExt;
/// use tokio::io::AsyncReadExt;
///
/// use rocket::{Request, Data, Outcome::*};
/// use rocket::data::{FromData, Outcome, Transform, Transformed, TransformFuture, FromDataFuture};
/// use rocket::http::Status;
///
/// use rocket::AsyncReadExt as _;
///
/// const NAME_LIMIT: u64 = 256;
///
/// enum NameError {
@ -462,14 +461,12 @@ impl<'a> FromData<'a> for Data {
/// #
/// use std::io::Read;
///
/// use futures::io::AsyncReadExt;
/// use tokio::io::AsyncReadExt;
///
/// use rocket::{Request, Data, Outcome, Outcome::*};
/// use rocket::data::{self, FromDataSimple, FromDataFuture};
/// use rocket::http::{Status, ContentType};
///
/// use rocket::AsyncReadExt as _;
///
/// // Always use a limit to prevent DoS attacks.
/// const LIMIT: u64 = 256;
///

View File

@ -1,37 +1,12 @@
use std::io;
use std::io::{self, Cursor};
use std::pin::Pin;
use std::task::{Poll, Context};
use futures::io::{AsyncRead, AsyncReadExt as _};
use futures::future::BoxFuture;
use futures::stream::Stream;
use futures::task::{Poll, Context};
use futures_core::{ready, future::BoxFuture, stream::Stream};
use tokio_io::{AsyncRead, AsyncReadExt as _};
use crate::http::hyper::Chunk;
// Based on std::io::Take, but for AsyncRead instead of Read
pub struct Take<R>{
inner: R,
limit: u64,
}
// TODO.async: Verify correctness of this implementation.
impl<R> AsyncRead for Take<R> where R: AsyncRead + Unpin {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
if self.limit == 0 {
return Poll::Ready(Ok(0));
}
let max = std::cmp::min(buf.len() as u64, self.limit) as usize;
match Pin::new(&mut self.inner).poll_read(cx, &mut buf[..max]) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(n)) => {
self.limit -= n as u64;
Poll::Ready(Ok(n))
},
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
}
}
}
use crate::http::hyper;
use hyper::{Chunk, Payload};
pub struct IntoChunkStream<R> {
inner: R,
@ -64,10 +39,6 @@ impl<R> Stream for IntoChunkStream<R>
}
pub trait AsyncReadExt: AsyncRead {
fn take(self, limit: u64) -> Take<Self> where Self: Sized {
Take { inner: self, limit }
}
fn into_chunk_stream(self, buf_size: usize) -> IntoChunkStream<Self> where Self: Sized {
IntoChunkStream { inner: self, buf_size, buffer: vec![0; buf_size] }
}
@ -93,3 +64,46 @@ pub trait AsyncReadExt: AsyncRead {
}
impl<T: AsyncRead> AsyncReadExt for T { }
pub struct AsyncReadBody {
inner: hyper::Body,
state: AsyncReadBodyState,
}
enum AsyncReadBodyState {
Pending,
Partial(Cursor<Chunk>),
Done,
}
impl From<hyper::Body> for AsyncReadBody {
fn from(body: hyper::Body) -> Self {
Self { inner: body, state: AsyncReadBodyState::Pending }
}
}
impl AsyncRead for AsyncReadBody {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
loop {
match self.state {
AsyncReadBodyState::Pending => {
match ready!(Pin::new(&mut self.inner).poll_data(cx)) {
Some(Ok(chunk)) => self.state = AsyncReadBodyState::Partial(Cursor::new(chunk)),
Some(Err(e)) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
None => self.state = AsyncReadBodyState::Done,
}
},
AsyncReadBodyState::Partial(ref mut cursor) => {
match ready!(Pin::new(cursor).poll_read(cx, buf)) {
Ok(n) if n == 0 => {
self.state = AsyncReadBodyState::Pending;
}
Ok(n) => return Poll::Ready(Ok(n)),
Err(e) => return Poll::Ready(Err(e)),
}
}
AsyncReadBodyState::Done => return Poll::Ready(Ok(0)),
}
}
}
}

View File

@ -1,6 +1,6 @@
use std::sync::Mutex;
use futures::future::BoxFuture;
use futures_core::future::BoxFuture;
use crate::{Rocket, Request, Response, Data};
use crate::fairing::{Fairing, Kind, Info};

View File

@ -47,7 +47,7 @@
//! of other `Fairings` are not jeopardized. For instance, unless it is made
//! abundantly clear, a fairing should not rewrite every request.
use futures::future::BoxFuture;
use futures_core::future::BoxFuture;
use crate::{Rocket, Request, Response, Data};

View File

@ -1,6 +1,6 @@
//! Types and traits for request and error handlers and their return values.
use futures::future::BoxFuture;
use futures_core::future::BoxFuture;
use crate::data::Data;
use crate::request::Request;

View File

@ -137,7 +137,6 @@ pub use crate::router::Route;
pub use crate::request::{Request, State};
pub use crate::catcher::Catcher;
pub use crate::rocket::Rocket;
pub use ext::AsyncReadExt;
/// Alias to [`Rocket::ignite()`] Creates a new instance of `Rocket`.
pub fn ignite() -> Rocket {

View File

@ -1,12 +1,11 @@
use std::ops::Deref;
use futures::io::AsyncReadExt;
use tokio_io::AsyncReadExt;
use crate::outcome::Outcome::*;
use crate::request::{Request, form::{FromForm, FormItems, FormDataError}};
use crate::data::{Outcome, Transform, Transformed, Data, FromData, TransformFuture, FromDataFuture};
use crate::http::{Status, uri::{Query, FromUriParam}};
use crate::ext::AsyncReadExt as _;
/// A data guard for parsing [`FromForm`] types strictly.
///
@ -200,7 +199,7 @@ impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for Form<T> {
if !request.content_type().map_or(false, |ct| ct.is_form()) {
warn_!("Form data does not have form content type.");
return Box::pin(futures::future::ready(Transform::Borrowed(Forward(data))));
return Box::pin(futures_util::future::ready(Transform::Borrowed(Forward(data))));
}
let limit = request.limits().forms;
@ -216,7 +215,7 @@ impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for Form<T> {
}
fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> {
Box::pin(futures::future::ready(o.borrowed().and_then(|data| {
Box::pin(futures_util::future::ready(o.borrowed().and_then(|data| {
<Form<T>>::from_data(data, true).map(Form)
})))
}

View File

@ -105,7 +105,7 @@ impl<'f, T: FromForm<'f> + Send + 'f> FromData<'f> for LenientForm<T> {
}
fn from_data(_: &Request<'_>, o: Transformed<'f, Self>) -> FromDataFuture<'f, Self, Self::Error> {
Box::pin(futures::future::ready(o.borrowed().and_then(|form| {
Box::pin(futures_util::future::ready(o.borrowed().and_then(|form| {
<Form<T>>::from_data(form, false).map(LenientForm)
})))
}

View File

@ -20,14 +20,12 @@ use yansi::Paint;
/// # #![feature(proc_macro_hygiene)]
/// use std::io;
///
/// use futures::io::AsyncReadExt;
/// use tokio::io::AsyncReadExt;
///
/// # use rocket::post;
/// use rocket::Data;
/// use rocket::response::Debug;
///
/// use rocket::AsyncReadExt as _;
///
/// #[post("/", format = "plain", data = "<data>")]
/// async fn post(data: Data) -> Result<String, Debug<io::Error>> {
/// let mut name = String::with_capacity(32);

View File

@ -49,4 +49,4 @@ pub use self::debug::Debug;
/// Type alias for the `Result` of a `Responder::respond` call.
pub type Result<'r> = std::result::Result<self::Response<'r>, crate::http::Status>;
/// Type alias for the `Result` of a `Responder::respond` call.
pub type ResultFuture<'r> = futures::future::BoxFuture<'r, Result<'r>>;
pub type ResultFuture<'r> = futures_core::future::BoxFuture<'r, Result<'r>>;

View File

@ -1,8 +1,8 @@
use std::fs::File;
use std::io::Cursor;
use futures::io::BufReader;
use futures::future;
use futures_util::future::ready;
use tokio_io::BufReader;
use crate::http::{Status, ContentType, StatusClass};
use crate::response::{self, Response, Body};
@ -255,7 +255,7 @@ impl Responder<'_> for Vec<u8> {
impl Responder<'_> for File {
fn respond_to(self, _: &Request<'_>) -> response::ResultFuture<'static> {
Box::pin(async move {
let file = async_std::fs::File::from(self);
let file = tokio::fs::File::from(self);
let metadata = file.metadata().await;
let stream = BufReader::new(file);
match metadata {
@ -283,7 +283,7 @@ impl<'r, R: Responder<'r> + Send + 'r> Responder<'r> for Option<R> {
Some(r) => r.respond_to(req),
None => {
warn_!("Response was `None`.");
Box::pin(future::err(Status::NotFound))
Box::pin(ready(Err(Status::NotFound)))
},
}
}

View File

@ -1,13 +1,13 @@
use std::{io, fmt, str};
use std::borrow::Cow;
use std::future::Future;
use std::pin::Pin;
use futures::future::{Future, FutureExt};
use futures::io::{AsyncRead, AsyncReadExt};
use tokio_io::{AsyncRead, AsyncReadExt};
use futures_util::future::FutureExt;
use crate::response::{Responder, ResultFuture};
use crate::http::{Header, HeaderMap, Status, ContentType, Cookie};
use crate::ext::AsyncReadExt as _;
/// The default size, in bytes, of a chunk for streamed responses.
pub const DEFAULT_CHUNK_SIZE: u64 = 4096;
@ -346,7 +346,7 @@ impl<'r> ResponseBuilder<'r> {
///
/// ```rust,ignore
/// use rocket::Response;
/// use async_std::fs::File;
/// use tokio::fs::File;
/// # use std::io;
///
/// # #[allow(dead_code)]
@ -372,7 +372,7 @@ impl<'r> ResponseBuilder<'r> {
///
/// ```rust
/// use rocket::Response;
/// use async_std::fs::File;
/// use tokio::fs::File;
/// # use std::io;
///
/// # #[allow(dead_code)]
@ -399,7 +399,7 @@ impl<'r> ResponseBuilder<'r> {
///
/// ```rust
/// use rocket::Response;
/// use async_std::fs::File;
/// use tokio::fs::File;
/// # use std::io;
///
/// # #[allow(dead_code)]
@ -1010,7 +1010,7 @@ impl<'r> Response<'r> {
pub(crate) fn strip_body(&mut self) {
if let Some(body) = self.take_body() {
self.body = match body {
Body::Sized(_, n) => Some(Body::Sized(Box::pin(io::empty()), n)),
Body::Sized(_, n) => Some(Body::Sized(Box::pin(io::Cursor::new(&[])), n)),
Body::Chunked(..) => None
};
}
@ -1057,14 +1057,14 @@ impl<'r> Response<'r> {
/// # Example
///
/// ```rust
/// use std::io::repeat;
/// use futures::io::AsyncReadExt;
/// use futures::io::repeat;
/// use futures_tokio_compat::Compat;
/// use tokio::io::AsyncReadExt;
/// use rocket::Response;
/// use rocket::AsyncReadExt as _;
///
/// # rocket::async_test(async {
/// let mut response = Response::new();
/// response.set_streamed_body(repeat(97).take(5));
/// response.set_streamed_body(Compat::new(repeat(97)).take(5));
/// assert_eq!(response.body_string().await, Some("aaaaa".to_string()));
/// # })
/// ```
@ -1079,14 +1079,14 @@ impl<'r> Response<'r> {
/// # Example
///
/// ```rust
/// use std::io::repeat;
/// use futures::io::AsyncReadExt;
/// use futures::io::repeat;
/// use futures_tokio_compat::Compat;
/// use tokio::io::AsyncReadExt;
/// use rocket::Response;
/// use rocket::AsyncReadExt as _;
///
/// # rocket::async_test(async {
/// let mut response = Response::new();
/// response.set_chunked_body(repeat(97).take(5), 10);
/// response.set_chunked_body(Compat::new(repeat(97)).take(5), 10);
/// assert_eq!(response.body_string().await, Some("aaaaa".to_string()));
/// # })
/// ```

View File

@ -1,6 +1,6 @@
use std::fmt::{self, Debug};
use futures::io::AsyncRead;
use tokio_io::AsyncRead;
use crate::request::Request;
use crate::response::{Response, Responder, ResultFuture, DEFAULT_CHUNK_SIZE};
@ -23,12 +23,10 @@ impl<T: AsyncRead> Stream<T> {
/// bytes. Note: you probably shouldn't do this.
///
/// ```rust
/// use std::io;
/// use futures::io::AllowStdIo;
/// use rocket::response::Stream;
///
/// # #[allow(unused_variables)]
/// let response = Stream::chunked(AllowStdIo::new(io::stdin()), 10);
/// let response = Stream::chunked(tokio::io::stdin(), 10);
/// ```
pub fn chunked(reader: T, chunk_size: u64) -> Stream<T> {
Stream(reader, chunk_size)
@ -49,12 +47,10 @@ impl<T: AsyncRead + Debug> Debug for Stream<T> {
/// shouldn't do this.
///
/// ```rust
/// use std::io;
/// use futures::io::AllowStdIo;
/// use rocket::response::Stream;
///
/// # #[allow(unused_variables)]
/// let response = Stream::from(AllowStdIo::new(io::stdin()));
/// let response = Stream::from(tokio::io::stdin());
/// ```
impl<T: AsyncRead> From<T> for Stream<T> {
fn from(reader: T) -> Self {

View File

@ -5,11 +5,9 @@ use std::io;
use std::mem;
use std::sync::Arc;
use futures::future::{Future, FutureExt, BoxFuture};
use futures::channel::{mpsc, oneshot};
use futures::stream::StreamExt;
use futures::task::{Spawn, SpawnExt};
use futures_tokio_compat::Compat as TokioCompat;
use futures_core::future::{Future, BoxFuture};
use futures_channel::{mpsc, oneshot};
use futures_util::{future::FutureExt, stream::StreamExt};
use yansi::Paint;
use state::Container;
@ -54,7 +52,6 @@ pub struct Rocket {
fn hyper_service_fn(
rocket: Arc<Rocket>,
h_addr: std::net::SocketAddr,
mut spawn: impl futures::task::Spawn,
hyp_req: hyper::Request<hyper::Body>,
) -> impl Future<Output = Result<hyper::Response<hyper::Body>, io::Error>> {
// This future must return a hyper::Response, but that's not easy
@ -63,7 +60,7 @@ fn hyper_service_fn(
// the response metadata (and a body channel) beforehand.
let (tx, rx) = oneshot::channel();
spawn.spawn(async move {
tokio::spawn(async move {
// Get all of the information from Hyper.
let (h_parts, h_body) = hyp_req.into_parts();
@ -89,7 +86,7 @@ fn hyper_service_fn(
// Dispatch the request to get a response, then write that response out.
let r = rocket.dispatch(&mut req, data).await;
rocket.issue_response(r, tx).await;
}).expect("failed to spawn handler");
});
async move {
rx.await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
@ -708,11 +705,10 @@ impl Rocket {
}
// TODO.async: Solidify the Listener APIs and make this function public
async fn listen_on<L, S>(mut self, listener: L, spawn: S) -> Result<(), crate::error::Error>
async fn listen_on<L>(mut self, listener: L) -> Result<(), crate::error::Error>
where
L: Listener + Send + Unpin + 'static,
<L as Listener>::Connection: Send + Unpin + 'static,
S: Spawn + Clone + Send + 'static,
{
self = self.prelaunch_check().map_err(crate::error::Error::Launch)?;
@ -754,21 +750,19 @@ impl Rocket {
.take().expect("shutdown receiver has already been used");
let rocket = Arc::new(self);
let spawn_makeservice = spawn.clone();
let service = hyper::make_service_fn(move |connection: &<L as Listener>::Connection| {
let rocket = rocket.clone();
let remote_addr = connection.remote_addr().unwrap_or_else(|| "0.0.0.0".parse().unwrap());
let spawn_service = spawn_makeservice.clone();
async move {
Ok::<_, std::convert::Infallible>(hyper::service_fn(move |req| {
hyper_service_fn(rocket.clone(), remote_addr, spawn_service.clone(), req)
hyper_service_fn(rocket.clone(), remote_addr, req)
}))
}
});
// NB: executor must be passed manually here, see hyperium/hyper#1537
hyper::Server::builder(Incoming::from_listener(listener))
.executor(TokioCompat::new(spawn))
.executor(tokio::executor::DefaultExecutor::current())
.serve(service)
.with_graceful_shutdown(async move { shutdown_receiver.next().await; })
.await
@ -808,10 +802,9 @@ impl Rocket {
let full_addr = format!("{}:{}", self.config.address, self.config.port);
let addrs = match full_addr.to_socket_addrs() {
Ok(a) => a.collect::<Vec<_>>(),
Err(e) => return futures::future::err(Launch(From::from(e))).boxed(),
Err(e) => return futures_util::future::ready(Err(Launch(From::from(e)))).boxed(),
};
let addr = addrs[0];
let spawn = TokioCompat::new(runtime.executor());
#[cfg(feature = "ctrl_c_shutdown")]
let (
@ -824,26 +817,26 @@ impl Rocket {
let server = async move {
macro_rules! listen_on {
($spawn:expr, $expr:expr) => {{
($expr:expr) => {{
let listener = match $expr {
Ok(ok) => ok,
Err(err) => return Err(Launch(LaunchError::new(LaunchErrorKind::Bind(err)))),
};
self.listen_on(listener, spawn).await
self.listen_on(listener).await
}};
}
#[cfg(feature = "tls")]
{
if let Some(tls) = self.config.tls.clone() {
listen_on!(spawn, crate::http::tls::bind_tls(addr, tls.certs, tls.key).await)
listen_on!(crate::http::tls::bind_tls(addr, tls.certs, tls.key).await)
} else {
listen_on!(spawn, crate::http::private::bind_tcp(addr).await)
listen_on!(crate::http::private::bind_tcp(addr).await)
}
}
#[cfg(not(feature = "tls"))]
{
listen_on!(spawn, crate::http::private::bind_tcp(addr).await)
listen_on!(crate::http::private::bind_tcp(addr).await)
}
};
@ -858,7 +851,7 @@ impl Rocket {
runtime.spawn(async move {
// Stop listening for `ctrl_c` if the server shuts down
// a different way to avoid waiting forever.
futures::future::select(
futures_util::future::select(
ctrl_c.next(),
cancel_ctrl_c_listener_receiver,
).await;

View File

@ -3,7 +3,7 @@ mod route;
use std::collections::hash_map::HashMap;
use futures::future::BoxFuture;
use futures_core::future::BoxFuture;
pub use self::route::Route;

View File

@ -1,5 +1,5 @@
use crate::request::{FromRequest, Outcome, Request};
use futures::channel::mpsc;
use futures_channel::mpsc;
/// # Example
///

View File

@ -22,7 +22,7 @@ fn other() -> content::Json<&'static str> {
mod head_handling_tests {
use super::*;
use futures::io::AsyncReadExt;
use tokio_io::{AsyncRead, AsyncReadExt};
use rocket::Route;
use rocket::local::Client;
@ -33,7 +33,7 @@ mod head_handling_tests {
routes![index, empty, other]
}
async fn assert_empty_sized_body<T: futures::AsyncRead + Unpin>(body: Body<T>, expected_size: u64) {
async fn assert_empty_sized_body<T: AsyncRead + Unpin>(body: Body<T>, expected_size: u64) {
match body {
Body::Sized(mut body, size) => {
let mut buffer = vec![];

View File

@ -26,7 +26,7 @@ impl FromDataSimple for HasContentType {
type Error = ();
fn from_data(request: &Request<'_>, data: Data) -> data::FromDataFuture<'static, Self, Self::Error> {
Box::pin(futures::future::ready(if request.content_type().is_some() {
Box::pin(futures_util::future::ready(if request.content_type().is_some() {
Success(HasContentType)
} else {
Forward(data)

View File

@ -6,7 +6,7 @@ edition = "2018"
publish = false
[dependencies]
futures-preview = "0.3.0-alpha.18"
tokio = "0.2.0-alpha.6"
rocket = { path = "../../core/lib" }
serde = "1.0"
serde_json = "1.0"

View File

@ -7,11 +7,10 @@
use std::io;
use futures::io::AsyncReadExt as _;
use tokio::io::AsyncReadExt;
use rocket::{Request, data::Data};
use rocket::response::{Debug, content::{Json, Html}};
use rocket::AsyncReadExt as _;
// NOTE: This example explicitly uses the `Json` type from `response::content`
// for demonstration purposes. In a real application, _always_ prefer to use

View File

@ -7,4 +7,4 @@ publish = false
[dependencies]
rocket = { path = "../../core/lib" }
async-std = "0.99.4"
tokio = "=0.2.0-alpha.6"

View File

@ -4,7 +4,8 @@ extern crate rocket;
mod tests;
use std::env;
use async_std::fs::File;
use tokio::fs::File;
use rocket::{Request, Handler, Route, Data, Catcher, try_outcome};
use rocket::http::{Status, RawStr};

View File

@ -7,5 +7,6 @@ publish = false
[dependencies]
rocket = { path = "../../core/lib" }
futures-preview = "0.3.0-alpha.18"
async-std = "0.99.4"
tokio = "0.2.0-alpha.6"
futures-preview = "0.3.0-alpha.19"
futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" }

View File

@ -6,20 +6,17 @@
use rocket::response::{content, Stream};
use std::io::repeat;
use async_std::fs::File;
use rocket::AsyncReadExt as _;
//type LimitedRepeat = Take<Repeat>;
type LimitedRepeat = Box<dyn futures::io::AsyncRead + Send + Unpin>;
use futures::io::repeat;
use futures_tokio_compat::Compat;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt};
// Generate this file using: head -c BYTES /dev/random > big_file.dat
const FILENAME: &str = "big_file.dat";
#[get("/")]
fn root() -> content::Plain<Stream<LimitedRepeat>> {
content::Plain(Stream::from(Box::new(repeat('a' as u8).take(25000)) as Box<_>))
fn root() -> content::Plain<Stream<impl AsyncRead>> {
content::Plain(Stream::from(Compat::new(repeat('a' as u8)).take(25000)))
}
#[get("/big_file")]