diff --git a/contrib/ws/README.md b/contrib/ws/README.md index 37759707..b77e9822 100644 --- a/contrib/ws/README.md +++ b/contrib/ws/README.md @@ -21,15 +21,15 @@ This crate provides WebSocket support for Rocket via integration with Rocket's 2. Use it! - ```rust - #[get("/echo")] - fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { - ws::stream! { ws => - for await message in ws { - yield message?; - } - } - } - ``` + ```rust + #[get("/echo")] + fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { + ws::stream! { ws => + for await message in ws { + yield message?; + } + } + } + ``` See the [crate docs] for full details. diff --git a/contrib/ws/src/duplex.rs b/contrib/ws/src/duplex.rs index 2a57ae90..76b5eac2 100644 --- a/contrib/ws/src/duplex.rs +++ b/contrib/ws/src/duplex.rs @@ -16,8 +16,7 @@ use crate::result::{Result, Error}; /// /// ```rust /// # use rocket::get; -/// use rocket_ws as ws; -/// +/// # use rocket_ws as ws; /// use rocket::futures::{SinkExt, StreamExt}; /// /// #[get("/echo/manual")] @@ -30,7 +29,7 @@ use crate::result::{Result, Error}; /// Ok(()) /// })) /// } -/// ```` +/// ``` /// /// [`StreamExt`]: rocket::futures::StreamExt /// [`SinkExt`]: rocket::futures::SinkExt diff --git a/contrib/ws/src/lib.rs b/contrib/ws/src/lib.rs index 7c180fb6..4b424a76 100644 --- a/contrib/ws/src/lib.rs +++ b/contrib/ws/src/lib.rs @@ -1,4 +1,74 @@ //! WebSocket support for Rocket. +//! +//! This crate implements support for WebSockets via Rocket's [connection +//! upgrade API](rocket::Response#upgrading) and +//! [tungstenite](tokio_tungstenite). +//! +//! # Usage +//! +//! Depend on the crate. Here, we rename the dependency to `ws` for convenience: +//! +//! ```toml +//! [dependencies] +//! ws = { package = "rocket_ws", version ="=0.1.0-rc.3" } +//! ``` +//! +//! Then, use [`WebSocket`] as a request guard in any route and either call +//! [`WebSocket::channel()`] or return a stream via [`Stream!`] or +//! [`WebSocket::stream()`] in the handler. The examples below are equivalent: +//! +//! ```rust +//! # use rocket::get; +//! # use rocket_ws as ws; +//! # +//! #[get("/echo?channel")] +//! fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> { +//! use rocket::futures::{SinkExt, StreamExt}; +//! +//! ws.channel(move |mut stream| Box::pin(async move { +//! while let Some(message) = stream.next().await { +//! let _ = stream.send(message?).await; +//! } +//! +//! Ok(()) +//! })) +//! } +//! +//! #[get("/echo?stream")] +//! fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { +//! ws::Stream! { ws => +//! for await message in ws { +//! yield message?; +//! } +//! } +//! } +//! +//! #[get("/echo?compose")] +//! fn echo_compose(ws: ws::WebSocket) -> ws::Stream!['static] { +//! ws.stream(|io| io) +//! } +//! ``` +//! +//! WebSocket connections are configurable via [`WebSocket::config()`]: +//! +//! ```rust +//! # use rocket::get; +//! # use rocket_ws as ws; +//! # +//! #[get("/echo")] +//! fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { +//! let ws = ws.config(ws::Config { +//! max_send_queue: Some(5), +//! ..Default::default() +//! }); +//! +//! ws::Stream! { ws => +//! for await message in ws { +//! yield message?; +//! } +//! } +//! } +//! ``` #![doc(html_root_url = "https://api.rocket.rs/v0.5-rc/rocket_ws")] #![doc(html_favicon_url = "https://rocket.rs/images/favicon.ico")] @@ -11,10 +81,84 @@ mod tungstenite { mod duplex; mod websocket; -pub use self::tungstenite::Message; -pub use self::tungstenite::protocol::WebSocketConfig as Config; pub use self::websocket::{WebSocket, Channel}; +/// A WebSocket message. +/// +/// A value of this type is typically constructed by calling `.into()` on a +/// supported message type. This includes strings via `&str` and `String` and +/// bytes via `&[u8]` and `Vec`: +/// +/// ```rust +/// # use rocket::get; +/// # use rocket_ws as ws; +/// # +/// #[get("/echo")] +/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { +/// ws::Stream! { ws => +/// yield "Hello".into(); +/// yield String::from("Hello").into(); +/// yield (&[1u8, 2, 3][..]).into(); +/// yield vec![1u8, 2, 3].into(); +/// } +/// } +/// ``` +/// +/// Other kinds of messages can be constructed directly: +/// +/// ```rust +/// # use rocket::get; +/// # use rocket_ws as ws; +/// # +/// #[get("/echo")] +/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { +/// ws::Stream! { ws => +/// yield ws::Message::Ping(vec![b'h', b'i']) +/// } +/// } +/// ``` +pub use self::tungstenite::Message; + +/// WebSocket connection configuration. +/// +/// The default configuration for a [`WebSocket`] can be changed by calling +/// [`WebSocket::config()`] with a value of this type. The defaults are obtained +/// via [`Default::default()`]. You don't generally need to reconfigure a +/// `WebSocket` unless you're certain you need different values. In other words, +/// this structure should rarely be used. +/// +/// # Example +/// +/// ```rust +/// # use rocket::get; +/// # use rocket_ws as ws; +/// use rocket::data::ToByteUnit; +/// +/// #[get("/echo")] +/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { +/// let ws = ws.config(ws::Config { +/// // Enable backpressure with a max send queue size of `5`. +/// max_send_queue: Some(5), +/// // Decrease the maximum (complete) message size to 4MiB. +/// max_message_size: Some(4.mebibytes().as_u64() as usize), +/// // Decrease the maximum size of _one_ frame (not messag) to 1MiB. +/// max_frame_size: Some(1.mebibytes().as_u64() as usize), +/// // Use the default values for the rest. +/// ..Default::default() +/// }); +/// +/// ws::Stream! { ws => +/// for await message in ws { +/// yield message?; +/// } +/// } +/// } +/// ``` +/// +/// **Original `tungstenite` Documentation Follows** +/// +pub use self::tungstenite::protocol::WebSocketConfig as Config; + /// Structures for constructing raw WebSocket frames. pub mod frame { #[doc(hidden)] pub use crate::Message; diff --git a/contrib/ws/src/websocket.rs b/contrib/ws/src/websocket.rs index ce03cae1..fbf4ad11 100644 --- a/contrib/ws/src/websocket.rs +++ b/contrib/ws/src/websocket.rs @@ -10,8 +10,25 @@ use crate::{Config, Message}; use crate::stream::DuplexStream; use crate::result::{Result, Error}; -/// A request guard that identifies WebSocket requests. Converts into a -/// [`Channel`] or [`MessageStream`]. +/// A request guard identifying WebSocket requests. Converts into a [`Channel`] +/// or [`MessageStream`]. +/// +/// For example usage, see the [crate docs](crate#usage). +/// +/// ## Details +/// +/// This is the entrypoint to the library. Every WebSocket response _must_ +/// initiate via the `WebSocket` request guard. The guard identifies valid +/// WebSocket connection requests and, if the request is valid, succeeds to be +/// converted into a streaming WebSocket response via [`Stream!`], +/// [`WebSocket::channel()`], or [`WebSocket::stream()`]. The connection can be +/// configured via [`WebSocket::config()`]; see [`Config`] for details on +/// configuring a connection. +/// +/// ### Forwarding +/// +/// If the incoming request is not a valid WebSocket request, the guard +/// forwards. The guard never fails. pub struct WebSocket { config: Config, key: String, @@ -22,17 +39,119 @@ impl WebSocket { WebSocket { config: Config::default(), key } } + /// Change the default connection configuration to `config`. + /// + /// # Example + /// + /// ```rust + /// # use rocket::get; + /// # use rocket_ws as ws; + /// # + /// #[get("/echo")] + /// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { + /// let ws = ws.config(ws::Config { + /// max_send_queue: Some(5), + /// ..Default::default() + /// }); + /// + /// ws::Stream! { ws => + /// for await message in ws { + /// yield message?; + /// } + /// } + /// } + /// ``` pub fn config(mut self, config: Config) -> Self { self.config = config; self } + /// Create a read/write channel to the client and call `handler` with it. + /// + /// This method takes a `FnMut`, `handler`, that consumes a read/write + /// WebSocket channel, [`DuplexStream`] to the client. See [`DuplexStream`] + /// for details on how to make use of the channel. + /// + /// The `handler` must return a `Box`ed and `Pin`ned future: calling + /// [`Box::pin()`] with a future does just this as is the preferred + /// mechanism to create a `Box>`. The future must return a + /// [`Result<()>`](crate::result::Result). The WebSocket connection is + /// closed successfully if the future returns `Ok` and with an error if + /// the future returns `Err`. + /// + /// # Lifetimes + /// + /// The `Channel` may borrow from the request. If it does, the lifetime + /// should be specified as something other than `'static`. Otherwise, the + /// `'static` lifetime should be used. + /// + /// # Example + /// + /// ```rust + /// # use rocket::get; + /// # use rocket_ws as ws; + /// use rocket::futures::{SinkExt, StreamExt}; + /// + /// #[get("/hello/")] + /// fn hello(ws: ws::WebSocket, name: &str) -> ws::Channel<'_> { + /// ws.channel(move |mut stream| Box::pin(async move { + /// let message = format!("Hello, {}!", name); + /// let _ = stream.send(message.into()).await; + /// Ok(()) + /// })) + /// } + /// + /// #[get("/echo")] + /// fn echo(ws: ws::WebSocket) -> ws::Channel<'static> { + /// ws.channel(move |mut stream| Box::pin(async move { + /// while let Some(message) = stream.next().await { + /// let _ = stream.send(message?).await; + /// } + /// + /// Ok(()) + /// })) + /// } + /// ``` pub fn channel<'r, F: Send + 'r>(self, handler: F) -> Channel<'r> where F: FnMut(DuplexStream) -> BoxFuture<'r, Result<()>> + 'r { Channel { ws: self, handler: Box::new(handler), } } + /// Create a stream that consumes client [`Message`]s and emits its own. + /// + /// This method takes a `FnMut` `stream` that consumes a read-only stream + /// and returns a stream of [`Message`]s. While the returned stream can be + /// constructed in any manner, the [`Stream!`] macro is the preferred + /// method. In any case, the stream must be `Send`. + /// + /// The returned stream must emit items of type `Result`. Items + /// that are `Ok(Message)` are sent to the client while items of type + /// `Err(Error)` result in the connection being closed and the remainder of + /// the stream discarded. + /// + /// # Example + /// + /// ```rust + /// # use rocket::get; + /// # use rocket_ws as ws; + /// + /// // Use `Stream!`, which internally calls `WebSocket::stream()`. + /// #[get("/echo?stream")] + /// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { + /// ws::Stream! { ws => + /// for await message in ws { + /// yield message?; + /// } + /// } + /// } + /// + /// // Use a raw stream. + /// #[get("/echo?compose")] + /// fn echo_compose(ws: ws::WebSocket) -> ws::Stream!['static] { + /// ws.stream(|io| io) + /// } + /// ``` pub fn stream<'r, F, S>(self, stream: F) -> MessageStream<'r, S> where F: FnMut(SplitStream) -> S + Send + 'r, S: futures::Stream> + Send + 'r @@ -42,6 +161,8 @@ impl WebSocket { } /// A streaming channel, returned by [`WebSocket::channel()`]. +/// +/// `Channel` has no methods or functionality beyond its trait implementations. pub struct Channel<'r> { ws: WebSocket, handler: Box BoxFuture<'r, Result<()>> + Send + 'r>, @@ -50,9 +171,9 @@ pub struct Channel<'r> { /// A [`Stream`](futures::Stream) of [`Message`]s, returned by /// [`WebSocket::stream()`], used via [`Stream!`]. /// -/// This type is not typically used directly. Instead, it is used via the +/// This type should not be used directly. Instead, it is used via the /// [`Stream!`] macro, which expands to both the type itself and an expression -/// which evaluates to this type. +/// which evaluates to this type. See [`Stream!`] for details. // TODO: Get rid of this or `Channel` via a single `enum`. pub struct MessageStream<'r, S> { ws: WebSocket, diff --git a/examples/upgrade/src/main.rs b/examples/upgrade/src/main.rs index a572e022..943ff070 100644 --- a/examples/upgrade/src/main.rs +++ b/examples/upgrade/src/main.rs @@ -3,8 +3,23 @@ use rocket::fs::{self, FileServer}; use rocket::futures::{SinkExt, StreamExt}; -#[get("/echo/manual")] -fn echo_manual<'r>(ws: ws::WebSocket) -> ws::Channel<'r> { +#[get("/echo?stream", rank = 1)] +fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { + ws::Stream! { ws => + for await message in ws { + yield message?; + } + } +} + +#[get("/echo?channel", rank = 2)] +fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> { + // This is entirely optional. Change default configuration. + let ws = ws.config(ws::Config { + max_send_queue: Some(5), + ..Default::default() + }); + ws.channel(move |mut stream| Box::pin(async move { while let Some(message) = stream.next().await { let _ = stream.send(message?).await; @@ -14,19 +29,14 @@ fn echo_manual<'r>(ws: ws::WebSocket) -> ws::Channel<'r> { })) } -#[get("/echo")] -fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] { - let ws = ws.config(ws::Config { max_send_queue: Some(5), ..Default::default() }); - ws::Stream! { ws => - for await message in ws { - yield message?; - } - } +#[get("/echo?raw", rank = 3)] +fn echo_raw(ws: ws::WebSocket) -> ws::Stream!['static] { + ws.stream(|stream| stream) } #[launch] fn rocket() -> _ { rocket::build() - .mount("/", routes![echo_manual, echo_stream]) + .mount("/", routes![echo_channel, echo_stream, echo_raw]) .mount("/", FileServer::from(fs::relative!("static"))) }