mirror of https://github.com/rwf2/Rocket.git
Fix SSE heartbeat so it never interrupts events.
Previously, the heartbeat message, in its raw form, was ":\n\n". This commit changes the message to be ":\n". The former message, when parsed as Server-Sent Events, contained an empty comment (as desired) _and_ a new line (erroneously). The new line resulted in emitting any event that was presently being emitted, even if it wasn't complete. That is, emitting an event partly, such as the event's data but not its name. Removing the extra new line resolves this issue and ensures that events aren't interrupted by the heartbeat. Fixes #2152.
This commit is contained in:
parent
b117210ca6
commit
6fc24789e9
|
@ -524,7 +524,7 @@ impl<S: Stream<Item = Event>> EventStream<S> {
|
||||||
|
|
||||||
self.heartbeat
|
self.heartbeat
|
||||||
.map(|beat| IntervalStream::new(interval(beat)))
|
.map(|beat| IntervalStream::new(interval(beat)))
|
||||||
.map(|stream| stream.map(|_| RawLinedEvent::raw(":\n")))
|
.map(|stream| stream.map(|_| RawLinedEvent::raw(":")))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
|
fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
|
||||||
|
@ -762,7 +762,7 @@ mod sse_tests {
|
||||||
use futures::future::ready;
|
use futures::future::ready;
|
||||||
use futures::stream::{once, iter, StreamExt};
|
use futures::stream::{once, iter, StreamExt};
|
||||||
|
|
||||||
const HEARTBEAT: &str = ":\n\n";
|
const HEARTBEAT: &str = ":\n";
|
||||||
|
|
||||||
// Set a heartbeat interval of 250ms. Send nothing for 600ms. We should
|
// Set a heartbeat interval of 250ms. Send nothing for 600ms. We should
|
||||||
// get 2 or 3 heartbeats, the latter if one is sent eagerly. Maybe 4.
|
// get 2 or 3 heartbeats, the latter if one is sent eagerly. Maybe 4.
|
||||||
|
|
|
@ -39,7 +39,7 @@ use rocket::tokio::time::{self, Duration};
|
||||||
use rocket::futures::stream::{repeat, StreamExt};
|
use rocket::futures::stream::{repeat, StreamExt};
|
||||||
|
|
||||||
use rocket::Shutdown;
|
use rocket::Shutdown;
|
||||||
use rocket::response::stream::TextStream;
|
use rocket::response::stream::{TextStream, EventStream, Event};
|
||||||
|
|
||||||
#[get("/stream/hi")]
|
#[get("/stream/hi")]
|
||||||
fn many_his() -> TextStream![&'static str] {
|
fn many_his() -> TextStream![&'static str] {
|
||||||
|
@ -62,6 +62,42 @@ fn one_hi_per_ms(mut shutdown: Shutdown, n: u8) -> TextStream![&'static str] {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[get("/progress", rank = 2)]
|
||||||
|
fn progress_page() -> RawHtml<&'static str> {
|
||||||
|
RawHtml(r#"
|
||||||
|
<script type="text/javascript">
|
||||||
|
const evtSource = new EventSource("progress");
|
||||||
|
evtSource.addEventListener("progress", (event) => {
|
||||||
|
const el = document.getElementById("prog");
|
||||||
|
el.textContent = event.data + "%";
|
||||||
|
});
|
||||||
|
evtSource.addEventListener("done", (_) => {
|
||||||
|
const el = document.getElementById("prog");
|
||||||
|
el.textContent = "done";
|
||||||
|
evtSource.close()
|
||||||
|
});
|
||||||
|
</script>
|
||||||
|
|
||||||
|
<p id="prog"></p>
|
||||||
|
"#)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[get("/progress", format = "text/event-stream", rank = 1)]
|
||||||
|
fn progress_stream() -> EventStream![] {
|
||||||
|
let stream = EventStream! {
|
||||||
|
let mut interval = time::interval(Duration::from_secs(1));
|
||||||
|
|
||||||
|
for count in 0..100 {
|
||||||
|
interval.tick().await;
|
||||||
|
yield Event::data(count.to_string()).event("progress");
|
||||||
|
}
|
||||||
|
|
||||||
|
yield Event::data("").event("done");
|
||||||
|
};
|
||||||
|
|
||||||
|
stream.heartbeat(Duration::from_secs(3))
|
||||||
|
}
|
||||||
|
|
||||||
/***************************** `Redirect` Responder ***************************/
|
/***************************** `Redirect` Responder ***************************/
|
||||||
|
|
||||||
use rocket::response::Redirect;
|
use rocket::response::Redirect;
|
||||||
|
@ -178,6 +214,7 @@ async fn custom(kind: Option<Kind>) -> StoredData {
|
||||||
fn rocket() -> _ {
|
fn rocket() -> _ {
|
||||||
rocket::build()
|
rocket::build()
|
||||||
.mount("/", routes![many_his, one_hi_per_ms, file, upload, delete])
|
.mount("/", routes![many_his, one_hi_per_ms, file, upload, delete])
|
||||||
|
.mount("/", routes![progress_stream, progress_page])
|
||||||
.mount("/", routes![redir_root, redir_login, maybe_redir])
|
.mount("/", routes![redir_root, redir_login, maybe_redir])
|
||||||
.mount("/", routes![xml, json, json_or_msgpack])
|
.mount("/", routes![xml, json, json_or_msgpack])
|
||||||
.mount("/", routes![custom])
|
.mount("/", routes![custom])
|
||||||
|
|
Loading…
Reference in New Issue