diff --git a/core/http/Cargo.toml b/core/http/Cargo.toml index c4ca136b..953f6635 100644 --- a/core/http/Cargo.toml +++ b/core/http/Cargo.toml @@ -22,16 +22,21 @@ private-cookies = ["cookie/private", "cookie/key-expansion"] [dependencies] smallvec = "1.0" percent-encoding = "1" -hyper = { version = "=0.13.0-alpha.1", default-features = false, features = ["runtime"] } +hyper = { version = "=0.13.0-alpha.1", default-features = false } http = "0.1.17" mime = "0.3.13" time = "0.2.11" indexmap = "1.0" state = "0.4" -tokio-rustls = { version = "0.10.3", optional = true } +tokio-rustls = { version = "0.12.0-alpha.2", optional = true } +tokio-io = "=0.2.0-alpha.4" +tokio-net = "=0.2.0-alpha.4" +tokio-timer = "=0.3.0-alpha.4" cookie = { version = "0.14.0", features = ["percent-encode"] } pear = "0.1" unicode-xid = "0.2" +futures-preview = "0.3.0-alpha.18" +log = "0.4" [dev-dependencies] rocket = { version = "0.5.0-dev", path = "../lib" } diff --git a/core/http/src/hyper.rs b/core/http/src/hyper.rs index 3de5c2ae..36ba9b01 100644 --- a/core/http/src/hyper.rs +++ b/core/http/src/hyper.rs @@ -8,7 +8,6 @@ #[doc(hidden)] pub use hyper::body::{Payload, Sender as BodySender}; #[doc(hidden)] pub use hyper::error::Error; #[doc(hidden)] pub use hyper::service::{make_service_fn, service_fn, MakeService, Service}; -#[doc(hidden)] pub use hyper::server::conn::{AddrIncoming, AddrStream}; #[doc(hidden)] pub use hyper::Chunk; #[doc(hidden)] pub use http::header::HeaderMap; diff --git a/core/http/src/lib.rs b/core/http/src/lib.rs index c27ed519..fe951fa4 100644 --- a/core/http/src/lib.rs +++ b/core/http/src/lib.rs @@ -37,6 +37,7 @@ mod header; mod accept; mod raw_str; mod parse; +mod listener; pub mod uncased; @@ -51,6 +52,7 @@ pub mod private { // This one we need to expose for core. pub use crate::cookies::{Key, CookieJar}; + pub use crate::listener::{Incoming, Listener, Connection, bind_tcp}; } pub use crate::method::Method; diff --git a/core/http/src/listener.rs b/core/http/src/listener.rs new file mode 100644 index 00000000..f4b13d1e --- /dev/null +++ b/core/http/src/listener.rs @@ -0,0 +1,187 @@ +use std::fmt; +use std::future::Future; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +use futures::ready; +use futures::stream::Stream; + +use log::{debug, error}; + +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; +use tokio_net::tcp::{TcpListener, TcpStream}; + +// TODO.async: 'Listener' and 'Connection' provide common enough functionality +// that they could be introduced in upstream libraries. +/// A 'Listener' yields incoming connections +pub trait Listener { + type Connection: Connection; + + /// Return the actual address this listener bound to. + fn local_addr(&self) -> Option; + + /// Try to accept an incoming Connection if ready + fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll>; +} + +/// A 'Connection' represents an open connection to a client +pub trait Connection: AsyncRead + AsyncWrite { + fn remote_addr(&self) -> Option; +} + +/// This is a genericized version of hyper's AddrIncoming that is intended to be +/// usable with listeners other than a plain TCP stream, e.g. TLS and/or Unix +/// sockets. It does this by briding the `Listener` trait to what hyper wants (a +/// Stream of AsyncRead+AsyncWrite). This type is internal to Rocket. +#[must_use = "streams do nothing unless polled"] +pub struct Incoming { + listener: L, + sleep_on_errors: Option, + pending_error_delay: Option, +} + +impl Incoming { + /// Construct an `Incoming` from an existing `Listener`. + pub fn from_listener(listener: L) -> Self { + Self { + listener, + sleep_on_errors: Some(Duration::from_secs(1)), + pending_error_delay: None, + } + } + + /// Set whether to sleep on accept errors. + /// + /// A possible scenario is that the process has hit the max open files + /// allowed, and so trying to accept a new connection will fail with + /// `EMFILE`. In some cases, it's preferable to just wait for some time, if + /// the application will likely close some files (or connections), and try + /// to accept the connection again. If this option is `true`, the error + /// will be logged at the `error` level, since it is still a big deal, + /// and then the listener will sleep for 1 second. + /// + /// In other cases, hitting the max open files should be treat similarly + /// to being out-of-memory, and simply error (and shutdown). Setting + /// this option to `None` will allow that. + /// + /// Default is 1 second. + pub fn set_sleep_on_errors(&mut self, val: Option) { + self.sleep_on_errors = val; + } + + fn poll_next_(&mut self, cx: &mut Context<'_>) -> Poll> { + // Check if a previous delay is active that was set by IO errors. + if let Some(ref mut delay) = self.pending_error_delay { + match Pin::new(delay).poll(cx) { + Poll::Ready(()) => {} + Poll::Pending => return Poll::Pending, + } + } + self.pending_error_delay = None; + + loop { + match self.listener.poll_accept(cx) { + Poll::Ready(Ok(stream)) => { + return Poll::Ready(Ok(stream)); + }, + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => { + // Connection errors can be ignored directly, continue by + // accepting the next request. + if is_connection_error(&e) { + debug!("accepted connection already errored: {}", e); + continue; + } + + if let Some(duration) = self.sleep_on_errors { + error!("accept error: {}", e); + + // Sleep for the specified duration + let delay = Instant::now() + duration; + // TODO.async: This depends on a tokio Timer being set in the environment + let mut error_delay = tokio_timer::delay(delay); + + match Pin::new(&mut error_delay).poll(cx) { + Poll::Ready(()) => { + // Wow, it's been a second already? Ok then... + continue + }, + Poll::Pending => { + self.pending_error_delay = Some(error_delay); + return Poll::Pending; + }, + } + } else { + return Poll::Ready(Err(e)); + } + }, + } + } + } +} + +impl Stream for Incoming { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let result = ready!(self.poll_next_(cx)); + Poll::Ready(Some(result)) + } +} + +/// This function defines errors that are per-connection. Which basically +/// means that if we get this error from `accept()` system call it means +/// next connection might be ready to be accepted. +/// +/// All other errors will incur a delay before next `accept()` is performed. +/// The delay is useful to handle resource exhaustion errors like ENFILE +/// and EMFILE. Otherwise, could enter into tight loop. +fn is_connection_error(e: &io::Error) -> bool { + match e.kind() { + io::ErrorKind::ConnectionRefused | + io::ErrorKind::ConnectionAborted | + io::ErrorKind::ConnectionReset => true, + _ => false, + } +} + +impl fmt::Debug for Incoming { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Incoming") + .field("listener", &self.listener) + .finish() + } +} + +// TODO.async: Put these under a feature such as #[cfg(feature = "tokio-runtime")] + +pub fn bind_tcp(address: SocketAddr) -> Pin> + Send>> { + Box::pin(async move { + Ok(TcpListener::bind(address).await?) + }) +} + +impl Listener for TcpListener { + type Connection = TcpStream; + + fn local_addr(&self) -> Option { + self.local_addr().ok() + } + + fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { + // NB: This is only okay because TcpListener::accept() is stateless. + let accept = self.accept(); + futures::pin_mut!(accept); + accept.poll(cx).map_ok(|(stream, _addr)| stream) + } +} + +impl Connection for TcpStream { + fn remote_addr(&self) -> Option { + self.peer_addr().ok() + } +} diff --git a/core/http/src/tls.rs b/core/http/src/tls.rs index 5e236d9b..9dd31f0a 100644 --- a/core/http/src/tls.rs +++ b/core/http/src/tls.rs @@ -1,8 +1,146 @@ -pub use tokio_rustls::TlsAcceptor; -pub use tokio_rustls::rustls; +use std::fs; +use std::future::Future; +use std::io::{self, BufReader}; +use std::net::SocketAddr; +use std::path::Path; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use tokio_net::tcp::{TcpListener, TcpStream}; + +use tokio_rustls::{TlsAcceptor, server::TlsStream}; +use tokio_rustls::rustls; pub use rustls::internal::pemfile; -pub use rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig}; +pub use rustls::{Certificate, PrivateKey, ServerConfig}; -// TODO.async: extract from hyper-sync-rustls some convenience -// functions to load certs and keys +use crate::listener::{Connection, Listener}; + +#[derive(Debug)] +pub enum Error { + Io(io::Error), + BadCerts, + BadKeyCount, + BadKey, +} + +// TODO.async: consider using async fs operations +pub fn load_certs>(path: P) -> Result, Error> { + let certfile = fs::File::open(path.as_ref()).map_err(|e| Error::Io(e))?; + let mut reader = BufReader::new(certfile); + pemfile::certs(&mut reader).map_err(|_| Error::BadCerts) +} + +pub fn load_private_key>(path: P) -> Result { + use std::io::Seek; + use std::io::BufRead; + + let keyfile = fs::File::open(path.as_ref()).map_err(Error::Io)?; + let mut reader = BufReader::new(keyfile); + + // "rsa" (PKCS1) PEM files have a different first-line header than PKCS8 + // PEM files, use that to determine the parse function to use. + let mut first_line = String::new(); + reader.read_line(&mut first_line).map_err(Error::Io)?; + reader.seek(io::SeekFrom::Start(0)).map_err(Error::Io)?; + + let private_keys_fn = match first_line.trim_end() { + "-----BEGIN RSA PRIVATE KEY-----" => pemfile::rsa_private_keys, + "-----BEGIN PRIVATE KEY-----" => pemfile::pkcs8_private_keys, + _ => return Err(Error::BadKey), + }; + + let key = private_keys_fn(&mut reader) + .map_err(|_| Error::BadKey) + .and_then(|mut keys| match keys.len() { + 0 => Err(Error::BadKey), + 1 => Ok(keys.remove(0)), + _ => Err(Error::BadKeyCount), + })?; + + // Ensure we can use the key. + if rustls::sign::RSASigningKey::new(&key).is_err() { + Err(Error::BadKey) + } else { + Ok(key) + } +} + +// TODO.async: Put these under a feature such as #[cfg(feature = "tokio-runtime")] + +pub struct TlsListener { + listener: TcpListener, + acceptor: TlsAcceptor, + state: TlsListenerState, +} + +enum TlsListenerState { + Listening, + Accepting(Pin, io::Error>> + Send>>), +} + +impl Listener for TlsListener { + type Connection = TlsStream; + + fn local_addr(&self) -> Option { + self.listener.local_addr().ok() + } + + fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.state { + TlsListenerState::Listening => { + match self.listener.poll_accept(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(stream)) => { + self.state = TlsListenerState::Accepting(Box::pin(self.acceptor.accept(stream))); + } + Poll::Ready(Err(e)) => { + return Poll::Ready(Err(e)); + } + } + } + TlsListenerState::Accepting(fut) => { + match fut.as_mut().poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(result) => { + self.state = TlsListenerState::Listening; + return Poll::Ready(result); + } + } + } + } + } + } +} + +pub fn bind_tls(address: SocketAddr, cert_chain: Vec, key: PrivateKey) + -> Pin> + Send>> +{ + Box::pin(async move { + let listener = TcpListener::bind(address).await?; + + let client_auth = rustls::NoClientAuth::new(); + let mut tls_config = ServerConfig::new(client_auth); + let cache = rustls::ServerSessionMemoryCache::new(1024); + tls_config.set_persistence(cache); + tls_config.ticketer = rustls::Ticketer::new(); + tls_config.set_single_cert(cert_chain, key).expect("invalid key"); + + + let acceptor = TlsAcceptor::from(Arc::new(tls_config)); + + Ok(TlsListener { + listener, + acceptor, + state: TlsListenerState::Listening, + }) + }) +} + +impl Connection for TlsStream { + fn remote_addr(&self) -> Option { + self.get_ref().0.remote_addr() + } +} diff --git a/core/lib/src/config/config.rs b/core/lib/src/config/config.rs index af5e417d..8efed903 100644 --- a/core/lib/src/config/config.rs +++ b/core/lib/src/config/config.rs @@ -574,33 +574,23 @@ impl Config { /// ``` #[cfg(feature = "tls")] pub fn set_tls(&mut self, certs_path: &str, key_path: &str) -> Result<()> { - use crate::http::tls::pemfile::{certs, rsa_private_keys}; - use std::fs::File; - use std::io::BufReader; + use crate::http::tls::{load_certs, load_private_key, Error}; let pem_err = "malformed PEM file"; - // TODO.async: Fully copy from hyper-sync-rustls, move to http/src/tls - // Partially extracted from hyper-sync-rustls - // Load the certificates. - let certs = match File::open(self.root_relative(certs_path)) { - Ok(file) => certs(&mut BufReader::new(file)).map_err(|_| { - self.bad_type("tls", pem_err, "a valid certificates file") - }), - Err(e) => Err(ConfigError::Io(e, "tls.certs"))?, - }?; + let certs = load_certs(self.root_relative(certs_path)) + .map_err(|e| match e { + Error::Io(e) => ConfigError::Io(e, "tls.certs"), + _ => self.bad_type("tls", pem_err, "a valid certificates file") + })?; // And now the private key. - let mut keys = match File::open(self.root_relative(key_path)) { - Ok(file) => rsa_private_keys(&mut BufReader::new(file)).map_err(|_| { - self.bad_type("tls", pem_err, "a valid private key file") - }), - Err(e) => Err(ConfigError::Io(e, "tls.key")), - }?; - - // TODO.async: Proper check for one key - let key = keys.remove(0); + let key = load_private_key(self.root_relative(key_path)) + .map_err(|e| match e { + Error::Io(e) => ConfigError::Io(e, "tls.key"), + _ => self.bad_type("tls", pem_err, "a valid private key file") + })?; self.tls = Some(TlsConfig { certs, key }); Ok(()) diff --git a/core/lib/src/data/data.rs b/core/lib/src/data/data.rs index db11991d..ead5c874 100644 --- a/core/lib/src/data/data.rs +++ b/core/lib/src/data/data.rs @@ -76,6 +76,8 @@ impl Data { pub(crate) fn from_hyp(body: hyper::Body) -> impl Future { // TODO.async: This used to also set the read timeout to 5 seconds. + // Such a short read timeout is likely no longer necessary, but some + // kind of idle timeout should be implemented. Data::new(body) } diff --git a/core/lib/src/error.rs b/core/lib/src/error.rs index e9ed7696..8e50794c 100644 --- a/core/lib/src/error.rs +++ b/core/lib/src/error.rs @@ -26,7 +26,7 @@ pub enum Error { #[derive(Debug)] pub enum LaunchErrorKind { /// Binding to the provided address/port failed. - Bind(hyper::Error), + Bind(io::Error), /// An I/O error occurred during launch. Io(io::Error), /// Route collisions were detected. diff --git a/core/lib/src/rocket.rs b/core/lib/src/rocket.rs index 550dd95e..e5aeb6d1 100644 --- a/core/lib/src/rocket.rs +++ b/core/lib/src/rocket.rs @@ -3,21 +3,17 @@ use std::convert::{From, TryInto}; use std::cmp::min; use std::io; use std::mem; -use std::net::ToSocketAddrs; use std::sync::Arc; -use std::time::Duration; use futures::future::{Future, FutureExt, BoxFuture}; use futures::channel::{mpsc, oneshot}; use futures::stream::StreamExt; -use futures::task::SpawnExt; +use futures::task::{Spawn, SpawnExt}; use futures_tokio_compat::Compat as TokioCompat; use yansi::Paint; use state::Container; -#[cfg(feature = "tls")] use crate::http::tls::TlsAcceptor; - use crate::{logger, handler}; use crate::config::{Config, FullConfig, ConfigError, LoggedValue}; use crate::request::{Request, FormItems}; @@ -33,6 +29,7 @@ use crate::ext::AsyncReadExt; use crate::shutdown::{ShutdownHandle, ShutdownHandleManaged}; use crate::http::{Method, Status, Header}; +use crate::http::private::{Listener, Connection, Incoming}; use crate::http::hyper::{self, header}; use crate::http::uri::Origin; @@ -711,61 +708,32 @@ impl Rocket { Ok(self) } - /// Similar to `launch()`, but using a custom Tokio runtime and returning - /// a `Future` that completes along with the server. The runtime has no - /// restrictions other than being Tokio-based, and can have other tasks - /// running on it. - /// - /// # Example - /// - /// ```rust - /// use futures::future::FutureExt; - /// - /// // This gives us the default behavior. Alternatively, we could use a - /// // `tokio::runtime::Builder` to configure with greater detail. - /// let runtime = tokio::runtime::Runtime::new().expect("error creating runtime"); - /// - /// # if false { - /// let server_done = rocket::ignite().spawn_on(&runtime).expect("error launching server"); - /// runtime.block_on(async move { - /// let result = server_done.await; - /// assert!(result.is_ok()); - /// }); - /// # } - /// ``` - pub fn spawn_on( - mut self, - runtime: &tokio::runtime::Runtime, - ) -> Result>, LaunchError> { - #[cfg(feature = "tls")] use crate::http::tls; - - self = self.prelaunch_check()?; + // TODO.async: Solidify the Listener APIs and make this function public + async fn listen_on(mut self, listener: L, spawn: S) -> Result<(), crate::error::Error> + where + L: Listener + Send + Unpin + 'static, + ::Connection: Send + Unpin + 'static, + S: Spawn + Clone + Send + 'static, + { + self = self.prelaunch_check().map_err(crate::error::Error::Launch)?; self.fairings.pretty_print_counts(); - let full_addr = format!("{}:{}", self.config.address, self.config.port); - let addrs = match full_addr.to_socket_addrs() { - Ok(a) => a.collect::>(), - // TODO.async: Reconsider this error type - Err(e) => return Err(From::from(io::Error::new(io::ErrorKind::Other, e))), - }; - - // TODO.async: support for TLS, unix sockets. - // Likely will be implemented with a custom "Incoming" type. - - let mut incoming = match hyper::AddrIncoming::bind(&addrs[0]) { - Ok(incoming) => incoming, - Err(e) => return Err(LaunchError::new(LaunchErrorKind::Bind(e))), - }; - // Determine the address and port we actually binded to. - self.config.port = incoming.local_addr().port(); + self.config.port = listener.local_addr().map(|a| a.port()).unwrap_or(0); - let proto = "http://"; + let proto = if self.config.tls.is_some() { + "https://" + } else { + "http://" + }; + + let full_addr = format!("{}:{}", self.config.address, self.config.port); // Set the keep-alive. - let timeout = self.config.keep_alive.map(|s| Duration::from_secs(s as u64)); - incoming.set_keepalive(timeout); + // TODO.async: implement keep-alive in Listener + // let timeout = self.config.keep_alive.map(|s| Duration::from_secs(s as u64)); + // listener.set_keepalive(timeout); // Freeze managed state for synchronization-free accesses later. self.state.freeze(); @@ -782,41 +750,108 @@ impl Rocket { // Restore the log level back to what it originally was. logger::pop_max_level(); - // We need to get these values before moving `self` into an `Arc`. + // We need to get this before moving `self` into an `Arc`. let mut shutdown_receiver = self.shutdown_receiver .take().expect("shutdown receiver has already been used"); - #[cfg(feature = "ctrl_c_shutdown")] - let shutdown_handle = self.get_shutdown_handle(); - let rocket = Arc::new(self); - let spawn = Box::new(TokioCompat::new(runtime.executor())); - let service = hyper::make_service_fn(move |socket: &hyper::AddrStream| { + let spawn_makeservice = spawn.clone(); + let service = hyper::make_service_fn(move |connection: &::Connection| { let rocket = rocket.clone(); - let remote_addr = socket.remote_addr(); - let spawn = spawn.clone(); + let remote_addr = connection.remote_addr().unwrap_or_else(|| "0.0.0.0".parse().unwrap()); + let spawn_service = spawn_makeservice.clone(); async move { Ok::<_, std::convert::Infallible>(hyper::service_fn(move |req| { - hyper_service_fn(rocket.clone(), remote_addr, spawn.clone(), req) + hyper_service_fn(rocket.clone(), remote_addr, spawn_service.clone(), req) })) } }); - #[cfg(feature = "ctrl_c_shutdown")] - let (cancel_ctrl_c_listener_sender, cancel_ctrl_c_listener_receiver) = oneshot::channel(); - // NB: executor must be passed manually here, see hyperium/hyper#1537 - let (future, handle) = hyper::Server::builder(incoming) - .executor(runtime.executor()) + hyper::Server::builder(Incoming::from_listener(listener)) + .executor(TokioCompat::new(spawn)) .serve(service) .with_graceful_shutdown(async move { shutdown_receiver.next().await; }) - .inspect(|_| { - #[cfg(feature = "ctrl_c_shutdown")] - let _ = cancel_ctrl_c_listener_sender.send(()); - }) - .remote_handle(); + .await + .map_err(crate::error::Error::Run) + } - runtime.spawn(future); + /// Similar to `launch()`, but using a custom Tokio runtime and returning + /// a `Future` that completes along with the server. The runtime has no + /// restrictions other than being Tokio-based, and can have other tasks + /// running on it. + /// + /// # Example + /// + /// ```rust + /// use futures::future::FutureExt; + /// + /// // This gives us the default behavior. Alternatively, we could use a + /// // `tokio::runtime::Builder` to configure with greater detail. + /// let runtime = tokio::runtime::Runtime::new().expect("error creating runtime"); + /// + /// # if false { + /// let server_done = rocket::ignite().spawn_on(&runtime); + /// runtime.block_on(async move { + /// let result = server_done.await; + /// assert!(result.is_ok()); + /// }); + /// # } + /// ``` + pub fn spawn_on( + self, + runtime: &tokio::runtime::Runtime, + ) -> impl Future> { + use std::net::ToSocketAddrs; + + use crate::error::Error::Launch; + + let full_addr = format!("{}:{}", self.config.address, self.config.port); + let addrs = match full_addr.to_socket_addrs() { + Ok(a) => a.collect::>(), + Err(e) => return futures::future::err(Launch(From::from(e))).boxed(), + }; + let addr = addrs[0]; + let spawn = TokioCompat::new(runtime.executor()); + + #[cfg(feature = "ctrl_c_shutdown")] + let ( + shutdown_handle, + (cancel_ctrl_c_listener_sender, cancel_ctrl_c_listener_receiver) + ) = ( + self.get_shutdown_handle(), + oneshot::channel() + ); + + let server = async move { + macro_rules! listen_on { + ($spawn:expr, $expr:expr) => {{ + let listener = match $expr { + Ok(ok) => ok, + Err(err) => return Err(Launch(LaunchError::new(LaunchErrorKind::Bind(err)))), + }; + self.listen_on(listener, spawn).await + }}; + } + + #[cfg(feature = "tls")] + { + if let Some(tls) = self.config.tls.clone() { + listen_on!(spawn, crate::http::tls::bind_tls(addr, tls.certs, tls.key).await) + } else { + listen_on!(spawn, crate::http::private::bind_tcp(addr).await) + } + } + #[cfg(not(feature = "tls"))] + { + listen_on!(spawn, crate::http::private::bind_tcp(addr).await) + } + }; + + #[cfg(feature = "ctrl_c_shutdown")] + let server = server.inspect(|_| { + let _ = cancel_ctrl_c_listener_sender.send(()); + }); #[cfg(feature = "ctrl_c_shutdown")] match tokio::net::signal::ctrl_c() { @@ -843,7 +878,7 @@ impl Rocket { }, } - Ok(handle) + server.boxed() } /// Starts the application server and begins listening for and dispatching @@ -866,8 +901,6 @@ impl Rocket { /// # } /// ``` pub fn launch(self) -> Result<(), crate::error::Error> { - use crate::error::Error; - // TODO.async What meaning should config.workers have now? // Initialize the tokio runtime let runtime = tokio::runtime::Builder::new() @@ -875,10 +908,7 @@ impl Rocket { .build() .expect("Cannot build runtime!"); - match self.spawn_on(&runtime) { - Ok(fut) => runtime.block_on(fut).map_err(Error::Run), - Err(err) => Err(Error::Launch(err)), - } + runtime.block_on(self.spawn_on(&runtime)) } /// Returns a [`ShutdownHandle`], which can be used to gracefully terminate @@ -893,19 +923,17 @@ impl Rocket { /// # /// let rocket = rocket::ignite(); /// let handle = rocket.get_shutdown_handle(); - /// # let real_handle = rocket.get_shutdown_handle(); /// /// # if false { /// thread::spawn(move || { /// thread::sleep(Duration::from_secs(10)); /// handle.shutdown(); /// }); - /// # } - /// # real_handle.shutdown(); /// /// // Shuts down after 10 seconds /// let shutdown_result = rocket.launch(); /// assert!(shutdown_result.is_ok()); + /// # } /// ``` #[inline(always)] pub fn get_shutdown_handle(&self) -> ShutdownHandle {