Initial connection upgrade API implementation.

This commit is contained in:
Mai-Lapyst 2023-03-10 22:15:10 +01:00 committed by Sergio Benitez
parent 9584edcf59
commit 19e7e82fd6
5 changed files with 85 additions and 4 deletions

View File

@ -5,7 +5,7 @@
//! while necessary. //! while necessary.
pub use hyper::{Method, Error, Body, Uri, Version, Request, Response}; pub use hyper::{Method, Error, Body, Uri, Version, Request, Response};
pub use hyper::{body, server, service}; pub use hyper::{body, server, service, upgrade};
pub use http::{HeaderValue, request, uri}; pub use http::{HeaderValue, request, uri};
/// Reexported Hyper HTTP header types. /// Reexported Hyper HTTP header types.

View File

@ -123,6 +123,7 @@ pub use time;
#[doc(hidden)] pub mod sentinel; #[doc(hidden)] pub mod sentinel;
pub mod local; pub mod local;
pub mod request; pub mod request;
pub mod upgrade;
pub mod response; pub mod response;
pub mod config; pub mod config;
pub mod form; pub mod form;
@ -175,6 +176,7 @@ mod rocket;
mod router; mod router;
mod phase; mod phase;
#[doc(inline)] pub use crate::upgrade::Upgrade;
#[doc(inline)] pub use crate::response::Response; #[doc(inline)] pub use crate::response::Response;
#[doc(inline)] pub use crate::data::Data; #[doc(inline)] pub use crate::data::Data;
#[doc(inline)] pub use crate::config::Config; #[doc(inline)] pub use crate::config::Config;

View File

@ -5,6 +5,7 @@ use tokio::io::{AsyncRead, AsyncSeek};
use crate::http::{Header, HeaderMap, Status, ContentType, Cookie}; use crate::http::{Header, HeaderMap, Status, ContentType, Cookie};
use crate::response::Body; use crate::response::Body;
use crate::upgrade::Upgrade;
/// Builder for the [`Response`] type. /// Builder for the [`Response`] type.
/// ///
@ -261,6 +262,13 @@ impl<'r> Builder<'r> {
self self
} }
/// Sets the upgrade of the `Response`.
#[inline(always)]
pub fn upgrade(&mut self, upgrade: Option<Box<dyn Upgrade<'static> + Send>>) -> &mut Builder<'r> {
self.response.set_upgrade(upgrade);
self
}
/// Sets the max chunk size of a body, if any, to `size`. /// Sets the max chunk size of a body, if any, to `size`.
/// ///
/// See [`Response::set_max_chunk_size()`] for notes. /// See [`Response::set_max_chunk_size()`] for notes.
@ -413,6 +421,7 @@ pub struct Response<'r> {
status: Option<Status>, status: Option<Status>,
headers: HeaderMap<'r>, headers: HeaderMap<'r>,
body: Body<'r>, body: Body<'r>,
upgrade: Option<Box<dyn Upgrade<'static> + Send>>,
} }
impl<'r> Response<'r> { impl<'r> Response<'r> {
@ -807,6 +816,27 @@ impl<'r> Response<'r> {
self.body = Body::with_unsized(body); self.body = Body::with_unsized(body);
} }
/// Returns a instance of the `Upgrade`-trait when the `Response` is upgradeable
#[inline(always)]
pub fn upgrade(&self) -> Option<&Box<dyn Upgrade<'static> + Send>> {
self.upgrade.as_ref()
}
/// Takes the upgrade out of the response, leaving a [`None`] in it's place.
/// With this, the caller takes ownership about the `Upgrade`-trait.
#[inline(always)]
pub fn take_upgrade(&mut self) -> Option<Box<dyn Upgrade<'static> + Send>> {
self.upgrade.take()
}
/// Sets the upgrade contained in this `Response`
///
/// While the response also need to have status 101 SwitchingProtocols in order to be a valid upgrade,
/// this method doesn't set this, and it's expected that the caller sets this.
pub fn set_upgrade(&mut self, upgrade: Option<Box<dyn Upgrade<'static> + Send>>) {
self.upgrade = upgrade;
}
/// Sets the body's maximum chunk size to `size` bytes. /// Sets the body's maximum chunk size to `size` bytes.
/// ///
/// The default max chunk size is [`Body::DEFAULT_MAX_CHUNK`]. The max chunk /// The default max chunk size is [`Body::DEFAULT_MAX_CHUNK`]. The max chunk

View File

@ -64,7 +64,7 @@ async fn handle<Fut, T, F>(name: Option<&str>, run: F) -> Option<T>
async fn hyper_service_fn( async fn hyper_service_fn(
rocket: Arc<Rocket<Orbit>>, rocket: Arc<Rocket<Orbit>>,
conn: ConnectionMeta, conn: ConnectionMeta,
hyp_req: hyper::Request<hyper::Body>, mut hyp_req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, io::Error> { ) -> Result<hyper::Response<hyper::Body>, io::Error> {
// This future must return a hyper::Response, but the response body might // This future must return a hyper::Response, but the response body might
// borrow from the request. Instead, write the body in another future that // borrow from the request. Instead, write the body in another future that
@ -72,6 +72,9 @@ async fn hyper_service_fn(
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
tokio::spawn(async move { tokio::spawn(async move {
// Upgrade before do any other; we handle errors below
let hyp_upgraded = hyper::upgrade::on(&mut hyp_req);
// Convert a Hyper request into a Rocket request. // Convert a Hyper request into a Rocket request.
let (h_parts, mut h_body) = hyp_req.into_parts(); let (h_parts, mut h_body) = hyp_req.into_parts();
match Request::from_hyp(&rocket, &h_parts, Some(conn)) { match Request::from_hyp(&rocket, &h_parts, Some(conn)) {
@ -79,8 +82,40 @@ async fn hyper_service_fn(
// Convert into Rocket `Data`, dispatch request, write response. // Convert into Rocket `Data`, dispatch request, write response.
let mut data = Data::from(&mut h_body); let mut data = Data::from(&mut h_body);
let token = rocket.preprocess_request(&mut req, &mut data).await; let token = rocket.preprocess_request(&mut req, &mut data).await;
let response = rocket.dispatch(token, &mut req, data).await; let mut response = rocket.dispatch(token, &req, data).await;
rocket.send_response(response, tx).await;
if response.status() == Status::SwitchingProtocols {
let may_upgrade = response.take_upgrade();
match may_upgrade {
Some(upgrade) => {
// send the finishing response; needed so that hyper can upgrade the request
rocket.send_response(response, tx).await;
match hyp_upgraded.await {
Ok(hyp_upgraded) => {
// let the upgrade take the upgraded hyper request
let fu = upgrade.start(hyp_upgraded);
fu.await;
}
Err(e) => {
error_!("Failed to upgrade request: {e}");
// NOTE: we *should* send a response here but since we send one earlier AND upgraded the request,
// this cannot be done easily at this point...
// let response = rocket.handle_error(Status::InternalServerError, &req).await;
// rocket.send_response(response, tx).await;
}
}
}
None => {
error_!("Status is 101 switching protocols, but response dosn't hold a upgrade");
let response = rocket.handle_error(Status::InternalServerError, &req).await;
rocket.send_response(response, tx).await;
}
}
} else {
rocket.send_response(response, tx).await;
}
}, },
Err(e) => { Err(e) => {
warn!("Bad incoming HTTP request."); warn!("Bad incoming HTTP request.");

View File

@ -0,0 +1,14 @@
//! Upgrade wrapper to deal with hyper::upgarde::Upgraded
use crate::http::hyper;
/// Trait to determine if any given response in rocket is upgradeable.
///
/// When a response has the http code 101 SwitchingProtocols, and the response implements the Upgrade trait,
/// then rocket aquires the hyper::upgarde::Upgraded struct and calls the start() method of the trait with the hyper upgrade
/// and awaits the result.
#[crate::async_trait]
pub trait Upgrade<'a> {
/// Called with the hyper::upgarde::Upgraded struct when a rocket response should be upgraded
async fn start(&self, upgraded: hyper::upgrade::Upgraded);
}