From f1ecb79a7ea536bacc7b7e92d33bf5406c0c635f Mon Sep 17 00:00:00 2001 From: Sergio Benitez Date: Tue, 1 Jun 2021 13:15:05 -0700 Subject: [PATCH] Add fully featured SSE chat example. --- examples/Cargo.toml | 1 + examples/README.md | 6 ++ examples/chat/Cargo.toml | 12 +++ examples/chat/src/main.rs | 57 ++++++++++ examples/chat/src/tests.rs | 115 ++++++++++++++++++++ examples/chat/static/index.html | 49 +++++++++ examples/chat/static/reset.css | 1 + examples/chat/static/script.js | 172 +++++++++++++++++++++++++++++ examples/chat/static/style.css | 186 ++++++++++++++++++++++++++++++++ 9 files changed, 599 insertions(+) create mode 100644 examples/chat/Cargo.toml create mode 100644 examples/chat/src/main.rs create mode 100644 examples/chat/src/tests.rs create mode 100644 examples/chat/static/index.html create mode 100644 examples/chat/static/reset.css create mode 100644 examples/chat/static/script.js create mode 100644 examples/chat/static/style.css diff --git a/examples/Cargo.toml b/examples/Cargo.toml index de5ec7df..34d66552 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -18,4 +18,5 @@ members = [ "pastebin", "todo", + "chat", ] diff --git a/examples/README.md b/examples/README.md index 0fb2ebcc..dddea7b1 100644 --- a/examples/README.md +++ b/examples/README.md @@ -16,6 +16,12 @@ This directory contains projects showcasing Rocket's features. SQLite database driven by diesel. Runs migrations automatically at start-up. Uses tera to render templates. + * **[`chat`](./chat)** + + A real-time, multi-room chat application using Server-Sent Events (SSE) and + JavaScript's `EventSource`. Supports automatic reconnection with exponential + backoff and live connection status. + ## Feature Examples * **[`config`](./config)** - Illustrates how to extract values from a Rocket diff --git a/examples/chat/Cargo.toml b/examples/chat/Cargo.toml new file mode 100644 index 00000000..1397141c --- /dev/null +++ b/examples/chat/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "chat" +version = "0.0.0" +workspace = "../" +edition = "2018" +publish = false + +[dependencies] +rocket = { path = "../../core/lib", features = ["json"] } + +[dev-dependencies] +rand = "0.8" diff --git a/examples/chat/src/main.rs b/examples/chat/src/main.rs new file mode 100644 index 00000000..17226e60 --- /dev/null +++ b/examples/chat/src/main.rs @@ -0,0 +1,57 @@ +//! Implements a chat server using async rocket and SSE. + +#[macro_use] extern crate rocket; + +#[cfg(test)] mod tests; + +use rocket::{State, Shutdown}; +use rocket::form::Form; +use rocket::fs::{relative, FileServer}; +use rocket::response::stream::{EventStream, Event}; +use rocket::tokio::sync::broadcast::{channel, Sender, error::RecvError}; +use rocket::tokio::select; +use rocket::serde::{Serialize, Deserialize}; + +#[derive(Debug, PartialEq, Clone, FromForm, Deserialize, Serialize, UriDisplayQuery)] +#[serde(crate = "rocket::serde")] +struct Message { + #[field(validate = len(..30))] + pub room: String, + #[field(validate = len(..20))] + pub username: String, + pub message: String, +} + +#[get("/events")] +async fn events(queue: &State>, mut end: Shutdown) -> EventStream![] { + // Subscribe to messages and map it to an SSE stream + let mut rx = queue.subscribe(); + EventStream! { + loop { + let msg = select! { + msg = rx.recv() => match msg { + Ok(msg) => msg, + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(_)) => continue, + }, + _ = &mut end => break, + }; + + yield Event::json(&msg); + } + } +} + +#[post("/message", data = "
")] +fn post(form: Form, queue: &State>) { + // A send 'fails' if there are no active subscribers. That's okay. + let _ = queue.send(form.into_inner()); +} + +#[launch] +fn rocket() -> _ { + rocket::build() + .manage(channel::(1024).0) + .mount("/", routes![post, events]) + .mount("/", FileServer::from(relative!("static"))) +} diff --git a/examples/chat/src/tests.rs b/examples/chat/src/tests.rs new file mode 100644 index 00000000..0b77ef76 --- /dev/null +++ b/examples/chat/src/tests.rs @@ -0,0 +1,115 @@ +use std::ops::Range; + +use rand::{thread_rng, Rng}; +use rand::distributions::Alphanumeric; + +use rocket::http::{ContentType, Status}; +use rocket::http::uri::fmt::{UriDisplay, Query}; +use rocket::local::asynchronous::{Client, LocalResponse}; + +use rocket::tokio::{sync, join}; +use rocket::tokio::io::{BufReader, AsyncBufReadExt}; +use rocket::serde::json; + +use super::*; + +async fn send_message<'c>(client: &'c Client, message: &Message) -> LocalResponse<'c> { + client.post(uri!(post)) + .header(ContentType::Form) + .body((message as &dyn UriDisplay).to_string()) + .dispatch() + .await +} + +fn gen_string(len: Range) -> String { + thread_rng() + .sample_iter(&Alphanumeric) + .take(thread_rng().gen_range(len)) + .map(char::from) + .collect() +} + +#[async_test] +async fn messages() { + let client = Client::tracked(rocket()).await.unwrap(); + let start_barrier = sync::Barrier::new(2); + + let shutdown_message = Message { + room: ":control".into(), + username: ":control".into(), + message: "shutdown".into(), + }; + + // Generate somewhere between 75 and 100 messages. + let mut test_messages = vec![]; + for _ in 0..thread_rng().gen_range(75..100) { + test_messages.push(Message { + room: gen_string(10..30), + username: gen_string(10..20), + message: gen_string(10..100), + }) + } + + let send_messages = async { + // Wait for the other task to start listening. + start_barrier.wait().await; + + // Send all of the messages. + for message in &test_messages { + send_message(&client, message).await; + } + + // Send the special "shutdown" message. + send_message(&client, &shutdown_message).await; + }; + + let receive_messages = async { + let response = client.get(uri!(events)).dispatch().await; + + // We have the response stream. Let the receiver know to start sending. + start_barrier.wait().await; + + let mut messages = vec![]; + let mut reader = BufReader::new(response).lines(); + while let Ok(Some(line)) = reader.next_line().await { + if !line.starts_with("data:") { + continue; + } + + let data: Message = json::from_str(&line[5..]).expect("message JSON"); + if &data == &shutdown_message { + // Test shutdown listening: this should end the stream. + client.rocket().shutdown().notify(); + continue; + } + + messages.push(data); + } + + messages + }; + + let received_messages = join!(send_messages, receive_messages).1; + assert!(test_messages.len() >= 75); + assert_eq!(test_messages, received_messages); +} + +#[async_test] +async fn bad_messages() { + // Generate a bunch of bad messages. + let mut bad_messages = vec![]; + for _ in 0..thread_rng().gen_range(75..100) { + bad_messages.push(Message { + room: gen_string(30..40), + username: gen_string(20..30), + message: gen_string(10..100), + }); + } + + // Ensure they all result in a rejected request. + let client = Client::tracked(rocket()).await.unwrap(); + for message in &bad_messages { + let response = send_message(&client, message).await; + assert_eq!(response.status(), Status::PayloadTooLarge); + } +} diff --git a/examples/chat/static/index.html b/examples/chat/static/index.html new file mode 100644 index 00000000..d0a556e0 --- /dev/null +++ b/examples/chat/static/index.html @@ -0,0 +1,49 @@ + + + + + Rocket Rooms + + + + + +
+ + +
+ +
+ +
+ +
+ + + +
+
+
+ + diff --git a/examples/chat/static/reset.css b/examples/chat/static/reset.css new file mode 100644 index 00000000..df402bc9 --- /dev/null +++ b/examples/chat/static/reset.css @@ -0,0 +1 @@ +html,body,p,ol,ul,li,dl,dt,dd,blockquote,figure,fieldset,legend,textarea,pre,iframe,hr,h1,h2,h3,h4,h5,h6{margin:0;padding:0}h1,h2,h3,h4,h5,h6{font-size:100%;font-weight:normal}ul{list-style:none}button,input,select{margin:0}html{box-sizing:border-box}*,*::before,*::after{box-sizing:inherit}img,video{height:auto;max-width:100%}iframe,button,input{border:0}table{border-collapse:collapse;border-spacing:0}td,th{padding:0} diff --git a/examples/chat/static/script.js b/examples/chat/static/script.js new file mode 100644 index 00000000..d78e3e6a --- /dev/null +++ b/examples/chat/static/script.js @@ -0,0 +1,172 @@ +let roomListDiv = document.getElementById('room-list'); +let messagesDiv = document.getElementById('messages'); +let newMessageForm = document.getElementById('new-message'); +let newRoomForm = document.getElementById('new-room'); +let statusDiv = document.getElementById('status'); + +let roomTemplate = document.getElementById('room'); +let messageTemplate = document.getElementById('message'); + +let messageField = newMessageForm.querySelector("#message"); +let usernameField = newMessageForm.querySelector("#username"); +let roomNameField = newRoomForm.querySelector("#name"); + +var STATE = { + room: "lobby", + rooms: {}, + connected: false, +} + +// Generate a color from a "hash" of a string. Thanks, internet. +function hashColor(str) { + let hash = 0; + for (var i = 0; i < str.length; i++) { + hash = str.charCodeAt(i) + ((hash << 5) - hash); + hash = hash & hash; + } + + return `hsl(${hash % 360}, 100%, 70%)`; +} + +// Add a new room `name` and change to it. Returns `true` if the room didn't +// already exist and false otherwise. +function addRoom(name) { + if (STATE[name]) { + changeRoom(name); + return false; + } + + var node = roomTemplate.content.cloneNode(true); + var room = node.querySelector(".room"); + room.addEventListener("click", () => changeRoom(name)); + room.textContent = name; + room.dataset.name = name; + roomListDiv.appendChild(node); + + STATE[name] = []; + changeRoom(name); + return true; +} + +// Change the current room to `name`, restoring its messages. +function changeRoom(name) { + if (STATE.room == name) return; + + var newRoom = roomListDiv.querySelector(`.room[data-name='${name}']`); + var oldRoom = roomListDiv.querySelector(`.room[data-name='${STATE.room}']`); + if (!newRoom || !oldRoom) return; + + STATE.room = name; + oldRoom.classList.remove("active"); + newRoom.classList.add("active"); + + messagesDiv.querySelectorAll(".message").forEach((msg) => { + messagesDiv.removeChild(msg) + }); + + STATE[name].forEach((data) => addMessage(name, data.username, data.message)) +} + +// Add `message` from `username` to `room`. If `push`, then actually store the +// message. If the current room is `room`, render the message. +function addMessage(room, username, message, push = false) { + if (push) { + STATE[room].push({ username, message }) + } + + if (STATE.room == room) { + var node = messageTemplate.content.cloneNode(true); + node.querySelector(".message .username").textContent = username; + node.querySelector(".message .username").style.color = hashColor(username); + node.querySelector(".message .text").textContent = message; + messagesDiv.appendChild(node); + } +} + +// Subscribe to the event source at `uri` with exponential backoff reconnect. +function subscribe(uri) { + var retryTime = 1; + + function connect(uri) { + const events = new EventSource(uri); + + events.addEventListener("message", (ev) => { + console.log("raw data", JSON.stringify(ev.data)); + console.log("decoded data", JSON.stringify(JSON.parse(ev.data))); + const msg = JSON.parse(ev.data); + if (!"message" in msg || !"room" in msg || !"username" in msg) return; + addMessage(msg.room, msg.username, msg.message, true); + }); + + events.addEventListener("open", () => { + setConnectedStatus(true); + console.log(`connected to event stream at ${uri}`); + retryTime = 1; + }); + + events.addEventListener("error", () => { + setConnectedStatus(false); + events.close(); + + let timeout = retryTime; + retryTime = Math.min(64, retryTime * 2); + console.log(`connection lost. attempting to reconnect in ${timeout}s`); + setTimeout(() => connect(uri), (() => timeout * 1000)()); + }); + } + + connect(uri); +} + +// Set the connection status: `true` for connected, `false` for disconnected. +function setConnectedStatus(status) { + STATE.connected = status; + statusDiv.className = (status) ? "connected" : "reconnecting"; +} + +// Let's go! Initialize the world. +function init() { + // Initialize some rooms. + addRoom("lobby"); + addRoom("rocket"); + changeRoom("lobby"); + addMessage("lobby", "Rocket", "Hey! Open another browser tab, send a message.", true); + addMessage("rocket", "Rocket", "This is another room. Neat, huh?", true); + + // Set up the form handler. + newMessageForm.addEventListener("submit", (e) => { + e.preventDefault(); + + const room = STATE.room; + const message = messageField.value; + const username = usernameField.value || "guest"; + if (!message || !username) return; + + if (STATE.connected) { + fetch("/message", { + method: "POST", + body: new URLSearchParams({ room, username, message }), + }).then((response) => { + if (response.ok) messageField.value = ""; + }); + } + }) + + // Set up the new room handler. + newRoomForm.addEventListener("submit", (e) => { + e.preventDefault(); + + const room = roomNameField.value; + if (!room) return; + + roomNameField.value = ""; + if (!addRoom(room)) return; + + addMessage(room, "Rocket", `Look, your own "${room}" room! Nice.`, true); + }) + + // Subscribe to server-sent events. + subscribe("/events"); +} + +init(); diff --git a/examples/chat/static/style.css b/examples/chat/static/style.css new file mode 100644 index 00000000..47fe129b --- /dev/null +++ b/examples/chat/static/style.css @@ -0,0 +1,186 @@ +:root { + --bg-dark: #242423; + --bg-light: #333533; + --fg-light: #E8EDDF; + --callout: rgb(255, 255, 102); + --callout-dark: #101010; +} + +* { + font-size: 14px; +} + +html, body, main { + background-color: var(--bg-dark); + color: #fff; + font-family: "Inter", Arial, Helvetica, sans-serif, "Noto Color Emoji"; + font-weight: 400; + text-shadow: rgb(77, 81, 86) 0px 0px 0px; + height: 100%; +} + +main { + display: flex; +} + +button:hover:not(.active) { + filter: brightness(1.15); + cursor: pointer; +} + +#sidebar { + flex: 3 30%; + display: flex; + flex-direction: column; + overflow: auto; + background-color: var(--bg-light); +} + +#room-list { + display: flex; + flex-direction: column; + overflow: auto; + flex: 1; +} + +#sidebar button { + height: 40px; + margin-bottom: 1px; + background: var(--bg-light); + color: #fff; + overflow: hidden; +} + +#sidebar button.active { + background: var(--bg-dark); + color: var(--callout); + font-weight: bold; + box-shadow: 0px 2px 2px rgba(0,0,0,0.9); + z-index: 10; +} + +#content { + flex: 7 100%; + overflow: auto; + display: flex; + flex-direction: column; +} + +.message { + display: flex; + flex-direction: column; + padding: 10px 0; +} + +.message:last-child { + padding-bottom: 20px; +} + +.message .username { + font-weight: bold; + padding-bottom: 5px; + color: var(--callout); +} + +#messages { + padding: 10px 20px; + flex: 1; +} + +form#new-message { + bottom: 0; + position: sticky; + flex: 0 0 auto; + width: 100%; +} + +form { + display: flex; + border-top: 2px solid #242424; +} + +form * { + height: 40px; + background: var(--fg-light); + color: var(--bg-dark); +} + +input { + padding: 0 10px; +} + +input:focus { + outline: 0; + filter: brightness(1.05); +} + +input#username { + text-align: right; + flex: 1 25%; + width: 25%; + border-right: 1px solid #303030; +} + +input#message { + flex: 10 100%; +} + +form button { + padding: 0 10px; +} + +#sidebar #new-room { + display: flex; + flex: 0 0 auto; + flex-direction: row; +} + +#new-room input:focus, #new-room button:hover { + filter: brightness(1.2); +} + +#new-room input { + flex: 8 80%; + width: 20%; + background-color: var(--callout-dark); + color: #fff; +} + +#new-room button { + flex: 2 20%; + width: 20%; + background-color: var(--bg-dark); +} + +#status { + padding: 5px 10px; + text-align: center; + font-size: 12px; +} + +#status.pending::before { + content: "status: connected"; +} + +#status.pending { + background-color: yellow; + color: #000; +} + +#status.connected::before { + content: "status: connected"; +} + +#status.connected { + background-color: green; + color: #fff; +} + +#status.reconnecting::before { + content: "status: reconnecting"; +} + +#status.reconnecting { + background-color: red; + color: #fff; +}