From 336a03e27f43d0ec3b02d8b99e10d2df202523ad Mon Sep 17 00:00:00 2001 From: Sergio Benitez Date: Wed, 28 Apr 2021 02:07:32 -0700 Subject: [PATCH] Introduce async streams. This reworks the entire 'response::stream' module for async streams. Resolves #1066. --- core/lib/Cargo.toml | 4 + core/lib/src/response/mod.rs | 1 + core/lib/src/response/stream.rs | 633 ++++++++++++++++++++++++++++--- examples/responders/src/main.rs | 65 +++- examples/responders/src/tests.rs | 62 ++- site/guide/5-responses.md | 59 ++- 6 files changed, 729 insertions(+), 95 deletions(-) diff --git a/core/lib/Cargo.toml b/core/lib/Cargo.toml index 74c2de1e..ac63f10c 100644 --- a/core/lib/Cargo.toml +++ b/core/lib/Cargo.toml @@ -45,6 +45,10 @@ indexmap = { version = "1.0", features = ["serde-1"] } tempfile = "3" async-trait = "0.1.43" +[dependencies.async-stream] +git = "https://github.com/SergioBenitez/async-stream.git" +rev = "c46ada9" + [dependencies.state] git = "https://github.com/SergioBenitez/state.git" rev = "8f94dc" diff --git a/core/lib/src/response/mod.rs b/core/lib/src/response/mod.rs index 3e951248..f9d7afef 100644 --- a/core/lib/src/response/mod.rs +++ b/core/lib/src/response/mod.rs @@ -34,6 +34,7 @@ pub(crate) mod flash; pub mod content; pub mod status; +pub mod stream; #[doc(hidden)] pub use rocket_codegen::Responder; diff --git a/core/lib/src/response/stream.rs b/core/lib/src/response/stream.rs index cfe6997b..cd5638f4 100644 --- a/core/lib/src/response/stream.rs +++ b/core/lib/src/response/stream.rs @@ -1,73 +1,616 @@ -use std::fmt::{self, Debug}; +//! Potentially infinite async [`Stream`] response types. +//! +//! A [`Stream`] is the async analog of an `Iterator`: it +//! generates a sequence of values asynchronously, otherwise known as an async +//! _generator_. Types in this module allow for returning responses that are +//! streams. +//! +//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html +//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html +//! +//! # Raw Streams +//! +//! Rust does not yet natively support syntax for creating arbitrary generators, +//! and as such, for creating streams. To ameliorate this, Rocket exports +//! [`stream!`], which retrofit generator syntax, allowing raw `impl Stream`s to +//! be defined using `yield` and `for await` syntax: +//! +//! ```rust +//! use rocket::futures::stream::Stream; +//! use rocket::response::stream::stream; +//! +//! fn make_stream() -> impl Stream { +//! stream! { +//! for i in 0..3 { +//! yield i; +//! } +//! } +//! } +//! ``` +//! +//! See [`stream!`] for full usage details. +//! +//! # Typed Streams +//! +//! A raw stream is not a `Responder`, so it cannot be directly returned from a +//! route handler. Instead, one of three _typed_ streams may be used. Each typed +//! stream places type bounds on the `Item` of the stream, allowing for +//! `Responder` implementation on the stream itself. +//! +//! Each typed stream exists both as a type and as a macro. They are: +//! +//! * [`struct@ReaderStream`] ([`ReaderStream!`]) - streams of `T: AsyncRead` +//! * [`struct@ByteStream`] ([`ByteStream!`]) - streams of `T: AsRef<[u8]>` +//! * [`struct@TextStream`] ([`TextStream!`]) - streams of `T: AsRef` +//! +//! Each type implements `Responder`; each macro can be invoked to generate a +//! typed stream, exactly like [`stream!`] above. Additionally, each macro is +//! also a _type_ macro, expanding to a wrapped `impl Stream`, where +//! `$T` is the input to the macro. +//! +//! As a concrete example, the route below produces an infinite series of +//! `"hello"`s, one per second: +//! +//! ```rust +//! # use rocket::get; +//! use rocket::tokio::time::{self, Duration}; +//! use rocket::response::stream::TextStream; +//! +//! /// Produce an infinite series of `"hello"`s, one per second. +//! #[get("/infinite-hellos")] +//! fn hello() -> TextStream![&'static str] { +//! TextStream! { +//! let mut interval = time::interval(Duration::from_secs(1)); +//! loop { +//! yield "hello"; +//! interval.tick().await; +//! } +//! } +//! } +//! ``` +//! +//! The `TextStream![&'static str]` invocation expands to: +//! +//! ```rust +//! # use rocket::response::stream::TextStream; +//! # use rocket::futures::stream::Stream; +//! # use rocket::response::stream::stream; +//! # fn f() -> +//! TextStream> +//! # { TextStream::from(stream! { yield "hi" }) } +//! ``` +//! +//! While the inner `TextStream! { .. }` invocation expands to: +//! +//! ```rust +//! # use rocket::response::stream::{TextStream, stream}; +//! TextStream::from(stream! { /* .. */ }) +//! # ; +//! ``` +//! +//! The expansions are identical for `ReaderStream` and `ByteStream`, with +//! `TextStream` replaced with `ReaderStream` and `ByteStream`, respectively. +//! +//! # Graceful Shutdown +//! +//! Infinite responders, like the one defined in `hello` above, will prolong +//! shutdown initiated via [`Shutdown::notify()`](crate::Shutdown::notify()) for +//! the defined grace period. After the grace period has elapsed, Rocket will +//! abruptly terminate the responder. +//! +//! To avoid abrupt termination, graceful shutdown can be detected via the +//! [`Shutdown`](crate::Shutdown) future, allowing the infinite responder to +//! gracefully shut itself down. The following example modifies the previous +//! `hello` with shutdown detection: +//! +//! ```rust +//! # use rocket::get; +//! use rocket::Shutdown; +//! use rocket::response::stream::TextStream; +//! use rocket::tokio::select; +//! use rocket::tokio::time::{self, Duration}; +//! +//! /// Produce an infinite series of `"hello"`s, 1/second, until shutdown. +//! #[get("/infinite-hellos")] +//! fn hello(mut shutdown: Shutdown) -> TextStream![&'static str] { +//! TextStream! { +//! let mut interval = time::interval(Duration::from_secs(1)); +//! loop { +//! select! { +//! _ = interval.tick() => yield "hello", +//! _ = &mut shutdown => { +//! yield "goodbye"; +//! break; +//! } +//! }; +//! } +//! } +//! } +//! ``` -use tokio::io::AsyncRead; +use std::{fmt, io}; +use std::task::{Context, Poll}; +use std::pin::Pin; + +use futures::stream::{Stream, StreamExt}; +use tokio::io::{AsyncRead, ReadBuf}; +use pin_project_lite::pin_project; use crate::request::Request; -use crate::response::{self, Response, Responder, DEFAULT_CHUNK_SIZE}; +use crate::response::{self, Response, Responder}; +use crate::http::ContentType; -/// Streams a response to a client from an arbitrary `AsyncRead`er type. -/// -/// The client is sent a "chunked" response, where the chunk size is at most -/// 4KiB. This means that at most 4KiB are stored in memory while the response -/// is being sent. This type should be used when sending responses that are -/// arbitrarily large in size, such as when streaming from a local socket. -pub struct Stream(T, usize); - -impl Stream { - /// Create a new stream from the given `reader` and sets the chunk size for - /// each streamed chunk to `chunk_size` bytes. +pin_project! { + /// A potentially infinite stream of readers: `T: AsyncRead`. + /// + /// A `ReaderStream` can be constructed from any [`Stream`] of items of type + /// `T` where `T: AsyncRead`, or from a single `AsyncRead` type using + /// [`ReaderStream::one()`]. Because a `ReaderStream` is itself `AsyncRead`, + /// it can be used as a building-block for other stream-based responders as + /// a `streamed_body`, though it may also be used as a responder itself. + /// + /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html + /// + /// ```rust + /// use std::io::Cursor; + /// + /// use rocket::{Request, Response}; + /// use rocket::futures::stream::{Stream, StreamExt}; + /// use rocket::response::{self, Responder, stream::ReaderStream}; + /// use rocket::http::ContentType; + /// + /// struct MyStream(S); + /// + /// impl<'r, S: Stream> Responder<'r, 'r> for MyStream + /// where S: Send + 'r + /// { + /// fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> { + /// Response::build() + /// .header(ContentType::Text) + /// .streamed_body(ReaderStream::from(self.0.map(Cursor::new))) + /// .ok() + /// } + /// } + /// ``` + /// + /// # Responder + /// + /// `ReaderStream` is a (potentially infinite) responder. No `Content-Type` + /// is set. The body is [unsized](crate::response::Body#unsized), and values + /// are sent as soon as they are yielded by the internal iterator. /// /// # Example /// - /// Stream a response from whatever is in `stdin` with a chunk size of 10 - /// bytes. Note: you probably shouldn't do this. - /// /// ```rust - /// use rocket::response::Stream; + /// # use rocket::*; + /// use rocket::response::stream::ReaderStream; + /// use rocket::futures::stream::{repeat, StreamExt}; + /// use rocket::tokio::time::{self, Duration}; + /// use rocket::tokio::fs::File; /// - /// # #[allow(unused_variables)] - /// let response = Stream::chunked(tokio::io::stdin(), 10); + /// #[get("/reader/stream")] + /// fn stream() -> ReaderStream![File] { + /// ReaderStream! { + /// let paths = &["safe/path", "another/safe/path"]; + /// for path in paths { + /// if let Ok(file) = File::open(path).await { + /// yield file; + /// } + /// } + /// } + /// } + /// + /// #[get("/reader/stream/one")] + /// async fn stream_one() -> std::io::Result { + /// let file = File::open("safe/path").await?; + /// Ok(ReaderStream::one(file)) + /// } /// ``` - pub fn chunked(reader: T, chunk_size: usize) -> Stream { - Stream(reader, chunk_size) + /// + /// The syntax of `ReaderStream` as an expression is identical to that of + /// [`stream!`]. + pub struct ReaderStream { + #[pin] + stream: S, + #[pin] + state: State, } } -impl Debug for Stream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("Stream").field(&self.0).finish() +pin_project! { + #[project = StateProjection] + #[derive(Debug)] + enum State { + Pending, + Reading { #[pin] reader: R }, + Done, } } -/// Create a new stream from the given `reader`. +/// A potentially infinite stream of bytes: any `T: AsRef<[u8]>`. +/// +/// A `ByteStream` can be constructed from any [`Stream`] of items of type `T` +/// where `T: AsRef<[u8]>`. This includes `Vec`, `&[u8]`, `&str`, `&RawStr`, +/// and more. The stream can be constructed directly, via `ByteStream(..)` or +/// [`ByteStream::from()`], or through generator syntax via [`ByteStream!`]. +/// +/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html +/// +/// # Responder +/// +/// `ByteStream` is a (potentially infinite) responder. The response +/// `Content-Type` is set to [`Binary`](ContentType::Binary). The body is +/// [unsized](crate::response::Body#unsized), and values are sent as soon as +/// they are yielded by the internal iterator. /// /// # Example /// -/// Stream a response from whatever is in `stdin`. Note: you probably -/// shouldn't do this. +/// ```rust +/// # use rocket::*; +/// use rocket::response::stream::ByteStream; +/// use rocket::futures::stream::{repeat, StreamExt}; +/// use rocket::tokio::time::{self, Duration}; +/// +/// #[get("/bytes")] +/// fn bytes() -> ByteStream![&'static [u8]] { +/// ByteStream(repeat(&[1, 2, 3][..])) +/// } +/// +/// #[get("/byte/stream")] +/// fn stream() -> ByteStream![Vec] { +/// ByteStream! { +/// let mut interval = time::interval(Duration::from_secs(1)); +/// for i in 0..10u8 { +/// yield vec![i, i + 1, i + 2]; +/// interval.tick().await; +/// } +/// } +/// } +/// ``` +/// +/// The syntax of `ByteStream` as an expression is identical to that of +/// [`stream!`]. +#[derive(Debug, Clone)] +pub struct ByteStream(pub S); + +/// A potentially infinite stream of text: `T: AsRef`. +/// +/// A `TextStream` can be constructed from any [`Stream`] of items of type `T` +/// where `T: AsRef`. This includes `&str`, `String`, `Cow`, +/// `&RawStr`, and more. The stream can be constructed directly, via +/// `TextStream(..)` or [`TextStream::from()`], or through generator syntax via +/// [`TextStream!`]. +/// +/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html +/// +/// # Responder +/// +/// `TextStream` is a (potentially infinite) responder. The response +/// `Content-Type` is set to [`Text`](ContentType::Text). The body is +/// [unsized](crate::response::Body#unsized), and values are sent as soon as +/// they are yielded by the internal iterator. +/// +/// # Example /// /// ```rust -/// use rocket::response::Stream; +/// # use rocket::*; +/// use rocket::response::stream::TextStream; +/// use rocket::futures::stream::{repeat, StreamExt}; +/// use rocket::tokio::time::{self, Duration}; /// -/// # #[allow(unused_variables)] -/// let response = Stream::from(tokio::io::stdin()); +/// #[get("/text")] +/// fn text() -> TextStream![&'static str] { +/// TextStream(repeat("hi")) +/// } +/// +/// #[get("/text/stream")] +/// fn stream() -> TextStream![String] { +/// TextStream! { +/// let mut interval = time::interval(Duration::from_secs(1)); +/// for i in 0..10 { +/// yield format!("n: {}", i); +/// interval.tick().await; +/// } +/// } +/// } /// ``` -impl From for Stream { - fn from(reader: T) -> Self { - Stream(reader, DEFAULT_CHUNK_SIZE) +/// +/// The syntax of `TextStream` as an expression is identical to that of +/// [`stream!`]. +#[derive(Debug, Clone)] +pub struct TextStream(pub S); + +impl From for ReaderStream { + fn from(stream: S) -> Self { + ReaderStream { stream, state: State::Pending } } } -/// Sends a response to the client using the "Chunked" transfer encoding. The -/// maximum chunk size is 4KiB. -/// -/// # Failure -/// -/// If reading from the input stream fails at any point during the response, the -/// response is abandoned, and the response ends abruptly. An error is printed -/// to the console with an indication of what went wrong. -impl<'r, 'o: 'r, T: AsyncRead + Send + 'o> Responder<'r, 'o> for Stream { - fn respond_to(self, _: &'r Request<'_>) -> response::Result<'o> { - Response::build().chunked_body(self.0, self.1).ok() +impl From for TextStream { + /// Creates a `TextStream` from any [`S: Stream`](Stream). + fn from(stream: S) -> Self { + TextStream(stream) + } +} + +impl From for ByteStream { + /// Creates a `ByteStream` from any [`S: Stream`](Stream). + fn from(stream: S) -> Self { + ByteStream(stream) + } +} + +impl<'r, S: Stream> Responder<'r, 'r> for ReaderStream + where S: Send + 'r, S::Item: AsyncRead + Send, +{ + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> { + Response::build() + .streamed_body(self) + .ok() + } +} + +/// A stream that yields a value exactly once. +/// +/// A `ReaderStream` which wraps this type and yields one `AsyncRead` type can +/// be created via [`ReaderStream::one()`]. +/// +/// # Example +/// +/// ```rust +/// use rocket::response::stream::Once; +/// use rocket::futures::stream::StreamExt; +/// +/// # rocket::async_test(async { +/// let mut stream = Once::from("hello!"); +/// let values: Vec<_> = stream.collect().await; +/// assert_eq!(values, ["hello!"]); +/// # }); +/// ``` +pub struct Once(Option); + +impl From for Once { + fn from(value: T) -> Self { + Once(Some(value)) + } +} + +impl Stream for Once { + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(self.0.take()) + } +} + +impl ReaderStream> { + /// Create a `ReaderStream` that yields exactly one reader, streaming the + /// contents of the reader itself. + /// + /// # Example + /// + /// Stream the bytes from a remote TCP connection: + /// + /// ```rust + /// # use rocket::*; + /// use std::io; + /// use std::net::SocketAddr; + /// + /// use rocket::tokio::net::TcpStream; + /// use rocket::response::stream::ReaderStream; + /// + /// #[get("/stream")] + /// async fn stream() -> io::Result { + /// let addr = SocketAddr::from(([127, 0, 0, 1], 9999)); + /// let stream = TcpStream::connect(addr).await?; + /// Ok(ReaderStream::one(stream)) + /// } + /// ``` + pub fn one(reader: R) -> Self { + ReaderStream::from(Once::from(reader)) + } +} + +impl<'r, S: Stream> Responder<'r, 'r> for TextStream + where S: Send + 'r, S::Item: AsRef + Send + Unpin + 'r +{ + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> { + struct ByteStr(T); + + impl> AsRef<[u8]> for ByteStr { + fn as_ref(&self) -> &[u8] { + self.0.as_ref().as_bytes() + } + } + + let inner = self.0.map(ByteStr).map(io::Cursor::new); + Response::build() + .header(ContentType::Text) + .streamed_body(ReaderStream::from(inner)) + .ok() + } +} + +impl<'r, S: Stream> Responder<'r, 'r> for ByteStream + where S: Send + 'r, S::Item: AsRef<[u8]> + Send + Unpin + 'r +{ + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> { + Response::build() + .header(ContentType::Binary) + .streamed_body(ReaderStream::from(self.0.map(io::Cursor::new))) + .ok() + } +} + +impl AsyncRead for ReaderStream + where S::Item: AsyncRead + Send +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_> + ) -> Poll> { + let mut me = self.project(); + loop { + match me.state.as_mut().project() { + StateProjection::Pending => match me.stream.as_mut().poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => me.state.set(State::Done), + Poll::Ready(Some(reader)) => me.state.set(State::Reading { reader }), + }, + StateProjection::Reading { reader } => { + let init = buf.filled().len(); + match reader.poll_read(cx, buf) { + Poll::Ready(Ok(())) if buf.filled().len() == init => { + me.state.set(State::Pending); + }, + Poll::Ready(Ok(())) => return Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + } + }, + StateProjection::Done => return Poll::Ready(Ok(())), + } + } + } +} + +impl fmt::Debug for ReaderStream + where S::Item: fmt::Debug +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReaderStream") + .field("stream", &self.stream) + .field("state", &self.state) + .finish() + } +} + +crate::export! { + /// Retrofitted support for [`Stream`]s with `yield`, `for await` syntax. + /// + /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html + /// + /// This macro takes any series of statements and expands them into an + /// expression of type `impl Stream`, a stream that `yield`s + /// elements of type `T`. It supports any Rust statement syntax with the + /// following additions: + /// + /// * `yield expr` + /// + /// Yields the result of evaluating `expr` to the caller (the stream + /// consumer). `expr` must be of type `T`. + /// + /// * `for await x in stream { .. }` + /// + /// `await`s the next element in `stream`, binds it to `x`, and + /// executes the block with the binding. `stream` must implement + /// `Stream`; the type of `x` is `T`. + /// + /// # Examples + /// + /// ```rust + /// use rocket::response::stream::stream; + /// use rocket::futures::stream::Stream; + /// + /// fn f(stream: impl Stream) -> impl Stream { + /// stream! { + /// for s in &["hi", "there"]{ + /// yield s.to_string(); + /// } + /// + /// for await n in stream { + /// yield format!("n: {}", n); + /// } + /// } + /// } + /// + /// # rocket::async_test(async { + /// use rocket::futures::stream::{self, StreamExt}; + /// + /// let stream = f(stream::iter(vec![3, 7, 11])); + /// let strings: Vec<_> = stream.collect().await; + /// assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]); + /// # }); + /// ``` + /// + /// Using `?` on an `Err` short-cicuits stream termination: + /// + /// ```rust + /// use std::io; + /// + /// use rocket::response::stream::stream; + /// use rocket::futures::stream::Stream; + /// + /// fn g(stream: S) -> impl Stream> + /// where S: Stream> + /// { + /// stream! { + /// for await s in stream { + /// let num = s?.parse(); + /// let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + /// yield Ok(num); + /// } + /// } + /// } + /// + /// # rocket::async_test(async { + /// use rocket::futures::stream::{self, StreamExt}; + /// + /// let e = io::Error::last_os_error(); + /// let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")])); + /// let results: Vec<_> = stream.collect().await; + /// assert!(matches!(results.as_slice(), &[Ok(3), Err(_)])); + /// # }); + /// ``` + macro_rules! stream { + ($($t:tt)*) => ($crate::async_stream::stream!($($t)*)); + } +} + +#[doc(hidden)] +#[macro_export] +macro_rules! _typed_stream { + ($S:ident, $T:ty) => ( + $crate::response::stream::$S> + ); + ($S:ident, $($t:tt)*) => ( + $crate::response::stream::$S::from($crate::response::stream::stream!($($t)*)) + ); +} + +crate::export! { + /// Type and stream expression macro for [`struct@ReaderStream`]. + /// + /// See [`struct@ReaderStream`] and the [module level + /// docs](crate::response::stream#typed-streams) for usage details. + macro_rules! ReaderStream { + ($T:ty) => ($crate::_typed_stream!(ReaderStream, $T)); + ($($s:tt)*) => ($crate::_typed_stream!(ReaderStream, $($s)*)); + } +} + +crate::export! { + /// Type and stream expression macro for [`struct@ByteStream`]. + /// + /// See [`struct@ByteStream`] and the [module level + /// docs](crate::response::stream#typed-streams) for usage details. + macro_rules! ByteStream { + ($T:ty) => ($crate::_typed_stream!(ByteStream, $T)); + ($($s:tt)*) => ($crate::_typed_stream!(ByteStream, $($s)*)); + } +} + +crate::export! { + /// Type and stream expression macro for [`struct@TextStream`]. + /// + /// See [`struct@TextStream`] and the [module level + /// docs](crate::response::stream#typed-streams) for usage details. + macro_rules! TextStream { + ($T:ty) => ($crate::_typed_stream!(TextStream, $T)); + ($($s:tt)*) => ($crate::_typed_stream!(TextStream, $($s)*)); } } diff --git a/examples/responders/src/main.rs b/examples/responders/src/main.rs index 97ca9aab..8ce0fc75 100644 --- a/examples/responders/src/main.rs +++ b/examples/responders/src/main.rs @@ -2,43 +2,66 @@ #[cfg(test)] mod tests; -/***************************** `Stream` Responder *****************************/ +/****************** `Result`, `Option` `NameFile` Responder *******************/ use std::{io, env}; -use rocket::tokio::fs::{self, File}; -use rocket::tokio::io::{repeat, AsyncRead, AsyncReadExt}; -use rocket::response::{content, Stream}; +use rocket::tokio::fs; + use rocket::data::{Capped, TempFile}; +use rocket::response::NamedFile; // Upload your `big_file.dat` by POSTing it to /upload. // try `curl --data-binary @file.txt http://127.0.0.1:8000/stream/file` const FILENAME: &str = "big_file.dat"; -#[get("/stream/a")] -fn many_as() -> content::Plain> { - content::Plain(Stream::from(repeat('a' as u8).take(25000))) -} - -#[get("/stream/file")] -async fn file() -> Option> { - // NOTE: Rocket _always_ streams data from an `AsyncRead`, even when - // `Stream` isn't used. By using `Stream`, however, the data is sent using - // chunked-encoding in HTTP 1.1. DATA frames are sent in HTTP/2. - File::open(env::temp_dir().join(FILENAME)).await.map(Stream::from).ok() -} - -#[post("/stream/file", data = "")] +// This is a *raw* file upload, _not_ a multipart upload! +#[post("/file", data = "")] async fn upload(mut file: Capped>) -> io::Result { file.persist_to(env::temp_dir().join(FILENAME)).await?; Ok(format!("{} bytes at {}", file.n.written, file.path().unwrap().display())) } -#[delete("/stream/file")] +#[get("/file")] +async fn file() -> Option { + NamedFile::open(env::temp_dir().join(FILENAME)).await.ok() +} + +#[delete("/file")] async fn delete() -> Option<()> { fs::remove_file(env::temp_dir().join(FILENAME)).await.ok() } +/***************************** `Stream` Responder *****************************/ + +use rocket::tokio::select; +use rocket::tokio::time::{self, Duration}; +use rocket::futures::stream::{repeat, StreamExt}; + +use rocket::Shutdown; +use rocket::response::stream::TextStream; + +#[get("/stream/hi")] +fn many_his() -> TextStream![&'static str] { + TextStream(repeat("hi").take(100)) +} + +#[get("/stream/hi/")] +fn one_hi_per_ms(mut shutdown: Shutdown, n: u8) -> TextStream![&'static str] { + TextStream! { + let mut interval = time::interval(Duration::from_millis(n as u64)); + loop { + select! { + _ = interval.tick() => yield "hi", + _ = &mut shutdown => { + yield "goodbye"; + break; + } + }; + } + } +} + /***************************** `Redirect` Responder ***************************/ use rocket::response::Redirect; @@ -64,6 +87,7 @@ fn maybe_redir(name: &str) -> Result<&'static str, Redirect> { /***************************** `content` Responders ***************************/ use rocket::Request; +use rocket::response::content; // NOTE: This example explicitly uses the `Json` type from `response::content` // for demonstration purposes. In a real application, _always_ prefer to use @@ -119,7 +143,6 @@ fn json_or_msgpack(kind: &str) -> Either, MsgPack<&'static [u use std::borrow::Cow; -use rocket::response::NamedFile; use rocket::response::content::Html; #[derive(Responder)] @@ -154,7 +177,7 @@ async fn custom(kind: Option) -> StoredData { #[launch] fn rocket() -> _ { rocket::build() - .mount("/", routes![many_as, file, upload, delete]) + .mount("/", routes![many_his, one_hi_per_ms, file, upload, delete]) .mount("/", routes![redir_root, redir_login, maybe_redir]) .mount("/", routes![xml, json, json_or_msgpack]) .mount("/", routes![custom]) diff --git a/examples/responders/src/tests.rs b/examples/responders/src/tests.rs index 17f43bef..3e3beae0 100644 --- a/examples/responders/src/tests.rs +++ b/examples/responders/src/tests.rs @@ -1,22 +1,11 @@ use rocket::local::blocking::Client; use rocket::http::Status; +/****************************** `File` Responder ******************************/ + // We use a lock to synchronize between tests so FS operations don't race. static FS_LOCK: parking_lot::Mutex<()> = parking_lot::const_mutex(()); -/***************************** `Stream` Responder *****************************/ - -#[test] -fn test_many_as() { - let client = Client::tracked(super::rocket()).unwrap(); - let res = client.get(uri!(super::many_as)).dispatch(); - - // Check that we have exactly 25,000 'a's. - let bytes = res.into_bytes().unwrap(); - assert_eq!(bytes.len(), 25000); - assert!(bytes.iter().all(|b| *b == b'a')); -} - #[test] fn test_file() { const CONTENTS: &str = "big_file contents...not so big here"; @@ -39,6 +28,53 @@ fn test_file() { assert_eq!(response.status(), Status::Ok); } +/***************************** `Stream` Responder *****************************/ + +#[test] +fn test_many_his() { + let client = Client::tracked(super::rocket()).unwrap(); + let res = client.get(uri!(super::many_his)).dispatch(); + + // Check that we have exactly 100 `hi`s. + let bytes = res.into_bytes().unwrap(); + assert_eq!(bytes.len(), 200); + assert!(bytes.chunks(2).all(|b| b == b"hi")); +} + +#[async_test] +async fn test_one_hi_per_second() { + use rocket::local::asynchronous::Client; + use rocket::tokio::time::{self, Instant, Duration}; + use rocket::tokio::{self, select}; + + // Listen for 1 second at 1 `hi` per 250ms, see if we get ~4 `hi`'s, then + // send a shutdown() signal, meaning we should get a `goodbye`. + let client = Client::tracked(super::rocket()).await.unwrap(); + let response = client.get(uri!(super::one_hi_per_ms: 250)).dispatch().await; + let response = response.into_string(); + let timer = time::sleep(Duration::from_secs(1)); + + tokio::pin!(timer, response); + let start = Instant::now(); + let response = loop { + select! { + _ = &mut timer => { + client.rocket().shutdown().notify(); + timer.as_mut().reset(Instant::now() + Duration::from_millis(100)); + if start.elapsed() > Duration::from_secs(2) { + panic!("responder did not terminate with shutdown"); + } + } + response = &mut response => break response.unwrap(), + } + }; + + match &*response { + "hihihigoodbye" | "hihihihigoodbye" | "hihihihihigoodbye" => { /* ok */ }, + s => panic!("unexpected response from infinite responder: {}", s) + } +} + /***************************** `Redirect` Responder ***************************/ #[test] diff --git a/site/guide/5-responses.md b/site/guide/5-responses.md index 5c9087bb..fae83ad8 100644 --- a/site/guide/5-responses.md +++ b/site/guide/5-responses.md @@ -287,35 +287,62 @@ library. Among these are: [`Stream`]: @api/rocket/response/struct.Stream.html [`Flash`]: @api/rocket/response/struct.Flash.html [`MsgPack`]: @api/rocket_contrib/msgpack/struct.MsgPack.html +[`rocket_contrib`]: @api/rocket_contrib/ -### Streaming +### Async Streams -The `Stream` type deserves special attention. When a large amount of data needs -to be sent to the client, it is better to stream the data to the client to avoid -consuming large amounts of memory. Rocket provides the [`Stream`] type, making -this easy. The `Stream` type can be created from any `AsyncRead` type. For -example, to stream from a local Unix stream, we might write: +The [`stream`] responders allow serving potentially infinite async [`Stream`]s. +A stream can be created from any async `Stream` or `AsyncRead` type, or via +generator syntax using the [`stream!`] macro and its typed equivalents. + +The simplest version creates a [`ReaderStream`] from a single `AsyncRead` type. +For example, to stream from a TCP connection, we might write: ```rust -# #[macro_use] extern crate rocket; -# fn main() {} - -# mod test { +# use rocket::*; +use std::io; use std::net::SocketAddr; -use rocket::response::{Stream, Debug}; - use rocket::tokio::net::TcpStream; +use rocket::response::stream::ReaderStream; #[get("/stream")] -async fn stream() -> Result, Debug> { +async fn stream() -> io::Result { let addr = SocketAddr::from(([127, 0, 0, 1], 9999)); - Ok(TcpStream::connect(addr).await.map(Stream::from)?) + let stream = TcpStream::connect(addr).await?; + Ok(ReaderStream::one(stream)) } -# } ``` -[`rocket_contrib`]: @api/rocket_contrib/ +Streams can also be created using generator syntax. The following example +returns an infinite [`TextStream`] that produces one `"hello"` every second: + +```rust +# use rocket::get; +use rocket::tokio::time::{self, Duration}; +use rocket::response::stream::TextStream; + +/// Produce an infinite series of `"hello"`s, one per second. +#[get("/infinite-hellos")] +fn hello() -> TextStream![&'static str] { + TextStream! { + let mut interval = time::interval(Duration::from_secs(1)); + loop { + yield "hello"; + interval.tick().await; + } + } +} +``` + +See the [`stream`] docs for full details on creating streams including notes on +how to detect and handle graceful shutdown requests. + +[`stream`]: @api/rocket/response/stream/index.html +[`stream!`]: @api/rocket/response/stream/macro.stream.html +[`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html +[`ReaderStream`]: @api/rocket/response/stream/struct.ReaderStream.html +[`TextStream`]: @api/rocket/response/stream/struct.TextStream.html ### JSON