From 6fc24789e9616c91e1ec2c21178412002df8bace Mon Sep 17 00:00:00 2001 From: Sergio Benitez Date: Wed, 27 Apr 2022 13:15:47 -0700 Subject: [PATCH] Fix SSE heartbeat so it never interrupts events. Previously, the heartbeat message, in its raw form, was ":\n\n". This commit changes the message to be ":\n". The former message, when parsed as Server-Sent Events, contained an empty comment (as desired) _and_ a new line (erroneously). The new line resulted in emitting any event that was presently being emitted, even if it wasn't complete. That is, emitting an event partly, such as the event's data but not its name. Removing the extra new line resolves this issue and ensures that events aren't interrupted by the heartbeat. Fixes #2152. --- core/lib/src/response/stream/sse.rs | 4 +-- examples/responders/src/main.rs | 39 ++++++++++++++++++++++++++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/core/lib/src/response/stream/sse.rs b/core/lib/src/response/stream/sse.rs index cce8a273..d56fca1b 100644 --- a/core/lib/src/response/stream/sse.rs +++ b/core/lib/src/response/stream/sse.rs @@ -524,7 +524,7 @@ impl> EventStream { self.heartbeat .map(|beat| IntervalStream::new(interval(beat))) - .map(|stream| stream.map(|_| RawLinedEvent::raw(":\n"))) + .map(|stream| stream.map(|_| RawLinedEvent::raw(":"))) } fn into_stream(self) -> impl Stream { @@ -762,7 +762,7 @@ mod sse_tests { use futures::future::ready; use futures::stream::{once, iter, StreamExt}; - const HEARTBEAT: &str = ":\n\n"; + const HEARTBEAT: &str = ":\n"; // Set a heartbeat interval of 250ms. Send nothing for 600ms. We should // get 2 or 3 heartbeats, the latter if one is sent eagerly. Maybe 4. diff --git a/examples/responders/src/main.rs b/examples/responders/src/main.rs index e7ba7cea..4e067095 100644 --- a/examples/responders/src/main.rs +++ b/examples/responders/src/main.rs @@ -39,7 +39,7 @@ use rocket::tokio::time::{self, Duration}; use rocket::futures::stream::{repeat, StreamExt}; use rocket::Shutdown; -use rocket::response::stream::TextStream; +use rocket::response::stream::{TextStream, EventStream, Event}; #[get("/stream/hi")] fn many_his() -> TextStream![&'static str] { @@ -62,6 +62,42 @@ fn one_hi_per_ms(mut shutdown: Shutdown, n: u8) -> TextStream![&'static str] { } } +#[get("/progress", rank = 2)] +fn progress_page() -> RawHtml<&'static str> { + RawHtml(r#" + + +

+ "#) +} + +#[get("/progress", format = "text/event-stream", rank = 1)] +fn progress_stream() -> EventStream![] { + let stream = EventStream! { + let mut interval = time::interval(Duration::from_secs(1)); + + for count in 0..100 { + interval.tick().await; + yield Event::data(count.to_string()).event("progress"); + } + + yield Event::data("").event("done"); + }; + + stream.heartbeat(Duration::from_secs(3)) +} + /***************************** `Redirect` Responder ***************************/ use rocket::response::Redirect; @@ -178,6 +214,7 @@ async fn custom(kind: Option) -> StoredData { fn rocket() -> _ { rocket::build() .mount("/", routes![many_his, one_hi_per_ms, file, upload, delete]) + .mount("/", routes![progress_stream, progress_page]) .mount("/", routes![redir_root, redir_login, maybe_redir]) .mount("/", routes![xml, json, json_or_msgpack]) .mount("/", routes![custom])