Fully document the 'ws' contrib crate.

This commit is contained in:
Sergio Benitez 2023-04-03 16:09:45 -07:00
parent 887558be60
commit d9f86d8647
5 changed files with 304 additions and 30 deletions

View File

@ -21,15 +21,15 @@ This crate provides WebSocket support for Rocket via integration with Rocket's
2. Use it!
```rust
#[get("/echo")]
fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
ws::stream! { ws =>
for await message in ws {
yield message?;
}
}
}
```
```rust
#[get("/echo")]
fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
ws::stream! { ws =>
for await message in ws {
yield message?;
}
}
}
```
See the [crate docs] for full details.

View File

@ -16,8 +16,7 @@ use crate::result::{Result, Error};
///
/// ```rust
/// # use rocket::get;
/// use rocket_ws as ws;
///
/// # use rocket_ws as ws;
/// use rocket::futures::{SinkExt, StreamExt};
///
/// #[get("/echo/manual")]
@ -30,7 +29,7 @@ use crate::result::{Result, Error};
/// Ok(())
/// }))
/// }
/// ````
/// ```
///
/// [`StreamExt`]: rocket::futures::StreamExt
/// [`SinkExt`]: rocket::futures::SinkExt

View File

@ -1,4 +1,74 @@
//! WebSocket support for Rocket.
//!
//! This crate implements support for WebSockets via Rocket's [connection
//! upgrade API](rocket::Response#upgrading) and
//! [tungstenite](tokio_tungstenite).
//!
//! # Usage
//!
//! Depend on the crate. Here, we rename the dependency to `ws` for convenience:
//!
//! ```toml
//! [dependencies]
//! ws = { package = "rocket_ws", version ="=0.1.0-rc.3" }
//! ```
//!
//! Then, use [`WebSocket`] as a request guard in any route and either call
//! [`WebSocket::channel()`] or return a stream via [`Stream!`] or
//! [`WebSocket::stream()`] in the handler. The examples below are equivalent:
//!
//! ```rust
//! # use rocket::get;
//! # use rocket_ws as ws;
//! #
//! #[get("/echo?channel")]
//! fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
//! use rocket::futures::{SinkExt, StreamExt};
//!
//! ws.channel(move |mut stream| Box::pin(async move {
//! while let Some(message) = stream.next().await {
//! let _ = stream.send(message?).await;
//! }
//!
//! Ok(())
//! }))
//! }
//!
//! #[get("/echo?stream")]
//! fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
//! ws::Stream! { ws =>
//! for await message in ws {
//! yield message?;
//! }
//! }
//! }
//!
//! #[get("/echo?compose")]
//! fn echo_compose(ws: ws::WebSocket) -> ws::Stream!['static] {
//! ws.stream(|io| io)
//! }
//! ```
//!
//! WebSocket connections are configurable via [`WebSocket::config()`]:
//!
//! ```rust
//! # use rocket::get;
//! # use rocket_ws as ws;
//! #
//! #[get("/echo")]
//! fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
//! let ws = ws.config(ws::Config {
//! max_send_queue: Some(5),
//! ..Default::default()
//! });
//!
//! ws::Stream! { ws =>
//! for await message in ws {
//! yield message?;
//! }
//! }
//! }
//! ```
#![doc(html_root_url = "https://api.rocket.rs/v0.5-rc/rocket_ws")]
#![doc(html_favicon_url = "https://rocket.rs/images/favicon.ico")]
@ -11,10 +81,84 @@ mod tungstenite {
mod duplex;
mod websocket;
pub use self::tungstenite::Message;
pub use self::tungstenite::protocol::WebSocketConfig as Config;
pub use self::websocket::{WebSocket, Channel};
/// A WebSocket message.
///
/// A value of this type is typically constructed by calling `.into()` on a
/// supported message type. This includes strings via `&str` and `String` and
/// bytes via `&[u8]` and `Vec<u8>`:
///
/// ```rust
/// # use rocket::get;
/// # use rocket_ws as ws;
/// #
/// #[get("/echo")]
/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
/// ws::Stream! { ws =>
/// yield "Hello".into();
/// yield String::from("Hello").into();
/// yield (&[1u8, 2, 3][..]).into();
/// yield vec![1u8, 2, 3].into();
/// }
/// }
/// ```
///
/// Other kinds of messages can be constructed directly:
///
/// ```rust
/// # use rocket::get;
/// # use rocket_ws as ws;
/// #
/// #[get("/echo")]
/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
/// ws::Stream! { ws =>
/// yield ws::Message::Ping(vec![b'h', b'i'])
/// }
/// }
/// ```
pub use self::tungstenite::Message;
/// WebSocket connection configuration.
///
/// The default configuration for a [`WebSocket`] can be changed by calling
/// [`WebSocket::config()`] with a value of this type. The defaults are obtained
/// via [`Default::default()`]. You don't generally need to reconfigure a
/// `WebSocket` unless you're certain you need different values. In other words,
/// this structure should rarely be used.
///
/// # Example
///
/// ```rust
/// # use rocket::get;
/// # use rocket_ws as ws;
/// use rocket::data::ToByteUnit;
///
/// #[get("/echo")]
/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
/// let ws = ws.config(ws::Config {
/// // Enable backpressure with a max send queue size of `5`.
/// max_send_queue: Some(5),
/// // Decrease the maximum (complete) message size to 4MiB.
/// max_message_size: Some(4.mebibytes().as_u64() as usize),
/// // Decrease the maximum size of _one_ frame (not messag) to 1MiB.
/// max_frame_size: Some(1.mebibytes().as_u64() as usize),
/// // Use the default values for the rest.
/// ..Default::default()
/// });
///
/// ws::Stream! { ws =>
/// for await message in ws {
/// yield message?;
/// }
/// }
/// }
/// ```
///
/// **Original `tungstenite` Documentation Follows**
///
pub use self::tungstenite::protocol::WebSocketConfig as Config;
/// Structures for constructing raw WebSocket frames.
pub mod frame {
#[doc(hidden)] pub use crate::Message;

View File

@ -10,8 +10,25 @@ use crate::{Config, Message};
use crate::stream::DuplexStream;
use crate::result::{Result, Error};
/// A request guard that identifies WebSocket requests. Converts into a
/// [`Channel`] or [`MessageStream`].
/// A request guard identifying WebSocket requests. Converts into a [`Channel`]
/// or [`MessageStream`].
///
/// For example usage, see the [crate docs](crate#usage).
///
/// ## Details
///
/// This is the entrypoint to the library. Every WebSocket response _must_
/// initiate via the `WebSocket` request guard. The guard identifies valid
/// WebSocket connection requests and, if the request is valid, succeeds to be
/// converted into a streaming WebSocket response via [`Stream!`],
/// [`WebSocket::channel()`], or [`WebSocket::stream()`]. The connection can be
/// configured via [`WebSocket::config()`]; see [`Config`] for details on
/// configuring a connection.
///
/// ### Forwarding
///
/// If the incoming request is not a valid WebSocket request, the guard
/// forwards. The guard never fails.
pub struct WebSocket {
config: Config,
key: String,
@ -22,17 +39,119 @@ impl WebSocket {
WebSocket { config: Config::default(), key }
}
/// Change the default connection configuration to `config`.
///
/// # Example
///
/// ```rust
/// # use rocket::get;
/// # use rocket_ws as ws;
/// #
/// #[get("/echo")]
/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
/// let ws = ws.config(ws::Config {
/// max_send_queue: Some(5),
/// ..Default::default()
/// });
///
/// ws::Stream! { ws =>
/// for await message in ws {
/// yield message?;
/// }
/// }
/// }
/// ```
pub fn config(mut self, config: Config) -> Self {
self.config = config;
self
}
/// Create a read/write channel to the client and call `handler` with it.
///
/// This method takes a `FnMut`, `handler`, that consumes a read/write
/// WebSocket channel, [`DuplexStream`] to the client. See [`DuplexStream`]
/// for details on how to make use of the channel.
///
/// The `handler` must return a `Box`ed and `Pin`ned future: calling
/// [`Box::pin()`] with a future does just this as is the preferred
/// mechanism to create a `Box<Pin<Future>>`. The future must return a
/// [`Result<()>`](crate::result::Result). The WebSocket connection is
/// closed successfully if the future returns `Ok` and with an error if
/// the future returns `Err`.
///
/// # Lifetimes
///
/// The `Channel` may borrow from the request. If it does, the lifetime
/// should be specified as something other than `'static`. Otherwise, the
/// `'static` lifetime should be used.
///
/// # Example
///
/// ```rust
/// # use rocket::get;
/// # use rocket_ws as ws;
/// use rocket::futures::{SinkExt, StreamExt};
///
/// #[get("/hello/<name>")]
/// fn hello(ws: ws::WebSocket, name: &str) -> ws::Channel<'_> {
/// ws.channel(move |mut stream| Box::pin(async move {
/// let message = format!("Hello, {}!", name);
/// let _ = stream.send(message.into()).await;
/// Ok(())
/// }))
/// }
///
/// #[get("/echo")]
/// fn echo(ws: ws::WebSocket) -> ws::Channel<'static> {
/// ws.channel(move |mut stream| Box::pin(async move {
/// while let Some(message) = stream.next().await {
/// let _ = stream.send(message?).await;
/// }
///
/// Ok(())
/// }))
/// }
/// ```
pub fn channel<'r, F: Send + 'r>(self, handler: F) -> Channel<'r>
where F: FnMut(DuplexStream) -> BoxFuture<'r, Result<()>> + 'r
{
Channel { ws: self, handler: Box::new(handler), }
}
/// Create a stream that consumes client [`Message`]s and emits its own.
///
/// This method takes a `FnMut` `stream` that consumes a read-only stream
/// and returns a stream of [`Message`]s. While the returned stream can be
/// constructed in any manner, the [`Stream!`] macro is the preferred
/// method. In any case, the stream must be `Send`.
///
/// The returned stream must emit items of type `Result<Message>`. Items
/// that are `Ok(Message)` are sent to the client while items of type
/// `Err(Error)` result in the connection being closed and the remainder of
/// the stream discarded.
///
/// # Example
///
/// ```rust
/// # use rocket::get;
/// # use rocket_ws as ws;
///
/// // Use `Stream!`, which internally calls `WebSocket::stream()`.
/// #[get("/echo?stream")]
/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
/// ws::Stream! { ws =>
/// for await message in ws {
/// yield message?;
/// }
/// }
/// }
///
/// // Use a raw stream.
/// #[get("/echo?compose")]
/// fn echo_compose(ws: ws::WebSocket) -> ws::Stream!['static] {
/// ws.stream(|io| io)
/// }
/// ```
pub fn stream<'r, F, S>(self, stream: F) -> MessageStream<'r, S>
where F: FnMut(SplitStream<DuplexStream>) -> S + Send + 'r,
S: futures::Stream<Item = Result<Message>> + Send + 'r
@ -42,6 +161,8 @@ impl WebSocket {
}
/// A streaming channel, returned by [`WebSocket::channel()`].
///
/// `Channel` has no methods or functionality beyond its trait implementations.
pub struct Channel<'r> {
ws: WebSocket,
handler: Box<dyn FnMut(DuplexStream) -> BoxFuture<'r, Result<()>> + Send + 'r>,
@ -50,9 +171,9 @@ pub struct Channel<'r> {
/// A [`Stream`](futures::Stream) of [`Message`]s, returned by
/// [`WebSocket::stream()`], used via [`Stream!`].
///
/// This type is not typically used directly. Instead, it is used via the
/// This type should not be used directly. Instead, it is used via the
/// [`Stream!`] macro, which expands to both the type itself and an expression
/// which evaluates to this type.
/// which evaluates to this type. See [`Stream!`] for details.
// TODO: Get rid of this or `Channel` via a single `enum`.
pub struct MessageStream<'r, S> {
ws: WebSocket,

View File

@ -3,8 +3,23 @@
use rocket::fs::{self, FileServer};
use rocket::futures::{SinkExt, StreamExt};
#[get("/echo/manual")]
fn echo_manual<'r>(ws: ws::WebSocket) -> ws::Channel<'r> {
#[get("/echo?stream", rank = 1)]
fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
ws::Stream! { ws =>
for await message in ws {
yield message?;
}
}
}
#[get("/echo?channel", rank = 2)]
fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
// This is entirely optional. Change default configuration.
let ws = ws.config(ws::Config {
max_send_queue: Some(5),
..Default::default()
});
ws.channel(move |mut stream| Box::pin(async move {
while let Some(message) = stream.next().await {
let _ = stream.send(message?).await;
@ -14,19 +29,14 @@ fn echo_manual<'r>(ws: ws::WebSocket) -> ws::Channel<'r> {
}))
}
#[get("/echo")]
fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
let ws = ws.config(ws::Config { max_send_queue: Some(5), ..Default::default() });
ws::Stream! { ws =>
for await message in ws {
yield message?;
}
}
#[get("/echo?raw", rank = 3)]
fn echo_raw(ws: ws::WebSocket) -> ws::Stream!['static] {
ws.stream(|stream| stream)
}
#[launch]
fn rocket() -> _ {
rocket::build()
.mount("/", routes![echo_manual, echo_stream])
.mount("/", routes![echo_channel, echo_stream, echo_raw])
.mount("/", FileServer::from(fs::relative!("static")))
}