Add fully featured SSE chat example.

This commit is contained in:
Sergio Benitez 2021-06-01 13:15:05 -07:00
parent a8f6103b99
commit f1ecb79a7e
9 changed files with 599 additions and 0 deletions

View File

@ -18,4 +18,5 @@ members = [
"pastebin",
"todo",
"chat",
]

View File

@ -16,6 +16,12 @@ This directory contains projects showcasing Rocket's features.
SQLite database driven by diesel. Runs migrations automatically at start-up.
Uses tera to render templates.
* **[`chat`](./chat)**
A real-time, multi-room chat application using Server-Sent Events (SSE) and
JavaScript's `EventSource`. Supports automatic reconnection with exponential
backoff and live connection status.
## Feature Examples
* **[`config`](./config)** - Illustrates how to extract values from a Rocket

12
examples/chat/Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
[package]
name = "chat"
version = "0.0.0"
workspace = "../"
edition = "2018"
publish = false
[dependencies]
rocket = { path = "../../core/lib", features = ["json"] }
[dev-dependencies]
rand = "0.8"

57
examples/chat/src/main.rs Normal file
View File

@ -0,0 +1,57 @@
//! Implements a chat server using async rocket and SSE.
#[macro_use] extern crate rocket;
#[cfg(test)] mod tests;
use rocket::{State, Shutdown};
use rocket::form::Form;
use rocket::fs::{relative, FileServer};
use rocket::response::stream::{EventStream, Event};
use rocket::tokio::sync::broadcast::{channel, Sender, error::RecvError};
use rocket::tokio::select;
use rocket::serde::{Serialize, Deserialize};
#[derive(Debug, PartialEq, Clone, FromForm, Deserialize, Serialize, UriDisplayQuery)]
#[serde(crate = "rocket::serde")]
struct Message {
#[field(validate = len(..30))]
pub room: String,
#[field(validate = len(..20))]
pub username: String,
pub message: String,
}
#[get("/events")]
async fn events(queue: &State<Sender<Message>>, mut end: Shutdown) -> EventStream![] {
// Subscribe to messages and map it to an SSE stream
let mut rx = queue.subscribe();
EventStream! {
loop {
let msg = select! {
msg = rx.recv() => match msg {
Ok(msg) => msg,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(_)) => continue,
},
_ = &mut end => break,
};
yield Event::json(&msg);
}
}
}
#[post("/message", data = "<form>")]
fn post(form: Form<Message>, queue: &State<Sender<Message>>) {
// A send 'fails' if there are no active subscribers. That's okay.
let _ = queue.send(form.into_inner());
}
#[launch]
fn rocket() -> _ {
rocket::build()
.manage(channel::<Message>(1024).0)
.mount("/", routes![post, events])
.mount("/", FileServer::from(relative!("static")))
}

115
examples/chat/src/tests.rs Normal file
View File

@ -0,0 +1,115 @@
use std::ops::Range;
use rand::{thread_rng, Rng};
use rand::distributions::Alphanumeric;
use rocket::http::{ContentType, Status};
use rocket::http::uri::fmt::{UriDisplay, Query};
use rocket::local::asynchronous::{Client, LocalResponse};
use rocket::tokio::{sync, join};
use rocket::tokio::io::{BufReader, AsyncBufReadExt};
use rocket::serde::json;
use super::*;
async fn send_message<'c>(client: &'c Client, message: &Message) -> LocalResponse<'c> {
client.post(uri!(post))
.header(ContentType::Form)
.body((message as &dyn UriDisplay<Query>).to_string())
.dispatch()
.await
}
fn gen_string(len: Range<usize>) -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(thread_rng().gen_range(len))
.map(char::from)
.collect()
}
#[async_test]
async fn messages() {
let client = Client::tracked(rocket()).await.unwrap();
let start_barrier = sync::Barrier::new(2);
let shutdown_message = Message {
room: ":control".into(),
username: ":control".into(),
message: "shutdown".into(),
};
// Generate somewhere between 75 and 100 messages.
let mut test_messages = vec![];
for _ in 0..thread_rng().gen_range(75..100) {
test_messages.push(Message {
room: gen_string(10..30),
username: gen_string(10..20),
message: gen_string(10..100),
})
}
let send_messages = async {
// Wait for the other task to start listening.
start_barrier.wait().await;
// Send all of the messages.
for message in &test_messages {
send_message(&client, message).await;
}
// Send the special "shutdown" message.
send_message(&client, &shutdown_message).await;
};
let receive_messages = async {
let response = client.get(uri!(events)).dispatch().await;
// We have the response stream. Let the receiver know to start sending.
start_barrier.wait().await;
let mut messages = vec![];
let mut reader = BufReader::new(response).lines();
while let Ok(Some(line)) = reader.next_line().await {
if !line.starts_with("data:") {
continue;
}
let data: Message = json::from_str(&line[5..]).expect("message JSON");
if &data == &shutdown_message {
// Test shutdown listening: this should end the stream.
client.rocket().shutdown().notify();
continue;
}
messages.push(data);
}
messages
};
let received_messages = join!(send_messages, receive_messages).1;
assert!(test_messages.len() >= 75);
assert_eq!(test_messages, received_messages);
}
#[async_test]
async fn bad_messages() {
// Generate a bunch of bad messages.
let mut bad_messages = vec![];
for _ in 0..thread_rng().gen_range(75..100) {
bad_messages.push(Message {
room: gen_string(30..40),
username: gen_string(20..30),
message: gen_string(10..100),
});
}
// Ensure they all result in a rejected request.
let client = Client::tracked(rocket()).await.unwrap();
for message in &bad_messages {
let response = send_message(&client, message).await;
assert_eq!(response.status(), Status::PayloadTooLarge);
}
}

View File

@ -0,0 +1,49 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Rocket Rooms</title>
<link rel="stylesheet" href="/reset.css">
<link rel="stylesheet" href="/style.css">
<script src="/script.js" charset="utf-8" defer></script>
</head>
<body>
<main>
<div id="sidebar">
<div id="status" class="pending"></div>
<div id="room-list">
<template id="room">
<button class="room"></button>
</template>
</div>
<form id="new-room">
<input type="text" name="name" id="name" autocomplete="off"
placeholder="new room..." maxlength="30"></input>
<button type="submit">+</button>
</form>
</div>
<div id="content">
<div id="messages">
<template id="message">
<div class="message">
<span class="username"></span>
<span class="text"></span>
</div>
</template>
</div>
<form id="new-message">
<input type="text" name="username" id="username" maxlength="20"
placeholder="guest" autocomplete="off">
<input type="text" name="message" id="message" autocomplete="off"
placeholder="Send a message..." autofocus>
<button type="submit" id="send">Send</button>
</form>
</div>
</main>
</body>
</html>

View File

@ -0,0 +1 @@
html,body,p,ol,ul,li,dl,dt,dd,blockquote,figure,fieldset,legend,textarea,pre,iframe,hr,h1,h2,h3,h4,h5,h6{margin:0;padding:0}h1,h2,h3,h4,h5,h6{font-size:100%;font-weight:normal}ul{list-style:none}button,input,select{margin:0}html{box-sizing:border-box}*,*::before,*::after{box-sizing:inherit}img,video{height:auto;max-width:100%}iframe,button,input{border:0}table{border-collapse:collapse;border-spacing:0}td,th{padding:0}

View File

@ -0,0 +1,172 @@
let roomListDiv = document.getElementById('room-list');
let messagesDiv = document.getElementById('messages');
let newMessageForm = document.getElementById('new-message');
let newRoomForm = document.getElementById('new-room');
let statusDiv = document.getElementById('status');
let roomTemplate = document.getElementById('room');
let messageTemplate = document.getElementById('message');
let messageField = newMessageForm.querySelector("#message");
let usernameField = newMessageForm.querySelector("#username");
let roomNameField = newRoomForm.querySelector("#name");
var STATE = {
room: "lobby",
rooms: {},
connected: false,
}
// Generate a color from a "hash" of a string. Thanks, internet.
function hashColor(str) {
let hash = 0;
for (var i = 0; i < str.length; i++) {
hash = str.charCodeAt(i) + ((hash << 5) - hash);
hash = hash & hash;
}
return `hsl(${hash % 360}, 100%, 70%)`;
}
// Add a new room `name` and change to it. Returns `true` if the room didn't
// already exist and false otherwise.
function addRoom(name) {
if (STATE[name]) {
changeRoom(name);
return false;
}
var node = roomTemplate.content.cloneNode(true);
var room = node.querySelector(".room");
room.addEventListener("click", () => changeRoom(name));
room.textContent = name;
room.dataset.name = name;
roomListDiv.appendChild(node);
STATE[name] = [];
changeRoom(name);
return true;
}
// Change the current room to `name`, restoring its messages.
function changeRoom(name) {
if (STATE.room == name) return;
var newRoom = roomListDiv.querySelector(`.room[data-name='${name}']`);
var oldRoom = roomListDiv.querySelector(`.room[data-name='${STATE.room}']`);
if (!newRoom || !oldRoom) return;
STATE.room = name;
oldRoom.classList.remove("active");
newRoom.classList.add("active");
messagesDiv.querySelectorAll(".message").forEach((msg) => {
messagesDiv.removeChild(msg)
});
STATE[name].forEach((data) => addMessage(name, data.username, data.message))
}
// Add `message` from `username` to `room`. If `push`, then actually store the
// message. If the current room is `room`, render the message.
function addMessage(room, username, message, push = false) {
if (push) {
STATE[room].push({ username, message })
}
if (STATE.room == room) {
var node = messageTemplate.content.cloneNode(true);
node.querySelector(".message .username").textContent = username;
node.querySelector(".message .username").style.color = hashColor(username);
node.querySelector(".message .text").textContent = message;
messagesDiv.appendChild(node);
}
}
// Subscribe to the event source at `uri` with exponential backoff reconnect.
function subscribe(uri) {
var retryTime = 1;
function connect(uri) {
const events = new EventSource(uri);
events.addEventListener("message", (ev) => {
console.log("raw data", JSON.stringify(ev.data));
console.log("decoded data", JSON.stringify(JSON.parse(ev.data)));
const msg = JSON.parse(ev.data);
if (!"message" in msg || !"room" in msg || !"username" in msg) return;
addMessage(msg.room, msg.username, msg.message, true);
});
events.addEventListener("open", () => {
setConnectedStatus(true);
console.log(`connected to event stream at ${uri}`);
retryTime = 1;
});
events.addEventListener("error", () => {
setConnectedStatus(false);
events.close();
let timeout = retryTime;
retryTime = Math.min(64, retryTime * 2);
console.log(`connection lost. attempting to reconnect in ${timeout}s`);
setTimeout(() => connect(uri), (() => timeout * 1000)());
});
}
connect(uri);
}
// Set the connection status: `true` for connected, `false` for disconnected.
function setConnectedStatus(status) {
STATE.connected = status;
statusDiv.className = (status) ? "connected" : "reconnecting";
}
// Let's go! Initialize the world.
function init() {
// Initialize some rooms.
addRoom("lobby");
addRoom("rocket");
changeRoom("lobby");
addMessage("lobby", "Rocket", "Hey! Open another browser tab, send a message.", true);
addMessage("rocket", "Rocket", "This is another room. Neat, huh?", true);
// Set up the form handler.
newMessageForm.addEventListener("submit", (e) => {
e.preventDefault();
const room = STATE.room;
const message = messageField.value;
const username = usernameField.value || "guest";
if (!message || !username) return;
if (STATE.connected) {
fetch("/message", {
method: "POST",
body: new URLSearchParams({ room, username, message }),
}).then((response) => {
if (response.ok) messageField.value = "";
});
}
})
// Set up the new room handler.
newRoomForm.addEventListener("submit", (e) => {
e.preventDefault();
const room = roomNameField.value;
if (!room) return;
roomNameField.value = "";
if (!addRoom(room)) return;
addMessage(room, "Rocket", `Look, your own "${room}" room! Nice.`, true);
})
// Subscribe to server-sent events.
subscribe("/events");
}
init();

View File

@ -0,0 +1,186 @@
:root {
--bg-dark: #242423;
--bg-light: #333533;
--fg-light: #E8EDDF;
--callout: rgb(255, 255, 102);
--callout-dark: #101010;
}
* {
font-size: 14px;
}
html, body, main {
background-color: var(--bg-dark);
color: #fff;
font-family: "Inter", Arial, Helvetica, sans-serif, "Noto Color Emoji";
font-weight: 400;
text-shadow: rgb(77, 81, 86) 0px 0px 0px;
height: 100%;
}
main {
display: flex;
}
button:hover:not(.active) {
filter: brightness(1.15);
cursor: pointer;
}
#sidebar {
flex: 3 30%;
display: flex;
flex-direction: column;
overflow: auto;
background-color: var(--bg-light);
}
#room-list {
display: flex;
flex-direction: column;
overflow: auto;
flex: 1;
}
#sidebar button {
height: 40px;
margin-bottom: 1px;
background: var(--bg-light);
color: #fff;
overflow: hidden;
}
#sidebar button.active {
background: var(--bg-dark);
color: var(--callout);
font-weight: bold;
box-shadow: 0px 2px 2px rgba(0,0,0,0.9);
z-index: 10;
}
#content {
flex: 7 100%;
overflow: auto;
display: flex;
flex-direction: column;
}
.message {
display: flex;
flex-direction: column;
padding: 10px 0;
}
.message:last-child {
padding-bottom: 20px;
}
.message .username {
font-weight: bold;
padding-bottom: 5px;
color: var(--callout);
}
#messages {
padding: 10px 20px;
flex: 1;
}
form#new-message {
bottom: 0;
position: sticky;
flex: 0 0 auto;
width: 100%;
}
form {
display: flex;
border-top: 2px solid #242424;
}
form * {
height: 40px;
background: var(--fg-light);
color: var(--bg-dark);
}
input {
padding: 0 10px;
}
input:focus {
outline: 0;
filter: brightness(1.05);
}
input#username {
text-align: right;
flex: 1 25%;
width: 25%;
border-right: 1px solid #303030;
}
input#message {
flex: 10 100%;
}
form button {
padding: 0 10px;
}
#sidebar #new-room {
display: flex;
flex: 0 0 auto;
flex-direction: row;
}
#new-room input:focus, #new-room button:hover {
filter: brightness(1.2);
}
#new-room input {
flex: 8 80%;
width: 20%;
background-color: var(--callout-dark);
color: #fff;
}
#new-room button {
flex: 2 20%;
width: 20%;
background-color: var(--bg-dark);
}
#status {
padding: 5px 10px;
text-align: center;
font-size: 12px;
}
#status.pending::before {
content: "status: connected";
}
#status.pending {
background-color: yellow;
color: #000;
}
#status.connected::before {
content: "status: connected";
}
#status.connected {
background-color: green;
color: #fff;
}
#status.reconnecting::before {
content: "status: reconnecting";
}
#status.reconnecting {
background-color: red;
color: #fff;
}