Warn, don't error, if remote hangs up.

This commit is contained in:
Sergio Benitez 2021-06-26 12:05:04 -07:00
parent ef303d44f8
commit d34195fe11
1 changed files with 39 additions and 38 deletions

View File

@ -37,7 +37,7 @@ async fn handle<Fut, T, F>(name: Option<&str>, run: F) -> Option<T>
info_!("A panic in Rust must be treated as an exceptional event.");
info_!("Panicking is not a suitable error handling mechanism.");
info_!("Unwinding, the result of a panic, is an expensive operation.");
info_!("Panics will severely degrade application performance.");
info_!("Panics will degrade application performance.");
info_!("Instead of panicking, return `Option` and/or `Result`.");
info_!("Values of either type can be returned directly from handlers.");
warn_!("A panic is treated as an internal server error.");
@ -74,86 +74,87 @@ async fn hyper_service_fn(
tokio::spawn(async move {
// Convert a Hyper request into a Rocket request.
let (h_parts, mut h_body) = hyp_req.into_parts();
let mut req = match Request::from_hyp(&rocket, &h_parts, addr) {
Ok(req) => req,
match Request::from_hyp(&rocket, &h_parts, addr) {
Ok(mut req) => {
// Convert into Rocket `Data`, dispatch request, write response.
let mut data = Data::from(&mut h_body);
let token = rocket.preprocess_request(&mut req, &mut data).await;
let response = rocket.dispatch(token, &mut req, data).await;
rocket.send_response(response, tx).await;
},
Err(e) => {
// TODO: We don't have a request to pass in, so we fabricate
// one. This is weird. Instead, let the user know that we failed
// to parse a request (a special handler?).
error!("Bad incoming request: {}", e);
// TODO: We don't have a request to pass in, so we just
// fabricate one. This is weird. We should let the user know
// that we failed to parse a request (by invoking some special
// handler) instead of doing this.
let dummy = Request::new(&rocket, Method::Get, Origin::ROOT);
let r = rocket.handle_error(Status::BadRequest, &dummy).await;
return rocket.send_response(r, tx).await;
let response = rocket.handle_error(Status::BadRequest, &dummy).await;
rocket.send_response(response, tx).await;
}
};
// Retrieve the data from the hyper body.
let mut data = Data::from(&mut h_body);
// Dispatch the request to get a response, then write that response out.
let token = rocket.preprocess_request(&mut req, &mut data).await;
let r = rocket.dispatch(token, &mut req, data).await;
rocket.send_response(r, tx).await;
}
});
// Receive the response written to `tx` by the task above.
rx.await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
rx.await.map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e))
}
impl Rocket<Orbit> {
/// Wrapper around `make_response` to log a success or failure.
/// Wrapper around `_send_response` to log a success or failure.
#[inline]
async fn send_response(
&self,
response: Response<'_>,
tx: oneshot::Sender<hyper::Response<hyper::Body>>,
) {
match self.make_response(response, tx).await {
let remote_hungup = |e: &io::Error| match e.kind() {
| io::ErrorKind::BrokenPipe
| io::ErrorKind::ConnectionReset
| io::ErrorKind::ConnectionAborted => true,
_ => false,
};
match self._send_response(response, tx).await {
Ok(()) => info_!("{}", Paint::green("Response succeeded.")),
Err(e) => error_!("Failed to write response: {}.", e),
Err(e) if remote_hungup(&e) => warn_!("Remote left: {}.", e),
Err(e) => warn_!("Failed to write response: {}.", e),
}
}
/// Attempts to create a hyper response from `response` and send it to `tx`.
#[inline]
async fn make_response(
async fn _send_response(
&self,
mut response: Response<'_>,
tx: oneshot::Sender<hyper::Response<hyper::Body>>,
) -> io::Result<()> {
let mut hyp_res = hyper::Response::builder()
.status(response.status().code);
let mut hyp_res = hyper::Response::builder();
hyp_res = hyp_res.status(response.status().code);
for header in response.headers().iter() {
let name = header.name.as_str();
let value = header.value.as_bytes();
hyp_res = hyp_res.header(name, value);
}
let send_response = move |res: hyper::ResponseBuilder, body| -> io::Result<()> {
let response = res.body(body)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
tx.send(response).map_err(|_| {
let msg = "client disconnected before the response was started";
io::Error::new(io::ErrorKind::BrokenPipe, msg)
})
};
let body = response.body_mut();
if let Some(n) = body.size().await {
hyp_res = hyp_res.header(hyper::header::CONTENT_LENGTH, n);
}
let max_chunk_size = body.max_chunk_size();
let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;
let hyp_response = hyp_res.body(hyp_body)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
tx.send(hyp_response).map_err(|_| {
let msg = "client disconnect before response started";
io::Error::new(io::ErrorKind::BrokenPipe, msg)
})?;
let max_chunk_size = body.max_chunk_size();
let mut stream = body.into_bytes_stream(max_chunk_size);
while let Some(next) = stream.next().await {
sender.send_data(next?).await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
.map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e))?;
}
Ok(())