diff --git a/core/lib/src/data/data.rs b/core/lib/src/data/data.rs index da0f9be6..9e03a6fd 100644 --- a/core/lib/src/data/data.rs +++ b/core/lib/src/data/data.rs @@ -143,11 +143,9 @@ impl Data { /// } /// ``` #[inline(always)] - pub fn stream_to<'w, W: AsyncWrite + Unpin + 'w>(self, mut writer: W) -> impl Future> + 'w { - Box::pin(async move { - let mut stream = self.open(); - tokio::io::copy(&mut stream, &mut writer).await - }) + pub async fn stream_to(self, mut writer: W) -> io::Result { + let mut stream = self.open(); + tokio::io::copy(&mut stream, &mut writer).await } /// A helper method to write the body of the request to a file at the path @@ -168,11 +166,9 @@ impl Data { /// } /// ``` #[inline(always)] - pub fn stream_to_file + Send + Unpin + 'static>(self, path: P) -> impl Future> { - Box::pin(async move { - let mut file = tokio::fs::File::create(path).await?; - self.stream_to(&mut file).await - }) + pub async fn stream_to_file>(self, path: P) -> io::Result { + let mut file = tokio::fs::File::create(path).await?; + self.stream_to(&mut file).await } // Creates a new data object with an internal buffer `buf`, where the cursor diff --git a/core/lib/src/rocket.rs b/core/lib/src/rocket.rs index 1598ec6c..139a3cd3 100644 --- a/core/lib/src/rocket.rs +++ b/core/lib/src/rocket.rs @@ -95,75 +95,71 @@ fn hyper_service_fn( impl Rocket { #[inline] - fn issue_response<'r>( + async fn issue_response( &self, - response: Response<'r>, + response: Response<'_>, tx: oneshot::Sender>, - ) -> impl Future + 'r { + ) { let result = self.write_response(response, tx); - async move { - match result.await { - Ok(()) => { - info_!("{}", Paint::green("Response succeeded.")); - } - Err(e) => { - error_!("Failed to write response: {:?}.", e); - } + match result.await { + Ok(()) => { + info_!("{}", Paint::green("Response succeeded.")); + } + Err(e) => { + error_!("Failed to write response: {:?}.", e); } } } #[inline] - fn write_response<'r>( + async fn write_response( &self, - mut response: Response<'r>, + mut response: Response<'_>, tx: oneshot::Sender>, - ) -> impl Future> + 'r { - async move { - let mut hyp_res = hyper::Response::builder() - .status(response.status().code); + ) -> io::Result<()> { + let mut hyp_res = hyper::Response::builder() + .status(response.status().code); - for header in response.headers().iter() { - let name = header.name.as_str(); - let value = header.value.as_bytes(); - hyp_res = hyp_res.header(name, value); - } - - let send_response = move |hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> { - let response = hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - tx.send(response).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Client disconnected before the response was started")) - }; - - match response.body() { - None => { - hyp_res = hyp_res.header(header::CONTENT_LENGTH, "0"); - send_response(hyp_res, hyper::Body::empty())?; - } - Some(Body::Sized(body, size)) => { - hyp_res = hyp_res.header(header::CONTENT_LENGTH, size.to_string()); - let (mut sender, hyp_body) = hyper::Body::channel(); - send_response(hyp_res, hyp_body)?; - - let mut stream = body.into_bytes_stream(4096); - while let Some(next) = stream.next().await { - sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - } - } - Some(Body::Chunked(body, chunk_size)) => { - // TODO.async: This is identical to Body::Sized except for the chunk size - - let (mut sender, hyp_body) = hyper::Body::channel(); - send_response(hyp_res, hyp_body)?; - - let mut stream = body.into_bytes_stream(chunk_size.try_into().expect("u64 -> usize overflow")); - while let Some(next) = stream.next().await { - sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - } - } - }; - - Ok(()) + for header in response.headers().iter() { + let name = header.name.as_str(); + let value = header.value.as_bytes(); + hyp_res = hyp_res.header(name, value); } + + let send_response = move |hyp_res: hyper::ResponseBuilder, body| -> io::Result<()> { + let response = hyp_res.body(body).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + tx.send(response).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Client disconnected before the response was started")) + }; + + match response.body() { + None => { + hyp_res = hyp_res.header(header::CONTENT_LENGTH, "0"); + send_response(hyp_res, hyper::Body::empty())?; + } + Some(Body::Sized(body, size)) => { + hyp_res = hyp_res.header(header::CONTENT_LENGTH, size.to_string()); + let (mut sender, hyp_body) = hyper::Body::channel(); + send_response(hyp_res, hyp_body)?; + + let mut stream = body.into_bytes_stream(4096); + while let Some(next) = stream.next().await { + sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + } + } + Some(Body::Chunked(body, chunk_size)) => { + // TODO.async: This is identical to Body::Sized except for the chunk size + + let (mut sender, hyp_body) = hyper::Body::channel(); + send_response(hyp_res, hyp_body)?; + + let mut stream = body.into_bytes_stream(chunk_size.try_into().expect("u64 -> usize overflow")); + while let Some(next) = stream.next().await { + sender.send_data(next?).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + } + } + }; + + Ok(()) } }