Introduce Server-Sent Event Streams.

Resolves #33.
This commit is contained in:
Sergio Benitez 2021-06-01 11:50:58 -07:00
parent 8029ea319f
commit a8f6103b99
4 changed files with 1003 additions and 414 deletions

View File

@ -1,413 +0,0 @@
//! A Responder implementing a [Server-sent events] (SSE) stream.
//!
//! This module is intended for eventual inclusion in rocket_contrib.
//!
//! [Server-sent events]: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
use std::borrow::Cow;
use std::time::Duration;
use rocket::request::Request;
use rocket::response::{Responder, Response};
use rocket::futures::stream::Stream;
// Based on https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
// (Reproduced here for quick reference. retrieved 2021-04-17)
//
// stream = [ bom ] *event
// event = *( comment / field ) end-of-line
// comment = colon *any-char end-of-line
// field = 1*name-char [ colon [ space ] *any-char ] end-of-line
// end-of-line = ( cr lf / cr / lf )
//
// ; characters
// lf = %x000A ; U+000A LINE FEED (LF)
// cr = %x000D ; U+000D CARRIAGE RETURN (CR)
// space = %x0020 ; U+0020 SPACE
// colon = %x003A ; U+003A COLON (:)
// bom = %xFEFF ; U+FEFF BYTE ORDER MARK
// name-char = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
// ; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
// any-char = %x0000-0009 / %x000B-000C / %x000E-10FFFF
// ; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)/
//
// Notice that Multiple encodings are possible for the same data, especially in
// the choice of newline. This implementation always uses only "\n" (LF).
/// Low-level serialization of fields in text/event-stream format.
///
/// Corresponds to 'comment / field' above; there is no dedicated name for this concept.
///
/// Always use the public constructors [`FieldKind::comment`] and
/// [`FieldKind::field`], which validate inputs. Misuse of the public enum
/// variants cannot lead to memory unsafety, but it can break the event stream.
enum FieldKind<'a> {
/// Serializes as ":{}\n". May contain any characters except CR or LF
Comment(&'a str),
/// Serializes as "{}\n" or "{}: {}\n".
///
/// The name may contain any characters except CR or LF or ':' (colon).
/// The value, if present, may contain any characters except CR or LF.
Field(&'a str, Option<&'a str>),
}
impl<'a> FieldKind<'a> {
/// Returns true if 'name' is a valid name for an SSE field.
/// All characters are valid except for ':' (colon), CR, and LF.
pub fn is_valid_name(name: &str) -> bool {
!name.bytes().any(|b| b == b'\n' || b == b'\r' || b == b':')
}
/// Returns true if 'value' is a valid value for an SSE field.
/// All characters are valid except for CR, and LF.
pub fn is_valid_value(value: &str) -> bool {
!value.bytes().any(|b| b == b'\n' || b == b'\r')
}
/// Creates a comment field.
pub fn comment(comment: &'a str) -> Result<Self, ()> {
let comment = comment.into();
if Self::is_valid_value(comment) {
Ok(Self::Comment(comment))
} else {
Err(())
}
}
/// Creates a key/value field.
pub fn field(name: &'a str, value: Option<&'a str>) -> Result<Self, ()> {
let name = name.into();
let value = value.map(|v| v.into());
if Self::is_valid_name(name) && value.map_or(true, Self::is_valid_value) {
Ok(Self::Field(name, value))
} else {
Err(())
}
}
/// Serializes 'self' into 'out' in the text/event-stream format, including
/// a trailing newline.
pub fn serialize(&self, out: &mut Vec<u8>) {
match self {
FieldKind::Comment(comment) => {
out.push(b':');
out.extend_from_slice(comment.as_bytes());
}
FieldKind::Field(name, None) => {
out.extend_from_slice(name.as_bytes());
}
FieldKind::Field(name, Some(value)) => {
out.extend_from_slice(name.as_bytes());
out.extend_from_slice(b": ");
out.extend_from_slice(value.as_bytes());
}
}
out.push(b'\n');
}
#[cfg(test)]
pub fn to_vec(&self) -> Vec<u8> {
let mut vec = vec![];
self.serialize(&mut vec);
vec
}
}
#[cfg(test)]
mod field_tests {
use super::FieldKind;
#[test]
pub fn test_field_serialization() {
assert_eq!(
FieldKind::comment("test comment").unwrap().to_vec(),
b":test comment\n"
);
assert_eq!(
FieldKind::field("magic", None).unwrap().to_vec(),
b"magic\n"
);
assert_eq!(
FieldKind::field("hello", Some("w:o:r:l:d"))
.unwrap()
.to_vec(),
b"hello: w:o:r:l:d\n"
);
}
#[test]
pub fn test_disallowed_field_values() {
assert!(FieldKind::comment("newlines\nbad").is_err());
assert!(FieldKind::field("newlines\nbad", None).is_err());
assert!(FieldKind::field("no:colon", None).is_err());
assert!(FieldKind::field("x", Some("newlines\nbad")).is_err());
}
}
/// A single event in an SSE stream, with optional `event`, `data`, `id`, and
/// `retry` fields.
///
/// Events are created with [`Event::new()`] or [`Event::message()`]. They can be
/// [`serialize`](Event::serialize())d or wrapped into an [`EventSource`].
///
/// See [Using server-sent events] for more information on the meaning of the
/// fields and how they are interpreted by user agents.
///
/// [Using server-sent events]:
/// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields
#[derive(Clone, Default, Eq, PartialEq, Hash, Debug)]
pub struct Event {
event: Option<String>,
data: Option<String>,
id: Option<String>,
retry: Option<u32>,
}
impl Event {
/// Creates a new empty `Event`.
pub fn new() -> Self {
Self::default()
}
/// Creates a new `Event` with only a data field.
///
/// Since the 'event' (event type) is left unspecified, the client will use
/// the default event type of `message`.
pub fn message<S: Into<String>>(data: S) -> Self {
Self::new().with_data(data)
}
/// Sets the value of the 'event' (event type) field. It may not contain newlines.
pub fn with_event<T: Into<String>>(mut self, event: T) -> Result<Self, ()> {
let event = event.into();
if !FieldKind::is_valid_value(&event) {
return Err(());
}
self.event = Some(event);
Ok(self)
}
/// Sets the value of the 'id' field. It may not contain newlines.
pub fn with_id<T: Into<String>>(mut self, id: T) -> Result<Self, ()> {
let id = id.into();
if !FieldKind::is_valid_value(&id) {
return Err(());
}
self.id = Some(id);
Ok(self)
}
/// Sets the value of the 'data' field. It may contain newlines.
pub fn with_data<T: Into<String>>(mut self, data: T) -> Self {
let data = data.into();
// No need to validate this: only newlines might be invalid, and for
// 'data' they are handled separately during serialization
self.data = Some(data);
self
}
/// Sets the value of the 'retry' field, in milliseconds.
pub fn with_retry(mut self, retry: u32) -> Self {
self.retry = Some(retry);
self
}
/// Returns the 'event' (event type) for this `Event`
pub fn event(&self) -> Option<&str> {
self.event.as_deref()
}
/// Returns the 'data' for this `Event`
pub fn data(&self) -> Option<&str> {
self.data.as_deref()
}
/// Returns the 'id' for this `Event`
pub fn id(&self) -> Option<&str> {
self.id.as_deref()
}
/// Returns the retry time for this `Event`
pub fn retry(&self) -> Option<u32> {
self.retry
}
fn try_serialize(self) -> Result<Vec<u8>, ()> {
let mut out = vec![];
if let Some(event) = self.event {
FieldKind::field("event", Some(&event))?.serialize(&mut out);
}
if let Some(id) = self.id {
FieldKind::field("id", Some(&id))?.serialize(&mut out);
}
if let Some(data) = self.data {
// "data" is treated specially: it can contain newlines, which are
// encoded in multiple "data" fields
for line in data.lines() {
FieldKind::field("data", Some(&line))?.serialize(&mut out);
}
}
if let Some(retry) = self.retry {
FieldKind::field("retry", Some(&retry.to_string()))?.serialize(&mut out);
}
// extra blank line indicates "end of Event"
out.push(b'\n');
Ok(out)
}
/// Serializes `self` into a byte buffer.
pub fn serialize(self) -> Vec<u8> {
self.try_serialize()
.expect("internal invariant broken: field contents should have already been validated")
}
}
const EMPTY_COMMENT_EVENT: &[u8] = b":\n\n";
#[cfg(test)]
mod event_tests {
use super::Event;
#[test]
pub fn test_event_serialization() {
assert_eq!(
Event::new()
.with_event("test")
.unwrap()
.with_data("line 1\nline 2")
.serialize(),
b"event: test\ndata: line 1\ndata: line 2\n\n"
);
assert_eq!(
Event::new().with_event("nodata").unwrap().serialize(),
b"event: nodata\n\n"
);
assert_eq!(
Event::new()
.with_id("event1")
.unwrap()
.with_retry(5)
.serialize(),
b"id: event1\nretry: 5\n\n"
);
assert_eq!(Event::new().serialize(), b"\n");
}
#[test]
pub fn test_disallowed_event_values() {
assert!(Event::new().with_event("a\rb").is_err());
assert!(Event::new().with_event("a\nb").is_err());
assert!(Event::new().with_id("1\r2").is_err());
assert!(Event::new().with_id("1\n2").is_err());
}
}
/// A `Responder` representing an SSE stream.
///
/// See the [`EventSource::new()`] function for a usage example.
///
/// The `Last-Event-ID` header is not handled by this API; if you wish to
/// support the feature, send events with some kind of `id` and use the
/// value in the header (if provided) to decide which event to resume from.
pub struct EventSource<S> {
stream: S,
interval: Option<Duration>,
}
impl<S: Stream<Item = Event>> EventSource<S> {
/// Creates an `EventSource` from a [`Stream`] of [`Event`]s.
///
/// # Example
///
/// ```rust
/// # use rocket::get;
/// #
/// use rocket_rooms::sse::{self, Event, EventSource};
/// use rocket::futures::stream::Stream;
/// use rocket::response::stream::stream;
///
/// #[get("/events")]
/// fn events() -> EventSource<impl Stream<Item = Event>> {
/// EventSource::new(stream! {
/// let mut i = 0;
/// while i <= 3 {
/// i += 1;
/// yield Event::message(format!("data {}", i));
/// }
/// })
/// }
/// ```
pub fn new(stream: S) -> Self {
EventSource {
stream,
interval: Some(Duration::from_secs(30)),
}
}
/// Sets a "ping" interval for this `EventSource` to avoid connection
/// timeouts when no data is being transferred. The default `interval` for a
/// newly created `EventSource` is `None`, which disables this
/// functionality.
///
/// The ping is implemented by sending an empty comment to the client every
/// `interval` seconds.
///
/// # Example
/// ```rust
/// # use rocket::get;
/// #
/// use std::time::Duration;
///
/// use rocket_rooms::sse::{self, Event, EventSource};
/// use rocket::futures::stream::Stream;
///
/// #[get("/events")]
/// fn events() -> EventSource<impl Stream<Item = Event>> {
/// # let event_stream = rocket::futures::stream::pending();
/// // let event_stream = ...
///
/// // Set the ping interval to 15 seconds
/// EventSource::new(event_stream).with_ping_interval(Some(Duration::from_secs(15)))
/// }
/// ```
pub fn with_ping_interval(mut self, interval: Option<Duration>) -> Self {
self.interval = interval;
self
}
}
impl<'r, S: Stream<Item = Event> + Send + 'r> Responder<'r, 'r> for EventSource<S> {
fn respond_to(self, req: &'r Request<'_>) -> rocket::response::Result<'r> {
use rocket::response::stream::ByteStream;
use rocket::tokio::time::interval;
use tokio_stream::{wrappers::IntervalStream, StreamExt};
let serialized_events = self.stream.map(|e| Cow::Owned(e.serialize()));
let response = if let Some(duration) = self.interval {
let pings =
IntervalStream::new(interval(duration)).map(|_| Cow::Borrowed(EMPTY_COMMENT_EVENT));
ByteStream::from(pings.merge(serialized_events)).respond_to(req)?
} else {
ByteStream::from(serialized_events).respond_to(req)?
};
Response::build_from(response)
.raw_header("Content-Type", "text/event-stream")
.raw_header("Cache-Control", "no-cache")
.raw_header("Expires", "0")
.ok()
}
}

View File

@ -42,6 +42,7 @@
//! * [`struct@ReaderStream`] ([`ReaderStream!`]) - streams of `T: AsyncRead`
//! * [`struct@ByteStream`] ([`ByteStream!`]) - streams of `T: AsRef<[u8]>`
//! * [`struct@TextStream`] ([`TextStream!`]) - streams of `T: AsRef<str>`
//! * [`struct@EventStream`] ([`EventStream!`]) - Server-Sent [`Event`] stream
//!
//! Each type implements `Responder`; each macro can be invoked to generate a
//! typed stream, exactly like [`stream!`] above. Additionally, each macro is
@ -132,11 +133,16 @@ mod reader;
mod bytes;
mod text;
mod one;
mod sse;
mod raw_sse;
pub(crate) use self::raw_sse::*;
pub use self::one::One;
pub use self::text::TextStream;
pub use self::bytes::ByteStream;
pub use self::reader::ReaderStream;
pub use self::sse::{Event, EventStream};
crate::export! {
/// Retrofitted support for [`Stream`]s with `yield`, `for await` syntax.
@ -146,7 +152,7 @@ crate::export! {
/// This macro takes any series of statements and expands them into an
/// expression of type `impl Stream<Item = T>`, a stream that `yield`s
/// elements of type `T`. It supports any Rust statement syntax with the
/// following additions:
/// following extensions:
///
/// * `yield expr`
///
@ -159,6 +165,8 @@ crate::export! {
/// executes the block with the binding. `stream` must implement
/// `Stream<Item = T>`; the type of `x` is `T`.
///
/// * `?` short-cicuits stream termination on `Err`
///
/// # Examples
///
/// ```rust

View File

@ -0,0 +1,231 @@
use std::borrow::Cow;
use std::io::{self, Cursor};
use std::task::{Context, Poll};
use std::pin::Pin;
use std::cmp::min;
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take};
/// Low-level serialization of fields in text/event-stream format.
///
/// Based on https://html.spec.whatwg.org/multipage/server-sent-events.html,
/// reproduced here for quick reference. Retrieved 2021-04-17.
///
/// ```text
/// stream = [ bom ] *event
/// event = *( comment / field ) end-of-line
/// comment = colon *any-char end-of-line
/// field = 1*name-char [ colon [ space ] *any-char ] end-of-line
/// end-of-line = ( cr lf / cr / lf )
///
/// ; characters
/// lf = %x000A ; U+000A LINE FEED (LF)
/// cr = %x000D ; U+000D CARRIAGE RETURN (CR)
/// space = %x0020 ; U+0020 SPACE
/// colon = %x003A ; U+003A COLON (:)
/// bom = %xFEFF ; U+FEFF BYTE ORDER MARK
/// name-char = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
/// ; a scalar value other than:
/// ; U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
/// any-char = %x0000-0009 / %x000B-000C / %x000E-10FFFF
/// ; a scalar value other than:
/// ; U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)/
/// ```
///
/// Notice that Multiple encodings are possible for the same data, especially in
/// the choice of newline. This implementation always uses only "\n" (LF).
///
/// Serializes (via `AsyncRead`) as a series of "${name}:${value}\n" events.
/// Either or both `name` and `value` may be empty. When the name is empty, this
/// is a comment. Otherwise, this is a field.
#[derive(Debug)]
pub struct RawLinedEvent {
name: Cursor<Cow<'static, [u8]>>,
value: Take<Cursor<Cow<'static, [u8]>>>,
state: State,
}
/// Converts a `Cow<str>` to a `Cow<[u8]>`.
fn farm(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
match cow {
Cow::Borrowed(slice) => Cow::Borrowed(slice.as_bytes()),
Cow::Owned(vec) => Cow::Owned(vec.into_bytes())
}
}
/// Farms `cow`, replacing `\r`, `\n`, and `:` with ` ` in the process.
///
/// This converts any string into a valid event `name`.
fn farm_name(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
let mut i = 0;
let mut cow = farm(cow);
while i < cow.len() {
if let Some(k) = memchr::memchr3(b'\r', b'\n', b':', &cow[i..]) {
cow.to_mut()[i + k] = b' ';
// This can't overflow as i + k + 1 <= len, since we found a char.
i += k + 1;
} else {
break;
}
}
cow
}
/// Farms `cow`, replacing `\r` and `\n` with ` ` in the process.
///
/// This converts any string into a valid event `value`.
fn farm_value(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
let mut i = 0;
let mut cow = farm(cow);
while i < cow.len() {
if let Some(k) = memchr::memchr2(b'\r', b'\n', &cow[i..]) {
cow.to_mut()[i + k] = b' ';
// This can't overflow as i + k + 1 <= len, since we found a char.
i += k + 1;
} else {
break;
}
}
cow
}
impl RawLinedEvent {
/// Create a `RawLinedEvent` from a valid, prefarmed `name` and `value`.
fn prefarmed(name: Cow<'static, [u8]>, value: Cow<'static, [u8]>) -> RawLinedEvent {
let name = Cursor::new(name);
let mut value = Cursor::new(value).take(0);
advance(&mut value);
RawLinedEvent { name, value, state: State::Name }
}
/// Create a `RawLinedEvent` from potentially invalid `name` and `value`
/// where `value` is not allowed to be multiple lines.
///
/// Characters `\n`, `\r`, and ':' in `name` and characters `\r` \`n` in
/// `value` `are replaced with a space ` `.
pub fn one<N, V>(name: N, value: V) -> RawLinedEvent
where N: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>
{
RawLinedEvent::prefarmed(farm_name(name.into()), farm_value(value.into()))
}
/// Create a `RawLinedEvent` from potentially invalid `name` and `value`
/// where `value` is allowed to be multiple lines.
///
/// Characters `\n`, `\r`, and ':' in `name` are replaced with a space ` `.
/// `value` is allowed to contain any character. New lines (`\r\n` or `\n`)
/// and carriage returns `\r` result in a new event being emitted.
pub fn many<N, V>(name: N, value: V) -> RawLinedEvent
where N: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>
{
RawLinedEvent::prefarmed(farm_name(name.into()), farm(value.into()))
}
/// Create a `RawLinedEvent` from known value `value`. The value is emitted
/// directly with _no_ name and suffixed with a `\n`.
pub fn raw<V: Into<Cow<'static, str>>>(value: V) -> RawLinedEvent {
let value = value.into();
let len = value.len();
RawLinedEvent {
name: Cursor::new(Cow::Borrowed(&[])),
value: Cursor::new(farm(value)).take(len as u64),
state: State::Value
}
}
}
/// The `AsyncRead`er state.
#[derive(Debug, PartialEq, Copy, Clone)]
enum State {
Name,
Colon,
Value,
NewLine,
Done
}
/// Find the next new-line (`\n` or `\r`) character in `buf` beginning at the
/// current cursor position and sets the limit to be at that position.
fn advance<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
// Technically, the position need not be <= len, so we right it.
let pos = min(buf.get_ref().get_ref().as_ref().len() as u64, buf.get_ref().position());
let inner = buf.get_ref().get_ref().as_ref();
let next = memchr::memchr2(b'\n', b'\r', &inner[(pos as usize)..])
.map(|i| pos + i as u64)
.unwrap_or_else(|| inner.len() as u64);
let limit = next - pos;
buf.set_limit(limit);
}
/// If the cursor in `buf` is currently at an `\r`, `\r\n` or `\n`, sets the
/// cursor position to be _after_ the characters.
fn skip<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
let pos = min(buf.get_ref().get_ref().as_ref().len() as u64, buf.get_ref().position());
match buf.get_ref().get_ref().as_ref().get(pos as usize) {
// This cannot overflow as clearly `buf.len() >= pos + 1`.
Some(b'\n') => buf.get_mut().set_position(pos + 1),
Some(b'\r') => {
let next = (pos as usize).saturating_add(1);
if buf.get_ref().get_ref().as_ref().get(next) == Some(&b'\n') {
// This cannot overflow as clearly `buf.len() >= pos + 2`.
buf.get_mut().set_position(pos + 2);
} else {
// This cannot overflow as clearly `buf.len() >= pos + 1`.
buf.get_mut().set_position(pos + 1);
}
}
_ => return,
}
}
impl AsyncRead for RawLinedEvent {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
use bytes::Buf;
loop {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}
match self.state {
State::Name => {
futures::ready!(Pin::new(&mut self.name).poll_read(cx, buf))?;
if !self.name.has_remaining() {
self.name.set_position(0);
self.state = State::Colon;
}
}
State::Colon => {
// Note that we've checked `buf.remaining() != 0`.
buf.put_slice(&[b':']);
self.state = State::Value;
}
State::Value => {
futures::ready!(Pin::new(&mut self.value).poll_read(cx, buf))?;
if self.value.limit() == 0 {
self.state = State::NewLine;
}
}
State::NewLine => {
// Note that we've checked `buf.remaining() != 0`.
buf.put_slice(&[b'\n']);
if self.value.get_ref().has_remaining() {
skip(&mut self.value);
advance(&mut self.value);
self.state = State::Name;
} else {
self.state = State::Done;
}
}
State::Done => return Poll::Ready(Ok(()))
}
}
}
}

View File

@ -0,0 +1,763 @@
use std::array;
use std::borrow::Cow;
use tokio::io::AsyncRead;
use tokio::time::Duration;
use futures::stream::{self, Stream, StreamExt};
use futures::future::ready;
use crate::request::Request;
use crate::response::{self, Response, Responder, stream::{ReaderStream, RawLinedEvent}};
use crate::http::ContentType;
/// A Server-Sent `Event` (SSE) in a Server-Sent [`struct@EventStream`].
///
/// A server-sent event is either a _field_ or a _comment_. Comments can be
/// constructed via [`Event::comment()`] while fields can be constructed via
/// [`Event::data()`], [`Event::json()`], and [`Event::retry()`].
///
/// ```rust
/// use rocket::tokio::time::Duration;
/// use rocket::response::stream::Event;
///
/// // A `data` event with message "Hello, SSE!".
/// let event = Event::data("Hello, SSE!");
///
/// // The same event but with event name of `hello`.
/// let event = Event::data("Hello, SSE!").event("hello");
///
/// // A `retry` event to set the client-side reconnection time.
/// let event = Event::retry(Duration::from_secs(5));
///
/// // An event with an attached comment, event name, and ID.
/// let event = Event::data("Hello, SSE!")
/// .with_comment("just a hello message")
/// .event("hello")
/// .id("1");
/// ```
///
/// We largely defer to [MDN's using server-sent events] documentation for
/// client-side details but reproduce, in our words, relevant server-side
/// documentation here.
///
/// [MDN's using server-sent events]:
/// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
///
/// # Pitfalls
///
/// Server-Sent Events suffer from certain pitfalls. We encourage readers to
/// read through [pitfalls](struct@EventStream#pitfalls) before making use of
/// Rocket's SSE support.
///
/// # Comments
///
/// A server-sent _comment_, created via [`Event::comment()`], is an event that
/// appears only in the raw server-sent event data stream and is inaccessible by
/// most clients. This includes JavaScript's `EventSource`. As such, they serve
/// little utility beyond debugging a raw data stream and keeping a connection
/// alive. See [hearbeat](struct@EventStream#hearbeat) for information on
/// Rocket's `EventStream` keep-alive.
///
/// # Fields
///
/// A server-sent field can be one of four kinds:
///
/// * `retry`
///
/// A `retry` event, created via [`Event::retry()`], sets the reconnection
/// time on the client side. It is the duration the client will wait before
/// attempting to reconnect when a connection is lost. Most applications
/// will not need to set a `retry`, opting instead to use the
/// implementation's default or to reconnect manually on error.
///
/// * `id`
///
/// Sets the event id to associate all subsequent fields with. This value
/// cannot be retrieved directly via most clients, including JavaScript
/// `EventSource`. Instead, it is sent by the implementation on reconnection
/// via the `Last-Event-ID` header. An `id` can be attached to other fields
/// via the [`Event::id()`] builder method.
///
/// * `event`
///
/// Sets the event name to associate the next `data` field with. In
/// JavaScript's `EventSource`, this is the event to be listened for, which
/// defaults to `message`. An `event` can be attached to other fields via
/// the [`Event::event()`] builder method.
///
/// * `data`
///
/// Sends data to dispatch as an event at the client. In JavaScript's
/// `EventSource`, this (and only this) results in an event handler for
/// `event`, specified just prior, being triggered. A data field can be
/// created via the [`Event::data()`] or [`Event::json()`] constructors.
///
/// # Implementation Notes
///
/// A constructed `Event` _always_ emits its fields in the following order:
///
/// 1. `comment`
/// 2. `retry`
/// 3. `id`
/// 4. `event`
/// 5. `data`
///
/// The `event` and `id` fields _cannot_ contain new lines or carriage returns.
/// Rocket's default implementation automatically converts new lines and
/// carriage returns in `event` and `id` fields to spaces.
///
/// The `data` and `comment` fields _cannot_ contain carriage returns. Rocket
/// converts the unencoded sequence `\r\n` and the isolated `\r` into a
/// protocol-level `\n`, that is, in such a way that they are interpreted as
/// `\n` at the client. For example, the raw message `foo\r\nbar\rbaz` is
/// received as `foo\nbar\nbaz` at the client-side. Encoded sequences, such as
/// those emitted by [`Event::json()`], have no such restrictions.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct Event {
comment: Option<Cow<'static, str>>,
retry: Option<Duration>,
id: Option<Cow<'static, str>>,
event: Option<Cow<'static, str>>,
data: Option<Cow<'static, str>>,
}
impl Event {
/// Creates an empty `Event` with no fields. This is hidden so we never
/// generate an `Event` with nothing.
fn empty() -> Self {
Event { comment: None, retry: None, id: None, event: None, data: None, }
}
/// Creates a new `Event` with a data field of `data` serialized as JSON.
///
/// # Example
///
/// ```rust
/// use rocket::serde::Serialize;
/// use rocket::response::stream::Event;
///
/// #[derive(Serialize)]
/// #[serde(crate = "rocket::serde")]
/// struct MyData<'r> {
/// string: &'r str,
/// number: usize,
/// }
///
/// let data = MyData { string: "hello!", number: 10 };
/// let event = Event::json(&data);
/// ```
#[cfg(feature = "json")]
#[cfg_attr(nightly, doc(cfg(feature = "json")))]
pub fn json<T: serde::Serialize>(data: &T) -> Self {
let string = serde_json::to_string(data).unwrap_or_default();
Self::data(string)
}
/// Creates a new `Event` with a data field containing the raw `data`.
///
/// # Raw SSE is Lossy
///
/// Unencoded carriage returns cannot be expressed in the protocol. Thus,
/// any carriage returns in `data` will not appear at the client-side.
/// Instead, the sequence `\r\n` and the isolated `\r` will each appear as
/// `\n` at the client-side. For example, the message `foo\r\nbar\rbaz` is
/// received as `foo\nbar\nbaz` at the client-side.
///
/// See [pitfalls](struct@EventStream#pitfalls) for more details.
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::Event;
///
/// // A `data` event with message "Hello, SSE!".
/// let event = Event::data("Hello, SSE!");
/// ```
pub fn data<T: Into<Cow<'static, str>>>(data: T) -> Self {
Self { data: Some(data.into()), ..Event::empty() }
}
/// Creates a new comment `Event`.
///
/// As with [`Event::data()`], unencoded carriage returns cannot be
/// expressed in the protocol. Thus, any carriage returns in `data` will
/// not appear at the client-side. For comments, this is generally not a
/// concern as comments are discarded by client-side libraries.
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::Event;
///
/// let event = Event::comment("bet you'll never see me!");
/// ```
pub fn comment<T: Into<Cow<'static, str>>>(data: T) -> Self {
Self { comment: Some(data.into()), ..Event::empty() }
}
/// Creates a new retry `Event`.
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::Event;
/// use rocket::tokio::time::Duration;
///
/// let event = Event::retry(Duration::from_millis(250));
/// ```
pub fn retry(period: Duration) -> Self {
Self { retry: Some(period), ..Event::empty() }
}
/// Sets the value of the 'event' (event type) field.
///
/// Event names may not contain new lines `\n` or carriage returns `\r`. If
/// `event` _does_ contain new lines or carriage returns, they are replaced
/// with spaces (` `) before being sent to the client.
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::Event;
///
/// // The event name is "start".
/// let event = Event::data("hi").event("start");
///
/// // The event name is "then end", with `\n` replaced with ` `.
/// let event = Event::data("bye").event("then\nend");
/// ```
pub fn event<T: Into<Cow<'static, str>>>(mut self, event: T) -> Self {
self.event = Some(event.into());
self
}
/// Sets the value of the 'id' (last event ID) field.
///
/// Event IDs may not contain new lines `\n` or carriage returns `\r`. If
/// `id` _does_ contain new lines or carriage returns, they are replaced
/// with spaces (` `) before being sent to the client.
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::Event;
///
/// // The event ID is "start".
/// let event = Event::data("hi").id("start");
///
/// // The event ID is "then end", with `\n` replaced with ` `.
/// let event = Event::data("bye").id("then\nend");
/// ```
/// Sets the value of the 'id' field. It may not contain newlines.
pub fn id<T: Into<Cow<'static, str>>>(mut self, id: T) -> Self {
self.id = Some(id.into());
self
}
/// Sets or replaces the value of the `data` field.
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::Event;
///
/// // The data "hello" will be sent.
/// let event = Event::data("hi").with_data("hello");
///
/// // The two below are equivalent.
/// let event = Event::comment("bye").with_data("goodbye");
/// let event = Event::data("goodbyte").with_comment("bye");
/// ```
pub fn with_data<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
self.data = Some(data.into());
self
}
/// Sets or replaces the value of the `comment` field.
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::Event;
///
/// // The comment "🚀" will be sent.
/// let event = Event::comment("Rocket is great!").with_comment("🚀");
///
/// // The two below are equivalent.
/// let event = Event::comment("bye").with_data("goodbye");
/// let event = Event::data("goodbyte").with_comment("bye");
/// ```
pub fn with_comment<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
self.comment = Some(data.into());
self
}
/// Sets or replaces the value of the `retry` field.
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::Event;
/// use rocket::tokio::time::Duration;
///
/// // The reconnection will be set to 10 seconds.
/// let event = Event::retry(Duration::from_millis(500))
/// .with_retry(Duration::from_secs(10));
///
/// // The two below are equivalent.
/// let event = Event::comment("bye").with_retry(Duration::from_millis(500));
/// let event = Event::retry(Duration::from_millis(500)).with_comment("bye");
/// ```
pub fn with_retry(mut self, period: Duration) -> Self {
self.retry = Some(period);
self
}
fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
let events = [
self.comment.map(|v| RawLinedEvent::many("", v)),
self.retry.map(|r| RawLinedEvent::one("retry", format!("{}", r.as_millis()))),
self.id.map(|v| RawLinedEvent::one("id", v)),
self.event.map(|v| RawLinedEvent::one("event", v)),
self.data.map(|v| RawLinedEvent::many("data", v)),
Some(RawLinedEvent::raw("")),
];
stream::iter(array::IntoIter::new(events)).filter_map(|v| ready(v))
}
}
/// A potentially infinite stream of Server-Sent [`Event`]s (SSE).
///
/// An `EventStream` can be constructed from any [`Stream`] of items of type
/// `Event`. The stream can be constructed directly via [`EventStream::from()`]
/// or through generator syntax via [`EventStream!`].
///
/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
///
/// # Responder
///
/// `EventStream` is a (potentially infinite) responder. The response
/// `Content-Type` is set to [`EventStream`](ContentType::EventStream). The body
/// is [unsized](crate::response::Body#unsized), and values are sent as soon as
/// they are yielded by the internal iterator.
///
/// ## Heartbeat
///
/// A heartbeat comment is injected into the internal stream and sent at a fixed
/// interval. The comment is discarded by clients and serves only to keep the
/// connection alive; it does not interfere with application data. The interval
/// defaults to 30 seconds but can be adjusted with
/// [`EventStream::heartbeat()`].
///
/// # Examples
///
/// Use [`EventStream!`] to yield an infinite series of "ping" SSE messages to
/// the client, one per second:
///
/// ```rust
/// # use rocket::*;
/// use rocket::response::stream::{Event, EventStream};;
/// use rocket::tokio::time::{self, Duration};
///
/// #[get("/events")]
/// fn stream() -> EventStream![] {
/// EventStream! {
/// let mut interval = time::interval(Duration::from_secs(1));
/// loop {
/// yield Event::data("ping");
/// interval.tick().await;
/// }
/// }
/// }
/// ```
///
/// Yield 9 events: 3 triplets of `retry`, `data`, and `comment` events:
///
/// ```rust
/// # use rocket::get;
/// use rocket::response::stream::{Event, EventStream};
/// use rocket::tokio::time::Duration;
///
/// #[get("/events")]
/// fn events() -> EventStream![] {
/// EventStream! {
/// for i in 0..3 {
/// yield Event::retry(Duration::from_secs(10));
/// yield Event::data(format!("{}", i)).id("cat").event("bar");
/// yield Event::comment("silly boy");
/// }
/// }
/// }
/// ```
///
/// The syntax of `EventStream!` as an expression is identical to that of
/// [`stream!`](crate::response::stream::stream). For how to gracefully
/// terminate an otherwise infinite stream, see [graceful
/// shutdown](crate::response::stream#graceful-shutdown).
///
/// # Pitfalls
///
/// Server-Sent Events are a rather simple mechanism, though there are some
/// pitfalls to be aware of.
///
/// * **Buffering**
///
/// Protocol restrictions complicate implementing an API that does not
/// buffer. As such, if you are sending _lots_ of data, consider sending the
/// data via multiple data fields (with events to signal start and end).
/// Alternatively, send _one_ event which instructs the client to fetch the
/// data from another endpoint which in-turn streams the data.
///
/// * **Raw SSE requires UTF-8 data**
///
/// Only UTF-8 data can be sent via SSE. If you need to send arbitrary bytes,
/// consider encoding it, for instance, as JSON using [`Event::json()`].
/// Alternatively, as described before, use SSE as a notifier which alerts
/// the client to fetch the data from elsewhere.
///
/// * **Raw SSE is Lossy**
///
/// Data sent via SSE cannot contain new lines `\n` or carriage returns `\r`
/// due to interference with the line protocol.
///
/// The protocol allows expressing new lines as multiple messages, however,
/// and Rocket automatically transforms a message of `foo\nbar` into two
/// messages, `foo` and `bar`, so that they are reconstructed (automatically)
/// as `foo\nbar` on the client-side. For messages that only contain new
/// lines `\n`, the conversion is lossless.
///
/// However, the protocol has no mechanism for expressing carriage returns
/// and thus it is not possible to send unencoded carriage returns via SSE.
/// Rocket handles carriage returns like it handles new lines: it splits the
/// data into multiple messages. Thus, a sequence of `\r\n` becomes `\n` at
/// the client side. A single `\r` that is not part of an `\r\n` sequence
/// also becomes `\n` at the client side. As a result, the message
/// `foo\r\nbar\rbaz` is read as `foo\nbar\nbaz` at the client-side.
///
/// To send messages losslessly, they must be encoded first, for instance, by
/// using [`Event::json()`].
pub struct EventStream<S> {
stream: S,
heartbeat: Option<Duration>,
}
impl<S: Stream<Item = Event>> EventStream<S> {
/// Sets a "ping" interval for this `EventStream` to avoid connection
/// timeouts when no data is being transferred. The default `interval` for a
/// newly created `EventStream` is `None`, which disables this
/// functionality.
///
/// The ping is implemented by sending an empty comment to the client every
/// `interval` seconds.
///
/// # Example
///
/// ```rust
/// # use rocket::get;
/// use rocket::response::stream::{Event, EventStream};
/// use rocket::tokio::time::Duration;
///
/// #[get("/events")]
/// fn events() -> EventStream![] {
/// // Remove the default heartbeat.
/// # let event_stream = rocket::futures::stream::pending();
/// EventStream::from(event_stream).heartbeat(None);
///
/// // Set the heartbeat interval to 15 seconds.
/// # let event_stream = rocket::futures::stream::pending();
/// EventStream::from(event_stream).heartbeat(Duration::from_secs(15));
///
/// // Do the same but for a generated `EventStream`:
/// let stream = EventStream! {
/// yield Event::data("hello");
/// };
///
/// stream.heartbeat(Duration::from_secs(15))
/// }
/// ```
pub fn heartbeat<H: Into<Option<Duration>>>(mut self, heartbeat: H) -> Self {
self.heartbeat = heartbeat.into();
self
}
fn heartbeat_stream(&self) -> Option<impl Stream<Item = RawLinedEvent>> {
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
self.heartbeat
.map(|beat| IntervalStream::new(interval(beat)))
.map(|stream| stream.map(|_| RawLinedEvent::raw(":\n")))
}
fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
use futures::future::Either;
use crate::ext::StreamExt;
let heartbeat_stream = self.heartbeat_stream();
let raw_events = self.stream.map(|e| e.into_stream()).flatten();
match heartbeat_stream {
Some(heartbeat) => Either::Left(raw_events.join(heartbeat)),
None => Either::Right(raw_events)
}
}
fn into_reader(self) -> impl AsyncRead {
ReaderStream::from(self.into_stream())
}
}
impl<S: Stream<Item = Event>> From<S> for EventStream<S> {
/// Creates an `EventStream` from a [`Stream`] of [`Event`]s.
///
/// Use `EventStream::from()` to construct an `EventStream` from an already
/// existing stream. Otherwise, prefer to use [`EventStream!`].
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::{Event, EventStream};
/// use rocket::futures::stream;
///
/// let raw = stream::iter(vec![Event::data("a"), Event::data("b")]);
/// let stream = EventStream::from(raw);
/// ```
fn from(stream: S) -> Self {
EventStream { stream, heartbeat: Some(Duration::from_secs(30)), }
}
}
impl<'r, S: Stream<Item = Event> + Send + 'r> Responder<'r, 'r> for EventStream<S> {
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
Response::build()
.header(ContentType::EventStream)
.raw_header("Cache-Control", "no-cache")
.raw_header("Expires", "0")
.streamed_body(self.into_reader())
.ok()
}
}
crate::export! {
/// Type and stream expression macro for [`struct@EventStream`].
///
/// See [`struct@EventStream`] and the [module level
/// docs](crate::response::stream#typed-streams) for usage details.
macro_rules! EventStream {
() => ($crate::_typed_stream!(EventStream, $crate::response::stream::Event));
($($s:tt)*) => ($crate::_typed_stream!(EventStream, $($s)*));
}
}
#[cfg(test)]
mod sse_tests {
use tokio::io::AsyncReadExt;
use tokio::time::{self, Duration};
use futures::stream::Stream;
use crate::response::stream::{stream, Event, EventStream, ReaderStream};
impl Event {
fn into_string(self) -> String {
crate::async_test(async move {
let mut string = String::new();
let mut reader = ReaderStream::from(self.into_stream());
reader.read_to_string(&mut string).await.expect("event -> string");
string
})
}
}
impl<S: Stream<Item = Event>> EventStream<S> {
fn into_string(self) -> String {
crate::async_test(async move {
let mut string = String::new();
let reader = self.into_reader();
tokio::pin!(reader);
reader.read_to_string(&mut string).await.expect("event stream -> string");
string
})
}
}
#[test]
fn test_event_data() {
let event = Event::data("a\nb");
assert_eq!(event.into_string(), "data:a\ndata:b\n\n");
let event = Event::data("a\n");
assert_eq!(event.into_string(), "data:a\ndata:\n\n");
let event = Event::data("cats make me happy!");
assert_eq!(event.into_string(), "data:cats make me happy!\n\n");
let event = Event::data("in the\njungle\nthe mighty\njungle");
assert_eq!(event.into_string(),
"data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");
let event = Event::data("in the\njungle\r\nthe mighty\rjungle");
assert_eq!(event.into_string(),
"data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");
let event = Event::data("\nb\n");
assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
let event = Event::data("\r\nb\n");
assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
let event = Event::data("\r\nb\r\n");
assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
let event = Event::data("\n\nb\n");
assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
let event = Event::data("\n\rb\n");
assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
let event = Event::data("\n\rb\r");
assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
let event = Event::comment("\n\rb\r");
assert_eq!(event.into_string(), ":\n:\n:b\n:\n\n");
let event = Event::data("\n\n\n");
assert_eq!(event.into_string(), "data:\ndata:\ndata:\ndata:\n\n");
let event = Event::data("\n");
assert_eq!(event.into_string(), "data:\ndata:\n\n");
let event = Event::data("");
assert_eq!(event.into_string(), "data:\n\n");
}
#[test]
fn test_event_fields() {
let event = Event::data("foo").id("moo");
assert_eq!(event.into_string(), "id:moo\ndata:foo\n\n");
let event = Event::data("foo").id("moo").with_retry(Duration::from_secs(45));
assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\n\n");
let event = Event::data("foo\nbar").id("moo").with_retry(Duration::from_secs(45));
assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\ndata:bar\n\n");
let event = Event::retry(Duration::from_secs(45));
assert_eq!(event.into_string(), "retry:45000\n\n");
let event = Event::comment("incoming data...");
assert_eq!(event.into_string(), ":incoming data...\n\n");
let event = Event::data("foo").id("moo").with_comment("cows, ey?");
assert_eq!(event.into_string(), ":cows, ey?\nid:moo\ndata:foo\n\n");
let event = Event::data("foo\nbar")
.id("moo")
.event("milk")
.with_retry(Duration::from_secs(3));
assert_eq!(event.into_string(), "retry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\n\n");
let event = Event::data("foo")
.id("moo")
.event("milk")
.with_comment("??")
.with_retry(Duration::from_secs(3));
assert_eq!(event.into_string(), ":??\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");
let event = Event::data("foo")
.id("moo")
.event("milk")
.with_comment("?\n?")
.with_retry(Duration::from_secs(3));
assert_eq!(event.into_string(), ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");
let event = Event::data("foo\r\nbar\nbaz")
.id("moo")
.event("milk")
.with_comment("?\n?")
.with_retry(Duration::from_secs(3));
assert_eq!(event.into_string(),
":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\ndata:baz\n\n");
}
#[test]
fn test_bad_chars() {
let event = Event::data("foo").id("dead\nbeef").event("m\noo");
assert_eq!(event.into_string(), "id:dead beef\nevent:m oo\ndata:foo\n\n");
let event = Event::data("f\no").id("d\r\nbe\rf").event("m\n\r");
assert_eq!(event.into_string(), "id:d be f\nevent:m \ndata:f\ndata:o\n\n");
let event = Event::data("f\no").id("\r\n\n\r\n\r\r").event("\n\rb");
assert_eq!(event.into_string(), "id: \nevent: b\ndata:f\ndata:o\n\n");
}
#[test]
fn test_event_stream() {
use futures::stream::iter;
let stream = EventStream::from(iter(vec![Event::data("foo")]));
assert_eq!(stream.into_string(), "data:foo\n\n");
let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
assert_eq!(stream.into_string(), "data:a\n\ndata:b\n\n");
let stream = EventStream::from(iter(vec![
Event::data("a\n"),
Event::data("b"),
Event::data("c\n\nd"),
Event::data("e"),
]));
assert_eq!(stream.into_string(),
"data:a\ndata:\n\ndata:b\n\ndata:c\ndata:\ndata:d\n\ndata:e\n\n");
}
#[test]
fn test_heartbeat() {
use futures::future::ready;
use futures::stream::{once, iter, StreamExt};
// Set a heartbeat interval of 200ms. Send nothing for 500ms. We should
// get 2 or 3 heartbeats, the latter if one is sent eagerly.
let raw = stream!(time::sleep(Duration::from_millis(500)).await;)
.map(|_| unreachable!());
let string = EventStream::from(raw)
.heartbeat(Duration::from_millis(200))
.into_string();
match string.as_str() {
":\n\n:\n\n" | ":\n\n:\n\n:\n\n" => { /* ok */ },
s => panic!("unexpected heartbeat response: {:?}", s)
}
let stream = EventStream! {
time::sleep(Duration::from_millis(200)).await;
yield Event::data("foo");
time::sleep(Duration::from_millis(200)).await;
yield Event::data("bar");
};
let string = stream.heartbeat(Duration::from_millis(300)).into_string();
match string.as_str() {
":\n\ndata:foo\n\n:\n\ndata:bar\n\n" | "data:foo\n\n:\n\ndata:bar\n\n" => { /* ok */ },
s => panic!("unexpected heartbeat response: {:?}", s)
}
// We shouldn't send a heartbeat if a message is immediately available.
let stream = EventStream::from(once(ready(Event::data("hello"))));
let string = stream.heartbeat(Duration::from_secs(1)).into_string();
assert_eq!(string, "data:hello\n\n");
// It's okay if we do it with two, though.
let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
let string = stream.heartbeat(Duration::from_secs(1)).into_string();
match string.as_str() {
"data:a\n\n:\n\ndata:b\n\n" | "data:a\n\ndata:b\n\n" => { /* ok */ },
s => panic!("unexpected heartbeat response: {:?}", s)
}
}
}