Introduce async streams.

This reworks the entire 'response::stream' module for async streams.

Resolves #1066.
This commit is contained in:
Sergio Benitez 2021-04-28 02:07:32 -07:00
parent a72e8da735
commit 336a03e27f
6 changed files with 729 additions and 95 deletions

View File

@ -45,6 +45,10 @@ indexmap = { version = "1.0", features = ["serde-1"] }
tempfile = "3" tempfile = "3"
async-trait = "0.1.43" async-trait = "0.1.43"
[dependencies.async-stream]
git = "https://github.com/SergioBenitez/async-stream.git"
rev = "c46ada9"
[dependencies.state] [dependencies.state]
git = "https://github.com/SergioBenitez/state.git" git = "https://github.com/SergioBenitez/state.git"
rev = "8f94dc" rev = "8f94dc"

View File

@ -34,6 +34,7 @@ pub(crate) mod flash;
pub mod content; pub mod content;
pub mod status; pub mod status;
pub mod stream;
#[doc(hidden)] #[doc(hidden)]
pub use rocket_codegen::Responder; pub use rocket_codegen::Responder;

View File

@ -1,73 +1,616 @@
use std::fmt::{self, Debug}; //! Potentially infinite async [`Stream`] response types.
//!
//! A [`Stream<Item = T>`] is the async analog of an `Iterator<Item = T>`: it
//! generates a sequence of values asynchronously, otherwise known as an async
//! _generator_. Types in this module allow for returning responses that are
//! streams.
//!
//! [`Stream<Item = T>`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
//!
//! # Raw Streams
//!
//! Rust does not yet natively support syntax for creating arbitrary generators,
//! and as such, for creating streams. To ameliorate this, Rocket exports
//! [`stream!`], which retrofit generator syntax, allowing raw `impl Stream`s to
//! be defined using `yield` and `for await` syntax:
//!
//! ```rust
//! use rocket::futures::stream::Stream;
//! use rocket::response::stream::stream;
//!
//! fn make_stream() -> impl Stream<Item = u8> {
//! stream! {
//! for i in 0..3 {
//! yield i;
//! }
//! }
//! }
//! ```
//!
//! See [`stream!`] for full usage details.
//!
//! # Typed Streams
//!
//! A raw stream is not a `Responder`, so it cannot be directly returned from a
//! route handler. Instead, one of three _typed_ streams may be used. Each typed
//! stream places type bounds on the `Item` of the stream, allowing for
//! `Responder` implementation on the stream itself.
//!
//! Each typed stream exists both as a type and as a macro. They are:
//!
//! * [`struct@ReaderStream`] ([`ReaderStream!`]) - streams of `T: AsyncRead`
//! * [`struct@ByteStream`] ([`ByteStream!`]) - streams of `T: AsRef<[u8]>`
//! * [`struct@TextStream`] ([`TextStream!`]) - streams of `T: AsRef<str>`
//!
//! Each type implements `Responder`; each macro can be invoked to generate a
//! typed stream, exactly like [`stream!`] above. Additionally, each macro is
//! also a _type_ macro, expanding to a wrapped `impl Stream<Item = $T>`, where
//! `$T` is the input to the macro.
//!
//! As a concrete example, the route below produces an infinite series of
//! `"hello"`s, one per second:
//!
//! ```rust
//! # use rocket::get;
//! use rocket::tokio::time::{self, Duration};
//! use rocket::response::stream::TextStream;
//!
//! /// Produce an infinite series of `"hello"`s, one per second.
//! #[get("/infinite-hellos")]
//! fn hello() -> TextStream![&'static str] {
//! TextStream! {
//! let mut interval = time::interval(Duration::from_secs(1));
//! loop {
//! yield "hello";
//! interval.tick().await;
//! }
//! }
//! }
//! ```
//!
//! The `TextStream![&'static str]` invocation expands to:
//!
//! ```rust
//! # use rocket::response::stream::TextStream;
//! # use rocket::futures::stream::Stream;
//! # use rocket::response::stream::stream;
//! # fn f() ->
//! TextStream<impl Stream<Item = &'static str>>
//! # { TextStream::from(stream! { yield "hi" }) }
//! ```
//!
//! While the inner `TextStream! { .. }` invocation expands to:
//!
//! ```rust
//! # use rocket::response::stream::{TextStream, stream};
//! TextStream::from(stream! { /* .. */ })
//! # ;
//! ```
//!
//! The expansions are identical for `ReaderStream` and `ByteStream`, with
//! `TextStream` replaced with `ReaderStream` and `ByteStream`, respectively.
//!
//! # Graceful Shutdown
//!
//! Infinite responders, like the one defined in `hello` above, will prolong
//! shutdown initiated via [`Shutdown::notify()`](crate::Shutdown::notify()) for
//! the defined grace period. After the grace period has elapsed, Rocket will
//! abruptly terminate the responder.
//!
//! To avoid abrupt termination, graceful shutdown can be detected via the
//! [`Shutdown`](crate::Shutdown) future, allowing the infinite responder to
//! gracefully shut itself down. The following example modifies the previous
//! `hello` with shutdown detection:
//!
//! ```rust
//! # use rocket::get;
//! use rocket::Shutdown;
//! use rocket::response::stream::TextStream;
//! use rocket::tokio::select;
//! use rocket::tokio::time::{self, Duration};
//!
//! /// Produce an infinite series of `"hello"`s, 1/second, until shutdown.
//! #[get("/infinite-hellos")]
//! fn hello(mut shutdown: Shutdown) -> TextStream![&'static str] {
//! TextStream! {
//! let mut interval = time::interval(Duration::from_secs(1));
//! loop {
//! select! {
//! _ = interval.tick() => yield "hello",
//! _ = &mut shutdown => {
//! yield "goodbye";
//! break;
//! }
//! };
//! }
//! }
//! }
//! ```
use tokio::io::AsyncRead; use std::{fmt, io};
use std::task::{Context, Poll};
use std::pin::Pin;
use futures::stream::{Stream, StreamExt};
use tokio::io::{AsyncRead, ReadBuf};
use pin_project_lite::pin_project;
use crate::request::Request; use crate::request::Request;
use crate::response::{self, Response, Responder, DEFAULT_CHUNK_SIZE}; use crate::response::{self, Response, Responder};
use crate::http::ContentType;
/// Streams a response to a client from an arbitrary `AsyncRead`er type. pin_project! {
/// /// A potentially infinite stream of readers: `T: AsyncRead`.
/// The client is sent a "chunked" response, where the chunk size is at most ///
/// 4KiB. This means that at most 4KiB are stored in memory while the response /// A `ReaderStream` can be constructed from any [`Stream`] of items of type
/// is being sent. This type should be used when sending responses that are /// `T` where `T: AsyncRead`, or from a single `AsyncRead` type using
/// arbitrarily large in size, such as when streaming from a local socket. /// [`ReaderStream::one()`]. Because a `ReaderStream` is itself `AsyncRead`,
pub struct Stream<T: AsyncRead>(T, usize); /// it can be used as a building-block for other stream-based responders as
/// a `streamed_body`, though it may also be used as a responder itself.
impl<T: AsyncRead> Stream<T> { ///
/// Create a new stream from the given `reader` and sets the chunk size for /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
/// each streamed chunk to `chunk_size` bytes. ///
/// ```rust
/// use std::io::Cursor;
///
/// use rocket::{Request, Response};
/// use rocket::futures::stream::{Stream, StreamExt};
/// use rocket::response::{self, Responder, stream::ReaderStream};
/// use rocket::http::ContentType;
///
/// struct MyStream<S>(S);
///
/// impl<'r, S: Stream<Item = String>> Responder<'r, 'r> for MyStream<S>
/// where S: Send + 'r
/// {
/// fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
/// Response::build()
/// .header(ContentType::Text)
/// .streamed_body(ReaderStream::from(self.0.map(Cursor::new)))
/// .ok()
/// }
/// }
/// ```
///
/// # Responder
///
/// `ReaderStream` is a (potentially infinite) responder. No `Content-Type`
/// is set. The body is [unsized](crate::response::Body#unsized), and values
/// are sent as soon as they are yielded by the internal iterator.
/// ///
/// # Example /// # Example
/// ///
/// Stream a response from whatever is in `stdin` with a chunk size of 10
/// bytes. Note: you probably shouldn't do this.
///
/// ```rust /// ```rust
/// use rocket::response::Stream; /// # use rocket::*;
/// use rocket::response::stream::ReaderStream;
/// use rocket::futures::stream::{repeat, StreamExt};
/// use rocket::tokio::time::{self, Duration};
/// use rocket::tokio::fs::File;
/// ///
/// # #[allow(unused_variables)] /// #[get("/reader/stream")]
/// let response = Stream::chunked(tokio::io::stdin(), 10); /// fn stream() -> ReaderStream![File] {
/// ReaderStream! {
/// let paths = &["safe/path", "another/safe/path"];
/// for path in paths {
/// if let Ok(file) = File::open(path).await {
/// yield file;
/// }
/// }
/// }
/// }
///
/// #[get("/reader/stream/one")]
/// async fn stream_one() -> std::io::Result<ReaderStream![File]> {
/// let file = File::open("safe/path").await?;
/// Ok(ReaderStream::one(file))
/// }
/// ``` /// ```
pub fn chunked(reader: T, chunk_size: usize) -> Stream<T> { ///
Stream(reader, chunk_size) /// The syntax of `ReaderStream` as an expression is identical to that of
/// [`stream!`].
pub struct ReaderStream<S: Stream> {
#[pin]
stream: S,
#[pin]
state: State<S::Item>,
} }
} }
impl<T: AsyncRead + Debug> Debug for Stream<T> { pin_project! {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { #[project = StateProjection]
f.debug_tuple("Stream").field(&self.0).finish() #[derive(Debug)]
enum State<R> {
Pending,
Reading { #[pin] reader: R },
Done,
} }
} }
/// Create a new stream from the given `reader`. /// A potentially infinite stream of bytes: any `T: AsRef<[u8]>`.
///
/// A `ByteStream` can be constructed from any [`Stream`] of items of type `T`
/// where `T: AsRef<[u8]>`. This includes `Vec<u8>`, `&[u8]`, `&str`, `&RawStr`,
/// and more. The stream can be constructed directly, via `ByteStream(..)` or
/// [`ByteStream::from()`], or through generator syntax via [`ByteStream!`].
///
/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
///
/// # Responder
///
/// `ByteStream` is a (potentially infinite) responder. The response
/// `Content-Type` is set to [`Binary`](ContentType::Binary). The body is
/// [unsized](crate::response::Body#unsized), and values are sent as soon as
/// they are yielded by the internal iterator.
/// ///
/// # Example /// # Example
/// ///
/// Stream a response from whatever is in `stdin`. Note: you probably /// ```rust
/// shouldn't do this. /// # use rocket::*;
/// use rocket::response::stream::ByteStream;
/// use rocket::futures::stream::{repeat, StreamExt};
/// use rocket::tokio::time::{self, Duration};
///
/// #[get("/bytes")]
/// fn bytes() -> ByteStream![&'static [u8]] {
/// ByteStream(repeat(&[1, 2, 3][..]))
/// }
///
/// #[get("/byte/stream")]
/// fn stream() -> ByteStream![Vec<u8>] {
/// ByteStream! {
/// let mut interval = time::interval(Duration::from_secs(1));
/// for i in 0..10u8 {
/// yield vec![i, i + 1, i + 2];
/// interval.tick().await;
/// }
/// }
/// }
/// ```
///
/// The syntax of `ByteStream` as an expression is identical to that of
/// [`stream!`].
#[derive(Debug, Clone)]
pub struct ByteStream<S>(pub S);
/// A potentially infinite stream of text: `T: AsRef<str>`.
///
/// A `TextStream` can be constructed from any [`Stream`] of items of type `T`
/// where `T: AsRef<str>`. This includes `&str`, `String`, `Cow<str>`,
/// `&RawStr`, and more. The stream can be constructed directly, via
/// `TextStream(..)` or [`TextStream::from()`], or through generator syntax via
/// [`TextStream!`].
///
/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
///
/// # Responder
///
/// `TextStream` is a (potentially infinite) responder. The response
/// `Content-Type` is set to [`Text`](ContentType::Text). The body is
/// [unsized](crate::response::Body#unsized), and values are sent as soon as
/// they are yielded by the internal iterator.
///
/// # Example
/// ///
/// ```rust /// ```rust
/// use rocket::response::Stream; /// # use rocket::*;
/// use rocket::response::stream::TextStream;
/// use rocket::futures::stream::{repeat, StreamExt};
/// use rocket::tokio::time::{self, Duration};
/// ///
/// # #[allow(unused_variables)] /// #[get("/text")]
/// let response = Stream::from(tokio::io::stdin()); /// fn text() -> TextStream![&'static str] {
/// TextStream(repeat("hi"))
/// }
///
/// #[get("/text/stream")]
/// fn stream() -> TextStream![String] {
/// TextStream! {
/// let mut interval = time::interval(Duration::from_secs(1));
/// for i in 0..10 {
/// yield format!("n: {}", i);
/// interval.tick().await;
/// }
/// }
/// }
/// ``` /// ```
impl<T: AsyncRead> From<T> for Stream<T> { ///
fn from(reader: T) -> Self { /// The syntax of `TextStream` as an expression is identical to that of
Stream(reader, DEFAULT_CHUNK_SIZE) /// [`stream!`].
#[derive(Debug, Clone)]
pub struct TextStream<S>(pub S);
impl<S: Stream> From<S> for ReaderStream<S> {
fn from(stream: S) -> Self {
ReaderStream { stream, state: State::Pending }
} }
} }
/// Sends a response to the client using the "Chunked" transfer encoding. The impl<S> From<S> for TextStream<S> {
/// maximum chunk size is 4KiB. /// Creates a `TextStream` from any [`S: Stream`](Stream).
/// fn from(stream: S) -> Self {
/// # Failure TextStream(stream)
/// }
/// If reading from the input stream fails at any point during the response, the }
/// response is abandoned, and the response ends abruptly. An error is printed
/// to the console with an indication of what went wrong. impl<S> From<S> for ByteStream<S> {
impl<'r, 'o: 'r, T: AsyncRead + Send + 'o> Responder<'r, 'o> for Stream<T> { /// Creates a `ByteStream` from any [`S: Stream`](Stream).
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'o> { fn from(stream: S) -> Self {
Response::build().chunked_body(self.0, self.1).ok() ByteStream(stream)
}
}
impl<'r, S: Stream> Responder<'r, 'r> for ReaderStream<S>
where S: Send + 'r, S::Item: AsyncRead + Send,
{
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
Response::build()
.streamed_body(self)
.ok()
}
}
/// A stream that yields a value exactly once.
///
/// A `ReaderStream` which wraps this type and yields one `AsyncRead` type can
/// be created via [`ReaderStream::one()`].
///
/// # Example
///
/// ```rust
/// use rocket::response::stream::Once;
/// use rocket::futures::stream::StreamExt;
///
/// # rocket::async_test(async {
/// let mut stream = Once::from("hello!");
/// let values: Vec<_> = stream.collect().await;
/// assert_eq!(values, ["hello!"]);
/// # });
/// ```
pub struct Once<T: Unpin>(Option<T>);
impl<T: Unpin> From<T> for Once<T> {
fn from(value: T) -> Self {
Once(Some(value))
}
}
impl<T: Unpin> Stream for Once<T> {
type Item = T;
fn poll_next(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.0.take())
}
}
impl<R: Unpin> ReaderStream<Once<R>> {
/// Create a `ReaderStream` that yields exactly one reader, streaming the
/// contents of the reader itself.
///
/// # Example
///
/// Stream the bytes from a remote TCP connection:
///
/// ```rust
/// # use rocket::*;
/// use std::io;
/// use std::net::SocketAddr;
///
/// use rocket::tokio::net::TcpStream;
/// use rocket::response::stream::ReaderStream;
///
/// #[get("/stream")]
/// async fn stream() -> io::Result<ReaderStream![TcpStream]> {
/// let addr = SocketAddr::from(([127, 0, 0, 1], 9999));
/// let stream = TcpStream::connect(addr).await?;
/// Ok(ReaderStream::one(stream))
/// }
/// ```
pub fn one(reader: R) -> Self {
ReaderStream::from(Once::from(reader))
}
}
impl<'r, S: Stream> Responder<'r, 'r> for TextStream<S>
where S: Send + 'r, S::Item: AsRef<str> + Send + Unpin + 'r
{
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
struct ByteStr<T>(T);
impl<T: AsRef<str>> AsRef<[u8]> for ByteStr<T> {
fn as_ref(&self) -> &[u8] {
self.0.as_ref().as_bytes()
}
}
let inner = self.0.map(ByteStr).map(io::Cursor::new);
Response::build()
.header(ContentType::Text)
.streamed_body(ReaderStream::from(inner))
.ok()
}
}
impl<'r, S: Stream> Responder<'r, 'r> for ByteStream<S>
where S: Send + 'r, S::Item: AsRef<[u8]> + Send + Unpin + 'r
{
fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
Response::build()
.header(ContentType::Binary)
.streamed_body(ReaderStream::from(self.0.map(io::Cursor::new)))
.ok()
}
}
impl<S: Stream> AsyncRead for ReaderStream<S>
where S::Item: AsyncRead + Send
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>
) -> Poll<Result<(), io::Error>> {
let mut me = self.project();
loop {
match me.state.as_mut().project() {
StateProjection::Pending => match me.stream.as_mut().poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => me.state.set(State::Done),
Poll::Ready(Some(reader)) => me.state.set(State::Reading { reader }),
},
StateProjection::Reading { reader } => {
let init = buf.filled().len();
match reader.poll_read(cx, buf) {
Poll::Ready(Ok(())) if buf.filled().len() == init => {
me.state.set(State::Pending);
},
Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
}
},
StateProjection::Done => return Poll::Ready(Ok(())),
}
}
}
}
impl<S: Stream + fmt::Debug> fmt::Debug for ReaderStream<S>
where S::Item: fmt::Debug
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReaderStream")
.field("stream", &self.stream)
.field("state", &self.state)
.finish()
}
}
crate::export! {
/// Retrofitted support for [`Stream`]s with `yield`, `for await` syntax.
///
/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
///
/// This macro takes any series of statements and expands them into an
/// expression of type `impl Stream<Item = T>`, a stream that `yield`s
/// elements of type `T`. It supports any Rust statement syntax with the
/// following additions:
///
/// * `yield expr`
///
/// Yields the result of evaluating `expr` to the caller (the stream
/// consumer). `expr` must be of type `T`.
///
/// * `for await x in stream { .. }`
///
/// `await`s the next element in `stream`, binds it to `x`, and
/// executes the block with the binding. `stream` must implement
/// `Stream<Item = T>`; the type of `x` is `T`.
///
/// # Examples
///
/// ```rust
/// use rocket::response::stream::stream;
/// use rocket::futures::stream::Stream;
///
/// fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
/// stream! {
/// for s in &["hi", "there"]{
/// yield s.to_string();
/// }
///
/// for await n in stream {
/// yield format!("n: {}", n);
/// }
/// }
/// }
///
/// # rocket::async_test(async {
/// use rocket::futures::stream::{self, StreamExt};
///
/// let stream = f(stream::iter(vec![3, 7, 11]));
/// let strings: Vec<_> = stream.collect().await;
/// assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);
/// # });
/// ```
///
/// Using `?` on an `Err` short-cicuits stream termination:
///
/// ```rust
/// use std::io;
///
/// use rocket::response::stream::stream;
/// use rocket::futures::stream::Stream;
///
/// fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
/// where S: Stream<Item = io::Result<&'static str>>
/// {
/// stream! {
/// for await s in stream {
/// let num = s?.parse();
/// let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
/// yield Ok(num);
/// }
/// }
/// }
///
/// # rocket::async_test(async {
/// use rocket::futures::stream::{self, StreamExt};
///
/// let e = io::Error::last_os_error();
/// let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
/// let results: Vec<_> = stream.collect().await;
/// assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));
/// # });
/// ```
macro_rules! stream {
($($t:tt)*) => ($crate::async_stream::stream!($($t)*));
}
}
#[doc(hidden)]
#[macro_export]
macro_rules! _typed_stream {
($S:ident, $T:ty) => (
$crate::response::stream::$S<impl $crate::futures::stream::Stream<Item = $T>>
);
($S:ident, $($t:tt)*) => (
$crate::response::stream::$S::from($crate::response::stream::stream!($($t)*))
);
}
crate::export! {
/// Type and stream expression macro for [`struct@ReaderStream`].
///
/// See [`struct@ReaderStream`] and the [module level
/// docs](crate::response::stream#typed-streams) for usage details.
macro_rules! ReaderStream {
($T:ty) => ($crate::_typed_stream!(ReaderStream, $T));
($($s:tt)*) => ($crate::_typed_stream!(ReaderStream, $($s)*));
}
}
crate::export! {
/// Type and stream expression macro for [`struct@ByteStream`].
///
/// See [`struct@ByteStream`] and the [module level
/// docs](crate::response::stream#typed-streams) for usage details.
macro_rules! ByteStream {
($T:ty) => ($crate::_typed_stream!(ByteStream, $T));
($($s:tt)*) => ($crate::_typed_stream!(ByteStream, $($s)*));
}
}
crate::export! {
/// Type and stream expression macro for [`struct@TextStream`].
///
/// See [`struct@TextStream`] and the [module level
/// docs](crate::response::stream#typed-streams) for usage details.
macro_rules! TextStream {
($T:ty) => ($crate::_typed_stream!(TextStream, $T));
($($s:tt)*) => ($crate::_typed_stream!(TextStream, $($s)*));
} }
} }

View File

@ -2,43 +2,66 @@
#[cfg(test)] mod tests; #[cfg(test)] mod tests;
/***************************** `Stream` Responder *****************************/ /****************** `Result`, `Option` `NameFile` Responder *******************/
use std::{io, env}; use std::{io, env};
use rocket::tokio::fs::{self, File}; use rocket::tokio::fs;
use rocket::tokio::io::{repeat, AsyncRead, AsyncReadExt};
use rocket::response::{content, Stream};
use rocket::data::{Capped, TempFile}; use rocket::data::{Capped, TempFile};
use rocket::response::NamedFile;
// Upload your `big_file.dat` by POSTing it to /upload. // Upload your `big_file.dat` by POSTing it to /upload.
// try `curl --data-binary @file.txt http://127.0.0.1:8000/stream/file` // try `curl --data-binary @file.txt http://127.0.0.1:8000/stream/file`
const FILENAME: &str = "big_file.dat"; const FILENAME: &str = "big_file.dat";
#[get("/stream/a")] // This is a *raw* file upload, _not_ a multipart upload!
fn many_as() -> content::Plain<Stream<impl AsyncRead>> { #[post("/file", data = "<file>")]
content::Plain(Stream::from(repeat('a' as u8).take(25000)))
}
#[get("/stream/file")]
async fn file() -> Option<Stream<File>> {
// NOTE: Rocket _always_ streams data from an `AsyncRead`, even when
// `Stream` isn't used. By using `Stream`, however, the data is sent using
// chunked-encoding in HTTP 1.1. DATA frames are sent in HTTP/2.
File::open(env::temp_dir().join(FILENAME)).await.map(Stream::from).ok()
}
#[post("/stream/file", data = "<file>")]
async fn upload(mut file: Capped<TempFile<'_>>) -> io::Result<String> { async fn upload(mut file: Capped<TempFile<'_>>) -> io::Result<String> {
file.persist_to(env::temp_dir().join(FILENAME)).await?; file.persist_to(env::temp_dir().join(FILENAME)).await?;
Ok(format!("{} bytes at {}", file.n.written, file.path().unwrap().display())) Ok(format!("{} bytes at {}", file.n.written, file.path().unwrap().display()))
} }
#[delete("/stream/file")] #[get("/file")]
async fn file() -> Option<NamedFile> {
NamedFile::open(env::temp_dir().join(FILENAME)).await.ok()
}
#[delete("/file")]
async fn delete() -> Option<()> { async fn delete() -> Option<()> {
fs::remove_file(env::temp_dir().join(FILENAME)).await.ok() fs::remove_file(env::temp_dir().join(FILENAME)).await.ok()
} }
/***************************** `Stream` Responder *****************************/
use rocket::tokio::select;
use rocket::tokio::time::{self, Duration};
use rocket::futures::stream::{repeat, StreamExt};
use rocket::Shutdown;
use rocket::response::stream::TextStream;
#[get("/stream/hi")]
fn many_his() -> TextStream![&'static str] {
TextStream(repeat("hi").take(100))
}
#[get("/stream/hi/<n>")]
fn one_hi_per_ms(mut shutdown: Shutdown, n: u8) -> TextStream![&'static str] {
TextStream! {
let mut interval = time::interval(Duration::from_millis(n as u64));
loop {
select! {
_ = interval.tick() => yield "hi",
_ = &mut shutdown => {
yield "goodbye";
break;
}
};
}
}
}
/***************************** `Redirect` Responder ***************************/ /***************************** `Redirect` Responder ***************************/
use rocket::response::Redirect; use rocket::response::Redirect;
@ -64,6 +87,7 @@ fn maybe_redir(name: &str) -> Result<&'static str, Redirect> {
/***************************** `content` Responders ***************************/ /***************************** `content` Responders ***************************/
use rocket::Request; use rocket::Request;
use rocket::response::content;
// NOTE: This example explicitly uses the `Json` type from `response::content` // NOTE: This example explicitly uses the `Json` type from `response::content`
// for demonstration purposes. In a real application, _always_ prefer to use // for demonstration purposes. In a real application, _always_ prefer to use
@ -119,7 +143,6 @@ fn json_or_msgpack(kind: &str) -> Either<Json<&'static str>, MsgPack<&'static [u
use std::borrow::Cow; use std::borrow::Cow;
use rocket::response::NamedFile;
use rocket::response::content::Html; use rocket::response::content::Html;
#[derive(Responder)] #[derive(Responder)]
@ -154,7 +177,7 @@ async fn custom(kind: Option<Kind>) -> StoredData {
#[launch] #[launch]
fn rocket() -> _ { fn rocket() -> _ {
rocket::build() rocket::build()
.mount("/", routes![many_as, file, upload, delete]) .mount("/", routes![many_his, one_hi_per_ms, file, upload, delete])
.mount("/", routes![redir_root, redir_login, maybe_redir]) .mount("/", routes![redir_root, redir_login, maybe_redir])
.mount("/", routes![xml, json, json_or_msgpack]) .mount("/", routes![xml, json, json_or_msgpack])
.mount("/", routes![custom]) .mount("/", routes![custom])

View File

@ -1,22 +1,11 @@
use rocket::local::blocking::Client; use rocket::local::blocking::Client;
use rocket::http::Status; use rocket::http::Status;
/****************************** `File` Responder ******************************/
// We use a lock to synchronize between tests so FS operations don't race. // We use a lock to synchronize between tests so FS operations don't race.
static FS_LOCK: parking_lot::Mutex<()> = parking_lot::const_mutex(()); static FS_LOCK: parking_lot::Mutex<()> = parking_lot::const_mutex(());
/***************************** `Stream` Responder *****************************/
#[test]
fn test_many_as() {
let client = Client::tracked(super::rocket()).unwrap();
let res = client.get(uri!(super::many_as)).dispatch();
// Check that we have exactly 25,000 'a's.
let bytes = res.into_bytes().unwrap();
assert_eq!(bytes.len(), 25000);
assert!(bytes.iter().all(|b| *b == b'a'));
}
#[test] #[test]
fn test_file() { fn test_file() {
const CONTENTS: &str = "big_file contents...not so big here"; const CONTENTS: &str = "big_file contents...not so big here";
@ -39,6 +28,53 @@ fn test_file() {
assert_eq!(response.status(), Status::Ok); assert_eq!(response.status(), Status::Ok);
} }
/***************************** `Stream` Responder *****************************/
#[test]
fn test_many_his() {
let client = Client::tracked(super::rocket()).unwrap();
let res = client.get(uri!(super::many_his)).dispatch();
// Check that we have exactly 100 `hi`s.
let bytes = res.into_bytes().unwrap();
assert_eq!(bytes.len(), 200);
assert!(bytes.chunks(2).all(|b| b == b"hi"));
}
#[async_test]
async fn test_one_hi_per_second() {
use rocket::local::asynchronous::Client;
use rocket::tokio::time::{self, Instant, Duration};
use rocket::tokio::{self, select};
// Listen for 1 second at 1 `hi` per 250ms, see if we get ~4 `hi`'s, then
// send a shutdown() signal, meaning we should get a `goodbye`.
let client = Client::tracked(super::rocket()).await.unwrap();
let response = client.get(uri!(super::one_hi_per_ms: 250)).dispatch().await;
let response = response.into_string();
let timer = time::sleep(Duration::from_secs(1));
tokio::pin!(timer, response);
let start = Instant::now();
let response = loop {
select! {
_ = &mut timer => {
client.rocket().shutdown().notify();
timer.as_mut().reset(Instant::now() + Duration::from_millis(100));
if start.elapsed() > Duration::from_secs(2) {
panic!("responder did not terminate with shutdown");
}
}
response = &mut response => break response.unwrap(),
}
};
match &*response {
"hihihigoodbye" | "hihihihigoodbye" | "hihihihihigoodbye" => { /* ok */ },
s => panic!("unexpected response from infinite responder: {}", s)
}
}
/***************************** `Redirect` Responder ***************************/ /***************************** `Redirect` Responder ***************************/
#[test] #[test]

View File

@ -287,35 +287,62 @@ library. Among these are:
[`Stream`]: @api/rocket/response/struct.Stream.html [`Stream`]: @api/rocket/response/struct.Stream.html
[`Flash`]: @api/rocket/response/struct.Flash.html [`Flash`]: @api/rocket/response/struct.Flash.html
[`MsgPack`]: @api/rocket_contrib/msgpack/struct.MsgPack.html [`MsgPack`]: @api/rocket_contrib/msgpack/struct.MsgPack.html
[`rocket_contrib`]: @api/rocket_contrib/
### Streaming ### Async Streams
The `Stream` type deserves special attention. When a large amount of data needs The [`stream`] responders allow serving potentially infinite async [`Stream`]s.
to be sent to the client, it is better to stream the data to the client to avoid A stream can be created from any async `Stream` or `AsyncRead` type, or via
consuming large amounts of memory. Rocket provides the [`Stream`] type, making generator syntax using the [`stream!`] macro and its typed equivalents.
this easy. The `Stream` type can be created from any `AsyncRead` type. For
example, to stream from a local Unix stream, we might write: The simplest version creates a [`ReaderStream`] from a single `AsyncRead` type.
For example, to stream from a TCP connection, we might write:
```rust ```rust
# #[macro_use] extern crate rocket; # use rocket::*;
# fn main() {} use std::io;
# mod test {
use std::net::SocketAddr; use std::net::SocketAddr;
use rocket::response::{Stream, Debug};
use rocket::tokio::net::TcpStream; use rocket::tokio::net::TcpStream;
use rocket::response::stream::ReaderStream;
#[get("/stream")] #[get("/stream")]
async fn stream() -> Result<Stream<TcpStream>, Debug<std::io::Error>> { async fn stream() -> io::Result<ReaderStream![TcpStream]> {
let addr = SocketAddr::from(([127, 0, 0, 1], 9999)); let addr = SocketAddr::from(([127, 0, 0, 1], 9999));
Ok(TcpStream::connect(addr).await.map(Stream::from)?) let stream = TcpStream::connect(addr).await?;
Ok(ReaderStream::one(stream))
} }
# }
``` ```
[`rocket_contrib`]: @api/rocket_contrib/ Streams can also be created using generator syntax. The following example
returns an infinite [`TextStream`] that produces one `"hello"` every second:
```rust
# use rocket::get;
use rocket::tokio::time::{self, Duration};
use rocket::response::stream::TextStream;
/// Produce an infinite series of `"hello"`s, one per second.
#[get("/infinite-hellos")]
fn hello() -> TextStream![&'static str] {
TextStream! {
let mut interval = time::interval(Duration::from_secs(1));
loop {
yield "hello";
interval.tick().await;
}
}
}
```
See the [`stream`] docs for full details on creating streams including notes on
how to detect and handle graceful shutdown requests.
[`stream`]: @api/rocket/response/stream/index.html
[`stream!`]: @api/rocket/response/stream/macro.stream.html
[`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
[`ReaderStream`]: @api/rocket/response/stream/struct.ReaderStream.html
[`TextStream`]: @api/rocket/response/stream/struct.TextStream.html
### JSON ### JSON