Use futures 0.3-compatible hyper and tokio and the tokio runtime instead

of futures-rs executor.

Despite this change, using body_bytes_wait on (for example) a File will
still fail due to tokio-rs/tokio#1356.
This commit is contained in:
Jeb Rosen 2019-08-19 18:13:49 -07:00 committed by Sergio Benitez
parent 8c8598b4fd
commit af95129590
9 changed files with 80 additions and 116 deletions

View File

@ -22,7 +22,7 @@ private-cookies = ["cookie/private", "cookie/key-expansion"]
[dependencies]
smallvec = "1.0"
percent-encoding = "1"
hyper = { version = "0.12.31", default-features = false, features = ["runtime"] }
hyper = { git = "https://github.com/hyperium/hyper", rev = "a22dabd", default-features = false, features = ["runtime"] }
http = "0.1.17"
mime = "0.3.13"
time = "0.2.11"

View File

@ -5,9 +5,9 @@
//! while necessary.
#[doc(hidden)] pub use hyper::{Body, Request, Response, Server};
#[doc(hidden)] pub use hyper::body::Payload as Payload;
#[doc(hidden)] pub use hyper::body::{Payload, Sender as BodySender};
#[doc(hidden)] pub use hyper::error::Error;
#[doc(hidden)] pub use hyper::service::{make_service_fn, MakeService, Service};
#[doc(hidden)] pub use hyper::service::{make_service_fn, service_fn, MakeService, Service};
#[doc(hidden)] pub use hyper::server::conn::{AddrIncoming, AddrStream};
#[doc(hidden)] pub use hyper::Chunk;

View File

@ -26,8 +26,9 @@ private-cookies = ["rocket_http/private-cookies"]
[dependencies]
rocket_codegen = { version = "0.5.0-dev", path = "../codegen" }
rocket_http = { version = "0.5.0-dev", path = "../http" }
futures-preview = { version = "0.3.0-alpha.18", features = ["compat", "io-compat"] }
tokio = "0.1.16"
futures-preview = "0.3.0-alpha.18"
futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" }
tokio = "0.2.0-alpha.2"
yansi = "0.5"
log = { version = "0.4", features = ["std"] }
toml = "0.4.7"

View File

@ -1,9 +1,9 @@
use std::path::Path;
use futures::compat::{Future01CompatExt, Stream01CompatExt, AsyncWrite01CompatExt};
use futures::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite};
use futures::future::Future;
use futures::stream::TryStreamExt;
use futures_tokio_compat::Compat as TokioCompat;
use super::data_stream::DataStream;
@ -171,10 +171,10 @@ impl Data {
/// }
/// ```
#[inline(always)]
pub fn stream_to_file<P: AsRef<Path> + Send + 'static>(self, path: P) -> impl Future<Output = io::Result<u64>> {
pub fn stream_to_file<P: AsRef<Path> + Send + Unpin + 'static>(self, path: P) -> impl Future<Output = io::Result<u64>> {
Box::pin(async move {
let file = tokio::fs::File::create(path).compat().await?.compat();
self.stream_to(file).await
let mut file = TokioCompat::new(tokio::fs::File::create(path).await?);
self.stream_to(&mut file).await
})
}
@ -186,7 +186,7 @@ impl Data {
pub(crate) async fn new(body: hyper::Body) -> Data {
trace_!("Data::new({:?})", body);
let mut stream = body.compat().map_err(|e| {
let mut stream = body.map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
}).into_async_read();

View File

@ -154,8 +154,5 @@ pub fn custom(config: config::Config) -> Rocket {
/// WARNING: This is unstable! Do not use this method outside of Rocket!
#[doc(hidden)]
pub fn async_test(fut: impl std::future::Future<Output = ()> + Send + 'static) {
use futures::future::{FutureExt, TryFutureExt};
let mut runtime = tokio::runtime::Runtime::new().expect("create tokio runtime");
runtime.block_on(fut.boxed().unit_error().compat()).expect("unit_error future returned Err");
tokio::runtime::Runtime::new().expect("create tokio runtime").block_on(fut)
}

View File

@ -405,13 +405,13 @@ impl<'c> LocalRequest<'c> {
request.set_uri(uri.into_owned());
} else {
error!("Malformed request URI: {}", uri);
return futures::executor::block_on(async move {
return tokio::runtime::Runtime::new().expect("create runtime").block_on(async move {
let res = client.rocket().handle_error(Status::BadRequest, request).await;
LocalResponse { _request: owned_request, response: res }
})
}
futures::executor::block_on(async move {
tokio::runtime::Runtime::new().expect("create runtime").block_on(async move {
// Actually dispatch the request.
let response = client.rocket().dispatch(request, Data::local(data)).await;
@ -460,11 +460,11 @@ pub struct LocalResponse<'c> {
impl LocalResponse<'_> {
pub fn body_string_wait(&mut self) -> Option<String> {
futures::executor::block_on(self.body_string())
tokio::runtime::Runtime::new().expect("create runtime").block_on(self.body_string())
}
pub fn body_bytes_wait(&mut self) -> Option<Vec<u8>> {
futures::executor::block_on(self.body_bytes())
tokio::runtime::Runtime::new().expect("create runtime").block_on(self.body_bytes())
}
}

View File

@ -1,7 +1,8 @@
use std::fs::File;
use std::io::{Cursor, BufReader};
use std::io::Cursor;
use futures::compat::AsyncRead01CompatExt;
use futures::io::BufReader;
use futures_tokio_compat::Compat as TokioCompat;
use crate::http::{Status, ContentType, StatusClass};
use crate::response::{self, Response, Body};
@ -255,7 +256,7 @@ impl Responder<'_> for File {
fn respond_to(self, _: &Request<'_>) -> response::ResultFuture<'static> {
Box::pin(async move {
let metadata = self.metadata();
let stream = BufReader::new(tokio::fs::File::from_std(self)).compat();
let stream = BufReader::new(TokioCompat::new(tokio::fs::File::from_std(self)));
match metadata {
Ok(md) => Response::build().raw_body(Body::Sized(stream, md.len())).ok(),
Err(_) => Response::build().streamed_body(stream).ok()

View File

@ -8,11 +8,10 @@ use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;
use futures::compat::{Compat, Executor01CompatExt, Sink01CompatExt};
use futures::future::{Future, FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::future::Future;
use futures::stream::StreamExt;
use futures::task::SpawnExt;
use futures_tokio_compat::Compat as TokioCompat;
use yansi::Paint;
use state::Container;
@ -47,76 +46,53 @@ pub struct Rocket {
fairings: Fairings,
}
struct RocketHyperService {
// This function tries to hide all of the Hyper-ness from Rocket. It
// essentially converts Hyper types into Rocket types, then calls the
// `dispatch` function, which knows nothing about Hyper. Because responding
// depends on the `HyperResponse` type, this function does the actual
// response processing.
fn hyper_service_fn(
rocket: Arc<Rocket>,
spawn: Box<dyn futures::task::Spawn + Send>,
remote_addr: std::net::SocketAddr,
}
h_addr: std::net::SocketAddr,
mut spawn: impl futures::task::Spawn,
hyp_req: hyper::Request<hyper::Body>,
) -> impl Future<Output = Result<hyper::Response<hyper::Body>, io::Error>> {
// This future must return a hyper::Response, but that's not easy
// because the response body might borrow from the request. Instead,
// we do the body writing in another future that will send us
// the response metadata (and a body channel) beforehand.
let (tx, rx) = futures::channel::oneshot::channel();
impl std::ops::Deref for RocketHyperService {
type Target = Rocket;
spawn.spawn(async move {
// Get all of the information from Hyper.
let (h_parts, h_body) = hyp_req.into_parts();
fn deref(&self) -> &Self::Target {
&*self.rocket
}
}
// Convert the Hyper request into a Rocket request.
let req_res = Request::from_hyp(&rocket, h_parts.method, h_parts.headers, h_parts.uri, h_addr);
let mut req = match req_res {
Ok(req) => req,
Err(e) => {
error!("Bad incoming request: {}", e);
// TODO: We don't have a request to pass in, so we just
// fabricate one. This is weird. We should let the user know
// that we failed to parse a request (by invoking some special
// handler) instead of doing this.
let dummy = Request::new(&rocket, Method::Get, Origin::dummy());
let r = rocket.handle_error(Status::BadRequest, &dummy).await;
return rocket.issue_response(r, tx).await;
}
};
#[doc(hidden)]
impl hyper::Service for RocketHyperService {
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = io::Error;
type Future = Compat<Pin<Box<dyn Future<Output = Result<hyper::Response<Self::ResBody>, Self::Error>> + Send>>>;
// Retrieve the data from the hyper body.
let data = Data::from_hyp(h_body).await;
// This function tries to hide all of the Hyper-ness from Rocket. It
// essentially converts Hyper types into Rocket types, then calls the
// `dispatch` function, which knows nothing about Hyper. Because responding
// depends on the `HyperResponse` type, this function does the actual
// response processing.
fn call<'h>(
&mut self,
hyp_req: hyper::Request<Self::ReqBody>,
) -> Self::Future {
let rocket = self.rocket.clone();
let h_addr = self.remote_addr;
// Dispatch the request to get a response, then write that response out.
let r = rocket.dispatch(&mut req, data).await;
rocket.issue_response(r, tx).await;
}).expect("failed to spawn handler");
// This future must return a hyper::Response, but that's not easy
// because the response body might borrow from the request. Instead,
// we do the body writing in another future that will send us
// the response metadata (and a body channel) beforehand.
let (tx, rx) = futures::channel::oneshot::channel();
self.spawn.spawn(async move {
// Get all of the information from Hyper.
let (h_parts, h_body) = hyp_req.into_parts();
// Convert the Hyper request into a Rocket request.
let req_res = Request::from_hyp(&rocket, h_parts.method, h_parts.headers, h_parts.uri, h_addr);
let mut req = match req_res {
Ok(req) => req,
Err(e) => {
error!("Bad incoming request: {}", e);
// TODO: We don't have a request to pass in, so we just
// fabricate one. This is weird. We should let the user know
// that we failed to parse a request (by invoking some special
// handler) instead of doing this.
let dummy = Request::new(&rocket, Method::Get, Origin::dummy());
let r = rocket.handle_error(Status::BadRequest, &dummy).await;
return rocket.issue_response(r, tx).await;
}
};
// Retrieve the data from the hyper body.
let data = Data::from_hyp(h_body).await;
// Dispatch the request to get a response, then write that response out.
let r = rocket.dispatch(&mut req, data).await;
rocket.issue_response(r, tx).await;
}).expect("failed to spawn handler");
async move {
Ok(rx.await.expect("TODO.async: sender was dropped, error instead"))
}.boxed().compat()
async move {
Ok(rx.await.expect("TODO.async: sender was dropped, error instead"))
}
}
@ -170,40 +146,26 @@ impl Rocket {
}
Some(Body::Sized(body, size)) => {
hyp_res.header(header::CONTENT_LENGTH, size.to_string());
let (sender, hyp_body) = hyper::Body::channel();
let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;
let mut stream = body.into_chunk_stream(4096);
let mut sink = sender.sink_compat().sink_map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
});
while let Some(next) = stream.next().await {
sink.send(next?).await?;
futures::future::poll_fn(|cx| sender.poll_ready(cx)).await.expect("TODO.async client gone?");
sender.send_data(next?).expect("send chunk");
}
// TODO.async: This should be better, but it creates an
// incomprehensible error messasge instead
// stream.forward(sink).await;
}
Some(Body::Chunked(body, chunk_size)) => {
// TODO.async: This is identical to Body::Sized except for the chunk size
let (sender, hyp_body) = hyper::Body::channel();
let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;
let mut stream = body.into_chunk_stream(chunk_size.try_into().expect("u64 -> usize overflow"));
let mut sink = sender.sink_compat().sink_map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
});
while let Some(next) = stream.next().await {
sink.send(next?).await?;
futures::future::poll_fn(|cx| sender.poll_ready(cx)).await.expect("TODO.async client gone?");
sender.send_data(next?).expect("send chunk");
}
// TODO.async: This should be better, but it creates an
// incomprehensible error messasge instead
// stream.forward(sink).await;
}
};
@ -770,7 +732,7 @@ impl Rocket {
// TODO.async What meaning should config.workers have now?
// Initialize the tokio runtime
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.core_threads(self.config.workers as usize)
.build()
.expect("Cannot build runtime!");
@ -815,13 +777,16 @@ impl Rocket {
logger::pop_max_level();
let rocket = Arc::new(self);
let spawn = Box::new(runtime.executor().compat());
let spawn = Box::new(TokioCompat::new(runtime.executor()));
let service = hyper::make_service_fn(move |socket: &hyper::AddrStream| {
futures::future::ok::<_, Box<dyn std::error::Error + Send + Sync>>(RocketHyperService {
rocket: rocket.clone(),
spawn: spawn.clone(),
remote_addr: socket.remote_addr(),
}).compat()
let rocket = rocket.clone();
let remote_addr = socket.remote_addr();
let spawn = spawn.clone();
async move {
Ok::<_, std::convert::Infallible>(hyper::service_fn(move |req| {
hyper_service_fn(rocket.clone(), remote_addr, spawn.clone(), req)
}))
}
});
// NB: executor must be passed manually here, see hyperium/hyper#1537

View File

@ -37,7 +37,7 @@ mod head_handling_tests {
match body {
Body::Sized(mut body, size) => {
let mut buffer = vec![];
futures::executor::block_on(async {
tokio::runtime::Runtime::new().expect("create runtime").block_on(async {
body.read_to_end(&mut buffer).await.unwrap();
});
assert_eq!(size, expected_size);