From b00c89c22f37f5cffbbfda2f15e9a07728436775 Mon Sep 17 00:00:00 2001 From: Sergio Benitez Date: Sat, 26 Jun 2021 16:37:21 -0700 Subject: [PATCH] Support lifetime bounds in typed stream macros. The syntax 'TypedStream![T + '_]' expands to: impl TypedStream + '_ This allows seamlessly borrowing in typed streams. Also adds 'Event::empty()', for convenience. --- core/codegen/src/attribute/route/mod.rs | 15 ++++- core/codegen/src/bang/mod.rs | 3 +- core/codegen/src/bang/typed_stream.rs | 78 ++++++++++++++++++------- core/codegen/src/syn_ext.rs | 31 +++++----- core/lib/src/response/stream/bytes.rs | 3 + core/lib/src/response/stream/mod.rs | 59 +++++++++++++++++++ core/lib/src/response/stream/raw_sse.rs | 19 +++++- core/lib/src/response/stream/reader.rs | 3 + core/lib/src/response/stream/sse.rs | 61 ++++++++++++++++--- core/lib/src/response/stream/text.rs | 3 + 10 files changed, 226 insertions(+), 49 deletions(-) diff --git a/core/codegen/src/attribute/route/mod.rs b/core/codegen/src/attribute/route/mod.rs index 0b9afa92..71218a38 100644 --- a/core/codegen/src/attribute/route/mod.rs +++ b/core/codegen/src/attribute/route/mod.rs @@ -274,11 +274,22 @@ fn sentinels_expr(route: &Route) -> TokenStream { // * returns `true` for the parent, and so the type has a parent, and // the theorem holds. // 3. these are all the cases. QED. - const TYPE_MACROS: &[&str] = &["ReaderStream", "TextStream", "ByteStream", "EventStream"]; + + const TY_MACS: &[&str] = &["ReaderStream", "TextStream", "ByteStream", "EventStream"]; + + fn ty_mac_mapper(tokens: &TokenStream) -> Option { + use crate::bang::typed_stream::Input; + + match syn::parse2(tokens.clone()).ok()? { + Input::Type(ty, ..) => Some(ty), + Input::Tokens(..) => None + } + } + let eligible_types = route.guards() .map(|guard| &guard.ty) .chain(ret_ty.as_ref().into_iter()) - .flat_map(|ty| ty.unfold_with_known_macros(TYPE_MACROS)) + .flat_map(|ty| ty.unfold_with_ty_macros(TY_MACS, ty_mac_mapper)) .filter(|ty| ty.is_concrete(&generic_idents)) .map(|child| (child.parent, child.ty)); diff --git a/core/codegen/src/bang/mod.rs b/core/codegen/src/bang/mod.rs index 58e32ae3..5633f442 100644 --- a/core/codegen/src/bang/mod.rs +++ b/core/codegen/src/bang/mod.rs @@ -2,7 +2,8 @@ mod uri; mod uri_parsing; mod test_guide; mod export; -mod typed_stream; + +pub mod typed_stream; use devise::Result; use syn::{Path, punctuated::Punctuated, parse::Parser, Token}; diff --git a/core/codegen/src/bang/typed_stream.rs b/core/codegen/src/bang/typed_stream.rs index 2199e31f..44c910b0 100644 --- a/core/codegen/src/bang/typed_stream.rs +++ b/core/codegen/src/bang/typed_stream.rs @@ -1,8 +1,8 @@ use proc_macro2::TokenStream; use syn::parse::{Parse, ParseStream, discouraged::Speculative}; -enum Input { - Type(syn::Type), +pub enum Input { + Type(syn::Type, Option<(syn::Token![+], syn::Lifetime)>), Tokens(TokenStream) } @@ -13,27 +13,60 @@ struct Invocation { input: Input, } +/// Reinterpret a `T + '_` (without the `dyn`) for `impl Stream + '_`. +fn trait_obj_recast(ty: &syn::Type) -> Option<(syn::Type, syn::Token![+], syn::Lifetime)> { + let bounds = match ty { + syn::Type::TraitObject(t) if t.dyn_token.is_none() => &t.bounds, + _ => return None + }; + + let mut bounds = bounds.pairs(); + let (first, second) = (bounds.next()?, bounds.next()?); + let plus = **first.punct().expect("have two so have punct"); + + let first = first.value(); + let real_ty = syn::parse2(quote!(#first)).ok()?; + let lifetime = match second.value() { + syn::TypeParamBound::Lifetime(lt) => lt.clone(), + _ => return None, + }; + + Some((real_ty, plus, lifetime)) +} + +impl Parse for Input { + fn parse(input: ParseStream<'_>) -> syn::Result { + let fork = input.fork(); + if let Ok(mut ty) = fork.parse() { + input.advance_to(&fork); + + // If there's an extra + '_, use it in the reinterpretation. + let mut bound = match input.parse() { + Ok(plus) => Some((plus, input.parse()?)), + _ => None, + }; + + // We might miss `A + '_`. Check if we did. + if let Some((real_ty, plus, lt)) = trait_obj_recast(&ty) { + ty = real_ty; + bound = Some((plus, lt)); + } + + Ok(Input::Type(ty, bound)) + } else { + Ok(Input::Tokens(input.parse()?)) + } + } +} + impl Parse for Invocation { fn parse(input: ParseStream<'_>) -> syn::Result { - let ty_stream_ty = input.parse()?; - input.parse::()?; - - let stream_mac = input.parse()?; - input.parse::()?; - - let stream_trait = input.parse()?; - input.parse::()?; - - let fork = input.fork(); - let input = match fork.parse() { - Ok(ty) => { - input.advance_to(&fork); - Input::Type(ty) - } - Err(_) => Input::Tokens(input.parse()?) - }; - - Ok(Invocation { ty_stream_ty, stream_mac, stream_trait, input }) + Ok(Invocation { + ty_stream_ty: (input.parse()?, input.parse::()?).0, + stream_mac: (input.parse()?, input.parse::()?).0, + stream_trait: (input.parse()?, input.parse::()?).0, + input: input.parse()?, + }) } } @@ -46,7 +79,8 @@ pub fn _macro(input: proc_macro::TokenStream) -> devise::Result { let (s_ty, mac, s_trait) = (i.ty_stream_ty, i.stream_mac, i.stream_trait); let tokens = match i.input { Input::Tokens(tt) => quote!(#s_ty::from(#mac!(#tt))), - Input::Type(ty) => quote!(#s_ty>), + Input::Type(ty, Some((p, l))) => quote!(#s_ty #p #l>), + Input::Type(ty, None) => quote!(#s_ty>), }; Ok(tokens) diff --git a/core/codegen/src/syn_ext.rs b/core/codegen/src/syn_ext.rs index c091b6a8..5e5b6f5f 100644 --- a/core/codegen/src/syn_ext.rs +++ b/core/codegen/src/syn_ext.rs @@ -6,7 +6,7 @@ use std::borrow::Cow; use syn::{self, Ident, ext::IdentExt as _, visit::Visit}; use proc_macro2::{Span, TokenStream}; -use devise::ext::PathExt; +use devise::ext::{PathExt, TypeExt as _}; use rocket_http::ext::IntoOwned; pub trait IdentExt { @@ -55,9 +55,11 @@ impl IntoOwned for Child<'_> { } } +type MacTyMapFn = fn(&TokenStream) -> Option; + pub trait TypeExt { fn unfold(&self) -> Vec>; - fn unfold_with_known_macros(&self, known_macros: &[&str]) -> Vec>; + fn unfold_with_ty_macros(&self, names: &[&str], mapper: MacTyMapFn) -> Vec>; fn is_concrete(&self, generic_ident: &[&Ident]) -> bool; } @@ -137,29 +139,32 @@ impl FnArgExt for syn::FnArg { } } -fn known_macro_inner_ty(t: &syn::TypeMacro, known: &[&str]) -> Option { - if !known.iter().any(|k| t.mac.path.last_ident().map_or(false, |i| i == k)) { +fn macro_inner_ty(t: &syn::TypeMacro, names: &[&str], m: MacTyMapFn) -> Option { + if !names.iter().any(|k| t.mac.path.last_ident().map_or(false, |i| i == k)) { return None; } - syn::parse2(t.mac.tokens.clone()).ok() + let mut ty = m(&t.mac.tokens)?; + ty.strip_lifetimes(); + Some(ty) } impl TypeExt for syn::Type { fn unfold(&self) -> Vec> { - self.unfold_with_known_macros(&[]) + self.unfold_with_ty_macros(&[], |_| None) } - fn unfold_with_known_macros<'a>(&'a self, known_macros: &[&str]) -> Vec> { + fn unfold_with_ty_macros(&self, names: &[&str], mapper: MacTyMapFn) -> Vec> { struct Visitor<'a, 'm> { parents: Vec>, children: Vec>, - known_macros: &'m [&'m str], + names: &'m [&'m str], + mapper: MacTyMapFn, } impl<'m> Visitor<'_, 'm> { - fn new(known_macros: &'m [&'m str]) -> Self { - Visitor { parents: vec![], children: vec![], known_macros } + fn new(names: &'m [&'m str], mapper: MacTyMapFn) -> Self { + Visitor { parents: vec![], children: vec![], names, mapper } } } @@ -168,8 +173,8 @@ impl TypeExt for syn::Type { let parent = self.parents.last().cloned(); if let syn::Type::Macro(t) = ty { - if let Some(inner_ty) = known_macro_inner_ty(t, self.known_macros) { - let mut visitor = Visitor::new(self.known_macros); + if let Some(inner_ty) = macro_inner_ty(t, self.names, self.mapper) { + let mut visitor = Visitor::new(self.names, self.mapper); if let Some(parent) = parent.clone().into_owned() { visitor.parents.push(parent); } @@ -188,7 +193,7 @@ impl TypeExt for syn::Type { } } - let mut visitor = Visitor::new(known_macros); + let mut visitor = Visitor::new(names, mapper); visitor.visit_type(self); visitor.children } diff --git a/core/lib/src/response/stream/bytes.rs b/core/lib/src/response/stream/bytes.rs index 28da6c4d..52782aa2 100644 --- a/core/lib/src/response/stream/bytes.rs +++ b/core/lib/src/response/stream/bytes.rs @@ -75,6 +75,9 @@ impl<'r, S: Stream> Responder<'r, 'r> for ByteStream crate::export! { /// Type and stream expression macro for [`struct@ByteStream`]. /// + /// See [`stream!`](crate::response::stream::stream) for the syntax + /// supported by this macro. + /// /// See [`struct@ByteStream`] and the [module level /// docs](crate::response::stream#typed-streams) for usage details. macro_rules! ByteStream { diff --git a/core/lib/src/response/stream/mod.rs b/core/lib/src/response/stream/mod.rs index f37e003a..ea377778 100644 --- a/core/lib/src/response/stream/mod.rs +++ b/core/lib/src/response/stream/mod.rs @@ -92,6 +92,65 @@ //! The expansions are identical for `ReaderStream` and `ByteStream`, with //! `TextStream` replaced with `ReaderStream` and `ByteStream`, respectively. //! +//! ## Borrowing +//! +//! A stream can _yield_ borrowed values with no extra effort: +//! +//! ```rust +//! # use rocket::get; +//! use rocket::State; +//! use rocket::response::stream::TextStream; +//! +//! /// Produce a single string borrowed from the request. +//! #[get("/infinite-hellos")] +//! fn hello(string: &State) -> TextStream![&str] { +//! TextStream! { +//! yield string.as_str(); +//! } +//! } +//! ``` +//! +//! If the stream _contains_ a borrowed value or uses one internally, Rust +//! requires this fact be explicit with a lifetime annotation: +//! +//! ```rust +//! # use rocket::get; +//! use rocket::State; +//! use rocket::response::stream::TextStream; +//! +//! #[get("/")] +//! fn borrow1(ctxt: &State) -> TextStream![&'static str + '_] { +//! TextStream! { +//! // By using `ctxt` in the stream, the borrow is moved into it. Thus, +//! // the stream object contains a borrow, prompting the '_ annotation. +//! if *ctxt.inner() { +//! yield "hello"; +//! } +//! } +//! } +//! +//! // Just as before but yielding an owned yield value. +//! #[get("/")] +//! fn borrow2(ctxt: &State) -> TextStream![String + '_] { +//! TextStream! { +//! if *ctxt.inner() { +//! yield "hello".to_string(); +//! } +//! } +//! } +//! +//! // As before but _also_ return a borrowed value. Without it, Rust gives: +//! // - lifetime `'r` is missing in item created through this procedural macro +//! #[get("/")] +//! fn borrow3<'r>(ctxt: &'r State, s: &'r State) -> TextStream![&'r str + 'r] { +//! TextStream! { +//! if *ctxt.inner() { +//! yield s.as_str(); +//! } +//! } +//! } +//! ``` +//! //! # Graceful Shutdown //! //! Infinite responders, like the one defined in `hello` above, will prolong diff --git a/core/lib/src/response/stream/raw_sse.rs b/core/lib/src/response/stream/raw_sse.rs index 0017dab8..d0c384d8 100644 --- a/core/lib/src/response/stream/raw_sse.rs +++ b/core/lib/src/response/stream/raw_sse.rs @@ -181,6 +181,21 @@ fn skip + Unpin>(buf: &mut Take>) { } } + +macro_rules! dbg_assert_ready { + ($e:expr) => ({ + let poll = $e; + debug_assert!(poll.is_ready()); + ::futures::ready!(poll) + }) +} + +// NOTE: The correctness of this implementation depends on the types of `name` +// and `value` having `AsyncRead` implementations that always return `Ready`. +// Otherwise, we may return `Pending` after having written data to `buf` which +// violates the contract. This can happen because even after a successful +// partial or full read of `name`, we loop back to a `ready!(name.poll())` if +// `buf` was not completely filled. So, we return `Pending` if that poll does. impl AsyncRead for RawLinedEvent { fn poll_read( mut self: Pin<&mut Self>, @@ -196,7 +211,7 @@ impl AsyncRead for RawLinedEvent { match self.state { State::Name => { - futures::ready!(Pin::new(&mut self.name).poll_read(cx, buf))?; + dbg_assert_ready!(Pin::new(&mut self.name).poll_read(cx, buf))?; if !self.name.has_remaining() { self.name.set_position(0); self.state = State::Colon; @@ -208,7 +223,7 @@ impl AsyncRead for RawLinedEvent { self.state = State::Value; } State::Value => { - futures::ready!(Pin::new(&mut self.value).poll_read(cx, buf))?; + dbg_assert_ready!(Pin::new(&mut self.value).poll_read(cx, buf))?; if self.value.limit() == 0 { self.state = State::NewLine; } diff --git a/core/lib/src/response/stream/reader.rs b/core/lib/src/response/stream/reader.rs index d70c8e5c..d3a3da71 100644 --- a/core/lib/src/response/stream/reader.rs +++ b/core/lib/src/response/stream/reader.rs @@ -196,6 +196,9 @@ impl fmt::Debug for ReaderStream crate::export! { /// Type and stream expression macro for [`struct@ReaderStream`]. /// + /// See [`stream!`](crate::response::stream::stream) for the syntax + /// supported by this macro. + /// /// See [`struct@ReaderStream`] and the [module level /// docs](crate::response::stream#typed-streams) for usage details. macro_rules! ReaderStream { diff --git a/core/lib/src/response/stream/sse.rs b/core/lib/src/response/stream/sse.rs index a3c52fbf..d69bacf4 100644 --- a/core/lib/src/response/stream/sse.rs +++ b/core/lib/src/response/stream/sse.rs @@ -122,12 +122,26 @@ pub struct Event { } impl Event { - /// Creates an empty `Event` with no fields. This is hidden so we never - /// generate an `Event` with nothing. - fn empty() -> Self { + // We hide this since we never want to construct an `Event` with nothing. + fn new() -> Self { Event { comment: None, retry: None, id: None, event: None, data: None, } } + /// Creates a new `Event` with an empty data field. + /// + /// This is exactly equivalent to `Event::data("")`. + /// + /// # Example + /// + /// ```rust + /// use rocket::response::stream::Event; + /// + /// let event = Event::empty(); + /// ``` + pub fn empty() -> Self { + Event::data("") + } + /// Creates a new `Event` with a data field of `data` serialized as JSON. /// /// # Example @@ -174,7 +188,7 @@ impl Event { /// let event = Event::data("Hello, SSE!"); /// ``` pub fn data>>(data: T) -> Self { - Self { data: Some(data.into()), ..Event::empty() } + Self { data: Some(data.into()), ..Event::new() } } /// Creates a new comment `Event`. @@ -192,7 +206,7 @@ impl Event { /// let event = Event::comment("bet you'll never see me!"); /// ``` pub fn comment>>(data: T) -> Self { - Self { comment: Some(data.into()), ..Event::empty() } + Self { comment: Some(data.into()), ..Event::new() } } /// Creates a new retry `Event`. @@ -206,7 +220,7 @@ impl Event { /// let event = Event::retry(Duration::from_millis(250)); /// ``` pub fn retry(period: Duration) -> Self { - Self { retry: Some(period), ..Event::empty() } + Self { retry: Some(period), ..Event::new() } } /// Sets the value of the 'event' (event type) field. @@ -396,6 +410,31 @@ impl Event { /// terminate an otherwise infinite stream, see [graceful /// shutdown](crate::response::stream#graceful-shutdown). /// +/// # Borrowing +/// +/// If an `EventStream` contains a borrow, the extended type syntax +/// `EventStream![Event + '_]` must be used: +/// +/// ```rust +/// # use rocket::get; +/// use rocket::State; +/// use rocket::response::stream::{Event, EventStream}; +/// +/// #[get("/events")] +/// fn events(ctxt: &State) -> EventStream![Event + '_] { +/// EventStream! { +/// // By using `ctxt` in the stream, the borrow is moved into it. Thus, +/// // the stream object contains a borrow, prompting the '_ annotation. +/// if *ctxt.inner() { +/// yield Event::data("hi"); +/// } +/// } +/// } +/// ``` +/// +/// See [`stream#borrowing`](crate::response::stream#borrowing) for further +/// details on borrowing in streams. +/// /// # Pitfalls /// /// Server-Sent Events are a rather simple mechanism, though there are some @@ -444,9 +483,8 @@ pub struct EventStream { impl> EventStream { /// Sets a "ping" interval for this `EventStream` to avoid connection - /// timeouts when no data is being transferred. The default `interval` for a - /// newly created `EventStream` is `None`, which disables this - /// functionality. + /// timeouts when no data is being transferred. The default `interval` is 30 + /// seconds. /// /// The ping is implemented by sending an empty comment to the client every /// `interval` seconds. @@ -541,6 +579,11 @@ impl<'r, S: Stream + Send + 'r> Responder<'r, 'r> for EventStream< crate::export! { /// Type and stream expression macro for [`struct@EventStream`]. /// + /// See [`stream!`](crate::response::stream::stream) for the syntax + /// supported by this macro. In addition to that syntax, this macro can also + /// be called with no arguments, `EventStream![]`, as shorthand for + /// `EventStream![Event]`. + /// /// See [`struct@EventStream`] and the [module level /// docs](crate::response::stream#typed-streams) for usage details. macro_rules! EventStream { diff --git a/core/lib/src/response/stream/text.rs b/core/lib/src/response/stream/text.rs index df4eddad..3064e0f0 100644 --- a/core/lib/src/response/stream/text.rs +++ b/core/lib/src/response/stream/text.rs @@ -85,6 +85,9 @@ impl<'r, S: Stream> Responder<'r, 'r> for TextStream crate::export! { /// Type and stream expression macro for [`struct@TextStream`]. /// + /// See [`stream!`](crate::response::stream::stream) for the syntax + /// supported by this macro. + /// /// See [`struct@TextStream`] and the [module level /// docs](crate::response::stream#typed-streams) for usage details. macro_rules! TextStream {