Add 'Rocket::spawn_on' to spawn a server on a user-provided (tokio) runtime.

This commit is contained in:
Jacob Pratt 2019-08-17 22:00:32 -04:00 committed by Sergio Benitez
parent e44c5896b8
commit a87e1577aa
1 changed files with 63 additions and 29 deletions

View File

@ -8,7 +8,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::pin::Pin; use std::pin::Pin;
use futures::future::Future; use futures::future::{Future, FutureExt, TryFutureExt};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures::task::SpawnExt; use futures::task::SpawnExt;
use futures_tokio_compat::Compat as TokioCompat; use futures_tokio_compat::Compat as TokioCompat;
@ -702,46 +702,44 @@ impl Rocket {
Ok(self) Ok(self)
} }
/// Starts the application server and begins listening for and dispatching /// Similar to `launch()`, but using a custom Tokio runtime and returning
/// requests to mounted routes and catchers. Unless there is an error, this /// a `Future` that completes along with the server. The runtime has no
/// function does not return and blocks until program termination. /// restrictions other than being Tokio-based, and can have other tasks
/// /// running on it.
/// # Error
///
/// If there is a problem starting the application, a [`LaunchError`] is
/// returned. Note that a value of type `LaunchError` panics if dropped
/// without first being inspected. See the [`LaunchError`] documentation for
/// more information.
/// ///
/// # Example /// # Example
/// ///
/// ```rust /// ```rust
/// use futures::future::FutureExt;
///
/// // This gives us the default behavior. Alternatively, we could use a
/// // `tokio::runtime::Builder` to configure with greater detail.
/// let runtime = tokio::runtime::Runtime::new().expect("error creating runtime");
///
/// # if false { /// # if false {
/// rocket::ignite().launch(); /// let server_done = rocket::ignite().spawn_on(&runtime).expect("error launching server");
/// runtime.block_on(async move {
/// let result = server_done.await;
/// assert!(result.is_ok());
/// });
/// # } /// # }
/// ``` /// ```
pub fn launch(mut self) -> LaunchError { // TODO.async Decide on an return type, possibly creating a discriminated union.
pub fn spawn_on(
mut self,
runtime: &tokio::runtime::Runtime,
) -> Result<impl Future<Output = Result<(), Box<dyn std::error::Error>>>, LaunchError> {
#[cfg(feature = "tls")] use crate::http::tls; #[cfg(feature = "tls")] use crate::http::tls;
self = match self.prelaunch_check() { self = self.prelaunch_check()?;
Ok(rocket) => rocket,
Err(launch_error) => return launch_error
};
self.fairings.pretty_print_counts(); self.fairings.pretty_print_counts();
// TODO.async What meaning should config.workers have now?
// Initialize the tokio runtime
let runtime = tokio::runtime::Builder::new()
.core_threads(self.config.workers as usize)
.build()
.expect("Cannot build runtime!");
let full_addr = format!("{}:{}", self.config.address, self.config.port); let full_addr = format!("{}:{}", self.config.address, self.config.port);
let addrs = match full_addr.to_socket_addrs() { let addrs = match full_addr.to_socket_addrs() {
Ok(a) => a.collect::<Vec<_>>(), Ok(a) => a.collect::<Vec<_>>(),
// TODO.async: Reconsider this error type // TODO.async: Reconsider this error type
Err(e) => return From::from(io::Error::new(io::ErrorKind::Other, e)), Err(e) => return Err(From::from(io::Error::new(io::ErrorKind::Other, e))),
}; };
// TODO.async: support for TLS, unix sockets. // TODO.async: support for TLS, unix sockets.
@ -749,7 +747,7 @@ impl Rocket {
let mut incoming = match hyper::AddrIncoming::bind(&addrs[0]) { let mut incoming = match hyper::AddrIncoming::bind(&addrs[0]) {
Ok(incoming) => incoming, Ok(incoming) => incoming,
Err(e) => return LaunchError::new(LaunchErrorKind::Bind(e)), Err(e) => return Err(LaunchError::new(LaunchErrorKind::Bind(e))),
}; };
// Determine the address and port we actually binded to. // Determine the address and port we actually binded to.
@ -794,10 +792,46 @@ impl Rocket {
.executor(runtime.executor()) .executor(runtime.executor())
.serve(service); .serve(service);
// TODO.async: Use with_graceful_shutdown, and let launch() return a Result<(), Error> let (future, handle) = server.remote_handle();
runtime.block_on(server).expect("TODO.async handle error"); runtime.spawn(future);
Ok(handle.err_into())
}
unreachable!("the call to `block_on` should block on success") /// Starts the application server and begins listening for and dispatching
/// requests to mounted routes and catchers. Unless there is an error, this
/// function does not return and blocks until program termination.
///
/// # Error
///
/// If there is a problem starting the application, a [`LaunchError`] is
/// returned. Note that a value of type `LaunchError` panics if dropped
/// without first being inspected. See the [`LaunchError`] documentation for
/// more information.
///
/// # Example
///
/// ```rust
/// # if false {
/// rocket::ignite().launch();
/// # }
/// ```
// TODO.async Decide on an return type, possibly creating a discriminated union.
pub fn launch(self) -> Box<dyn std::error::Error> {
// TODO.async What meaning should config.workers have now?
// Initialize the tokio runtime
let runtime = tokio::runtime::Builder::new()
.core_threads(self.config.workers as usize)
.build()
.expect("Cannot build runtime!");
// TODO.async: Use with_graceful_shutdown, and let launch() return a Result<(), Error>
match self.spawn_on(&runtime) {
Ok(fut) => match runtime.block_on(fut) {
Ok(_) => unreachable!("the call to `block_on` should block on success"),
Err(err) => err,
}
Err(err) => Box::new(err),
}
} }
/// Returns an iterator over all of the routes mounted on this instance of /// Returns an iterator over all of the routes mounted on this instance of