Add 'upgrade' example with WebSocket support.

This is an initial example that showcases using the new connection
upgrade API to implement WebSocket support outside of Rocket's core.
This commit is contained in:
Sergio Benitez 2023-03-29 17:01:33 -07:00
parent d97c83d7e0
commit bd482081ad
6 changed files with 178 additions and 0 deletions

View File

@ -15,6 +15,7 @@ members = [
"templating",
"testing",
"tls",
"upgrade",
"pastebin",
"todo",

View File

@ -87,3 +87,6 @@ This directory contains projects showcasing Rocket's features.
* **[`tls`](./tls)** - Illustrates configuring TLS with a variety of key pair
kinds.
* **[`upgrade`](./upgrade)** - Uses the connection upgrade API to implement
WebSocket support using tungstenite.

View File

@ -0,0 +1,10 @@
[package]
name = "upgrade"
version = "0.0.0"
workspace = "../"
edition = "2021"
publish = false
[dependencies]
rocket = { path = "../../core/lib" }
tokio-tungstenite = "0.18"

View File

@ -0,0 +1,69 @@
<!DOCTYPE html>
<head>
<title>WebSocket client test</title>
</head>
<body>
<h1>WebSocket Client Test</h1>
<div id="log"></div>
</body>
<script language="javascript" type="text/javascript">
var wsUri = "ws://127.0.0.1:8000/echo/";
var log;
function init()
{
log = document.getElementById("log");
testWebSocket();
}
function testWebSocket()
{
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) { onOpen(evt) };
websocket.onclose = function(evt) { onClose(evt) };
websocket.onmessage = function(evt) { onMessage(evt) };
websocket.onerror = function(evt) { onError(evt) };
}
function onOpen(evt)
{
writeLog("CONNECTED");
sendMessage("Hello world");
}
function onClose(evt)
{
writeLog("Websocket DISCONNECTED");
}
function onMessage(evt)
{
writeLog('<span style="color: blue;">RESPONSE: ' + evt.data+'</span>');
websocket.close();
}
function onError(evt)
{
writeLog('<span style="color: red;">ERROR:</span> ' + evt.data);
}
function sendMessage(message)
{
writeLog("SENT: " + message);
websocket.send(message);
}
function writeLog(message)
{
var pre = document.createElement("p");
pre.innerHTML = message;
log.appendChild(pre);
}
window.addEventListener("load", init, false);
</script>

View File

@ -0,0 +1,28 @@
#[macro_use] extern crate rocket;
use rocket::futures::{SinkExt, StreamExt};
use rocket::response::content::RawHtml;
mod ws;
#[get("/")]
fn index() -> RawHtml<&'static str> {
RawHtml(include_str!("../index.html"))
}
#[get("/echo")]
fn echo(ws: ws::WebSocket) -> ws::Channel {
ws.channel(|mut stream| Box::pin(async move {
while let Some(message) = stream.next().await {
let _ = stream.send(message?).await;
}
Ok(())
}))
}
#[launch]
fn rocket() -> _ {
rocket::build()
.mount("/", routes![index, echo])
}

View File

@ -0,0 +1,67 @@
use std::io;
use rocket::{Request, response};
use rocket::data::{IoHandler, IoStream};
use rocket::request::{FromRequest, Outcome};
use rocket::response::{Responder, Response};
use rocket::futures::future::BoxFuture;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::handshake::derive_accept_key;
use tokio_tungstenite::tungstenite::protocol::Role;
use tokio_tungstenite::tungstenite::error::{Result, Error};
pub struct WebSocket(String);
#[rocket::async_trait]
impl<'r> FromRequest<'r> for WebSocket {
type Error = std::convert::Infallible;
async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
use rocket::http::uncased::eq;
let headers = req.headers();
let is_upgrade = headers.get_one("Connection").map_or(false, |c| eq(c, "upgrade"));
let is_ws = headers.get("Upgrade").any(|p| eq(p, "websocket"));
let is_ws_13 = headers.get_one("Sec-WebSocket-Version").map_or(false, |v| v == "13");
let key = headers.get_one("Sec-WebSocket-Key").map(|k| derive_accept_key(k.as_bytes()));
match key {
Some(key) if is_upgrade && is_ws && is_ws_13 => Outcome::Success(WebSocket(key)),
Some(_) | None => Outcome::Forward(())
}
}
}
pub struct Channel {
ws: WebSocket,
handler: Box<dyn FnMut(WebSocketStream<IoStream>) -> BoxFuture<'static, Result<()>> + Send>,
}
impl WebSocket {
pub fn channel<F: Send + 'static>(self, handler: F) -> Channel
where F: FnMut(WebSocketStream<IoStream>) -> BoxFuture<'static, Result<()>>
{
Channel { ws: self, handler: Box::new(handler), }
}
}
impl<'r> Responder<'r, 'static> for Channel {
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
Response::build()
.raw_header("Sec-Websocket-Version", "13")
.raw_header("Sec-WebSocket-Accept", self.ws.0.clone())
.upgrade("websocket", self)
.ok()
}
}
#[rocket::async_trait]
impl IoHandler for Channel {
async fn io(&mut self, io: IoStream) -> io::Result<()> {
let stream = WebSocketStream::from_raw_socket(io, Role::Server, None).await;
(self.handler)(stream).await.map_err(|e| match e {
Error::Io(e) => e,
other => io::Error::new(io::ErrorKind::Other, other)
})
}
}