Use async fn instead of impl Future in a few methods in 'Data' and 'Rocket'.

This commit is contained in:
Jeb Rosen 2019-12-17 21:55:37 -08:00 committed by Sergio Benitez
parent f442642ec2
commit 9a16aeb2e0
2 changed files with 59 additions and 67 deletions

View File

@ -143,11 +143,9 @@ impl Data {
/// }
/// ```
#[inline(always)]
pub fn stream_to<'w, W: AsyncWrite + Unpin + 'w>(self, mut writer: W) -> impl Future<Output = io::Result<u64>> + 'w {
Box::pin(async move {
let mut stream = self.open();
tokio::io::copy(&mut stream, &mut writer).await
})
pub async fn stream_to<W: AsyncWrite + Unpin>(self, mut writer: W) -> io::Result<u64> {
let mut stream = self.open();
tokio::io::copy(&mut stream, &mut writer).await
}
/// A helper method to write the body of the request to a file at the path
@ -168,11 +166,9 @@ impl Data {
/// }
/// ```
#[inline(always)]
pub fn stream_to_file<P: AsRef<Path> + Send + Unpin + 'static>(self, path: P) -> impl Future<Output = io::Result<u64>> {
Box::pin(async move {
let mut file = tokio::fs::File::create(path).await?;
self.stream_to(&mut file).await
})
pub async fn stream_to_file<P: AsRef<Path>>(self, path: P) -> io::Result<u64> {
let mut file = tokio::fs::File::create(path).await?;
self.stream_to(&mut file).await
}
// Creates a new data object with an internal buffer `buf`, where the cursor

View File

@ -95,75 +95,71 @@ fn hyper_service_fn(
impl Rocket {
#[inline]
fn issue_response<'r>(
async fn issue_response(
&self,
response: Response<'r>,
response: Response<'_>,
tx: oneshot::Sender<hyper::Response<hyper::Body>>,
) -> impl Future<Output = ()> + 'r {
) {
let result = self.write_response(response, tx);
async move {
match result.await {
Ok(()) => {
info_!("{}", Paint::green("Response succeeded."));
}
Err(e) => {
error_!("Failed to write response: {:?}.", e);
}
match result.await {
Ok(()) => {
info_!("{}", Paint::green("Response succeeded."));
}
Err(e) => {
error_!("Failed to write response: {:?}.", e);
}
}
}
#[inline]
fn write_response<'r>(
async fn write_response(
&self,
mut response: Response<'r>,
mut response: Response<'_>,
tx: oneshot::Sender<hyper::Response<hyper::Body>>,
) -> impl Future<Output = io::Result<()>> + 'r {
async move {
let mut hyp_res = hyper::Response::builder()
.status(response.status().code);
) -> io::Result<()> {
let mut hyp_res = hyper::Response::builder()
.status(response.status().code);
for header in response.headers().iter() {
let name = header.name.as_str();
let value = header.value.as_bytes();
hyp_res = hyp_res.header(name, value);
}
let send_response = move |hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> {
let response = hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
tx.send(response).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Client disconnected before the response was started"))
};
match response.body() {
None => {
hyp_res = hyp_res.header(header::CONTENT_LENGTH, "0");
send_response(hyp_res, hyper::Body::empty())?;
}
Some(Body::Sized(body, size)) => {
hyp_res = hyp_res.header(header::CONTENT_LENGTH, size.to_string());
let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;
let mut stream = body.into_bytes_stream(4096);
while let Some(next) = stream.next().await {
sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
}
}
Some(Body::Chunked(body, chunk_size)) => {
// TODO.async: This is identical to Body::Sized except for the chunk size
let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;
let mut stream = body.into_bytes_stream(chunk_size.try_into().expect("u64 -> usize overflow"));
while let Some(next) = stream.next().await {
sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
}
}
};
Ok(())
for header in response.headers().iter() {
let name = header.name.as_str();
let value = header.value.as_bytes();
hyp_res = hyp_res.header(name, value);
}
let send_response = move |hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> {
let response = hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
tx.send(response).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Client disconnected before the response was started"))
};
match response.body() {
None => {
hyp_res = hyp_res.header(header::CONTENT_LENGTH, "0");
send_response(hyp_res, hyper::Body::empty())?;
}
Some(Body::Sized(body, size)) => {
hyp_res = hyp_res.header(header::CONTENT_LENGTH, size.to_string());
let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;
let mut stream = body.into_bytes_stream(4096);
while let Some(next) = stream.next().await {
sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
}
}
Some(Body::Chunked(body, chunk_size)) => {
// TODO.async: This is identical to Body::Sized except for the chunk size
let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;
let mut stream = body.into_bytes_stream(chunk_size.try_into().expect("u64 -> usize overflow"));
while let Some(next) = stream.next().await {
sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
}
}
};
Ok(())
}
}