mirror of https://github.com/rwf2/Rocket.git
Revamp shutdown to recover shutdown 'Rocket'.
The core improvement is that `Rocket::launch()` now resolves to `Ok(Rocket<Ignite>)` on nominal shutdown. Furthermore, shutdown never terminates the running process. Other changes directly related to shutdown: * Runtime worker thread names are now irrelevant to graceful shutdown. * `ErrorKind::Runtime` was removed; `ErrorKind::Shutdown` was added. * The `force` config value is only read from the default provider. * If `force`, Rocket's constructed async runtime is terminated. Other related changes: * The exported `hyper` module docs properly reflect public re-exports.
This commit is contained in:
parent
761ffb009e
commit
0ba56ccbb3
|
@ -388,14 +388,16 @@ pub fn async_test(args: TokenStream, input: TokenStream) -> TokenStream {
|
|||
/// ```rust,no_run
|
||||
/// #[rocket::main]
|
||||
/// async fn main() -> Result<(), rocket::Error> {
|
||||
/// rocket::build()
|
||||
/// let _rocket = rocket::build()
|
||||
/// .ignite().await?
|
||||
/// .launch().await
|
||||
/// .launch().await?;
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// It should be used only when inspection of an ignited instance of `Rocket` is
|
||||
/// required, or when the return value of `launch()` is to be inspected:
|
||||
/// It should be used only when the return values of `ignite()` or `launch()`
|
||||
/// are to be inspected:
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// #[rocket::main]
|
||||
|
@ -403,10 +405,10 @@ pub fn async_test(args: TokenStream, input: TokenStream) -> TokenStream {
|
|||
/// let rocket = rocket::build().ignite().await?;
|
||||
/// println!("Hello, Rocket: {:?}", rocket);
|
||||
///
|
||||
/// let result = rocket.launch().await;
|
||||
/// println!("The server shutdown: {:?}", result);
|
||||
/// let rocket = rocket.launch().await?;
|
||||
/// println!("Welcome back, Rocket: {:?}", rocket);
|
||||
///
|
||||
/// result
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
|
|
|
@ -72,9 +72,9 @@ mod e {
|
|||
mod f {
|
||||
// main with async, is async, with termination return.
|
||||
#[rocket::main]
|
||||
async fn main() -> Result<(), String> {
|
||||
let result = rocket::build().launch().await;
|
||||
result.map_err(|e| e.to_string())
|
||||
async fn main() -> Result<(), rocket::Error> {
|
||||
let _: rocket::Rocket<rocket::Ignite> = rocket::build().launch().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,5 +90,6 @@ mod g {
|
|||
#[rocket::main]
|
||||
async fn main() -> Result<(), String> {
|
||||
let result = rocket::build().launch().await;
|
||||
result.map_err(|e| e.to_string())
|
||||
let _: rocket::Rocket<rocket::Ignite> = result.map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,13 +1,10 @@
|
|||
//! Re-exported hyper HTTP library types.
|
||||
//!
|
||||
//! All types that are re-exported from Hyper reside inside of this module.
|
||||
//! These types will, with certainty, be removed with time, but they reside here
|
||||
//! while necessary.
|
||||
//! Hyper re-exports.
|
||||
|
||||
#[doc(hidden)] pub use hyper::*;
|
||||
#[doc(hidden)] pub use http::*;
|
||||
pub use hyper::{Method, Error, Body, Uri, Version, Request, Response};
|
||||
pub use hyper::{body, server, service};
|
||||
pub use http::{HeaderValue, request, uri};
|
||||
|
||||
/// Reexported http header types.
|
||||
/// Reexported Hyper HTTP header types.
|
||||
pub mod header {
|
||||
macro_rules! import_http_headers {
|
||||
($($name:ident),*) => ($(
|
||||
|
|
|
@ -58,10 +58,10 @@ use crate::config::SecretKey;
|
|||
pub struct Config {
|
||||
/// The selected profile. **(default: _debug_ `debug` / _release_ `release`)**
|
||||
///
|
||||
/// **Note:** This field is never serialized nor deserialized. When a
|
||||
/// _**Note:** This field is never serialized nor deserialized. When a
|
||||
/// `Config` is merged into a `Figment` as a `Provider`, this profile is
|
||||
/// selected on the `Figment`. When a `Config` is extracted, this field is
|
||||
/// set to the extracting Figment's selected `Profile`.
|
||||
/// set to the extracting Figment's selected `Profile`._
|
||||
#[serde(skip)]
|
||||
pub profile: Profile,
|
||||
/// IP address to serve on. **(default: `127.0.0.1`)**
|
||||
|
@ -69,6 +69,9 @@ pub struct Config {
|
|||
/// Port to serve on. **(default: `8000`)**
|
||||
pub port: u16,
|
||||
/// Number of threads to use for executing futures. **(default: `num_cores`)**
|
||||
///
|
||||
/// _**Note:** Rocket only reads this value from sources in the [default
|
||||
/// provider](Config::figment())._
|
||||
pub workers: usize,
|
||||
/// How, if at all, to identify the server via the `Server` header.
|
||||
/// **(default: `"Rocket"`)**
|
||||
|
@ -87,8 +90,8 @@ pub struct Config {
|
|||
pub tls: Option<TlsConfig>,
|
||||
/// The secret key for signing and encrypting. **(default: `0`)**
|
||||
///
|
||||
/// **Note:** This field _always_ serializes as a 256-bit array of `0`s to
|
||||
/// aid in preventing leakage of the secret key.
|
||||
/// _**Note:** This field _always_ serializes as a 256-bit array of `0`s to
|
||||
/// aid in preventing leakage of the secret key._
|
||||
#[cfg(feature = "secrets")]
|
||||
#[cfg_attr(nightly, doc(cfg(feature = "secrets")))]
|
||||
#[serde(serialize_with = "SecretKey::serialize_zero")]
|
||||
|
|
|
@ -110,15 +110,14 @@ impl fmt::Display for Sig {
|
|||
///
|
||||
/// If tasks are _still_ executing after both periods _and_ a Rocket configured
|
||||
/// async runtime is in use, Rocket waits an unspecified amount of time (not to
|
||||
/// exceed 1s) and forcefully exits the current process with an exit code of
|
||||
/// `1`. This guarantees that the server process terminates, prohibiting
|
||||
/// uncooperative, runaway I/O from preventing shutdown altogether.
|
||||
/// exceed 1s) and forcefully terminates the asynchronous runtime. This
|
||||
/// guarantees that the server process terminates, prohibiting uncooperative,
|
||||
/// runaway I/O from preventing shutdown altogether.
|
||||
///
|
||||
/// A "Rocket configured runtime" is one started by the `#[rocket::main]` and
|
||||
/// `#[launch]` attributes. Rocket _never_ forcefully terminates a server that
|
||||
/// is running inside of a custom runtime. A server that creates its own async
|
||||
/// runtime must take care to terminate itself if tasks it spawns fail to
|
||||
/// cooperate.
|
||||
/// `#[launch]` attributes. Rocket _never_ forcefully terminates a custom
|
||||
/// runtime. A server that creates its own async runtime must take care to
|
||||
/// terminate itself if tasks it spawns fail to cooperate.
|
||||
///
|
||||
/// Under normal circumstances, forced termination should never occur. No use of
|
||||
/// "normal" cooperative I/O (that is, via `.await` or `task::spawn()`) should
|
||||
|
@ -228,13 +227,15 @@ pub struct Shutdown {
|
|||
///
|
||||
/// **default: `3`**
|
||||
pub mercy: u32,
|
||||
/// Whether to force termination of a process that refuses to cooperatively
|
||||
/// shutdown.
|
||||
/// Whether to force termination of an async runtime that refuses to
|
||||
/// cooperatively shutdown.
|
||||
///
|
||||
/// Rocket _never_ forcefully terminates a server that is running inside of
|
||||
/// a custom runtime irrespective of this value. A server that creates its
|
||||
/// own async runtime must take care to terminate itself if it fails to
|
||||
/// cooperate.
|
||||
/// Rocket _never_ forcefully terminates a custom runtime, irrespective of
|
||||
/// this value. A server that creates its own async runtime must take care
|
||||
/// to terminate itself if it fails to cooperate.
|
||||
///
|
||||
/// _**Note:** Rocket only reads this value from sources in the [default
|
||||
/// provider](crate::Config::figment())._
|
||||
///
|
||||
/// **default: `true`**
|
||||
#[serde(deserialize_with = "figment::util::bool_from_str_or_int")]
|
||||
|
|
|
@ -1,16 +1,18 @@
|
|||
//! Types representing various errors that can occur in a Rocket application.
|
||||
|
||||
use std::{io, fmt};
|
||||
use std::sync::atomic::{Ordering, AtomicBool};
|
||||
use std::sync::{Arc, atomic::{Ordering, AtomicBool}};
|
||||
use std::error::Error as StdError;
|
||||
|
||||
use yansi::Paint;
|
||||
use figment::Profile;
|
||||
|
||||
use crate::{Rocket, Orbit};
|
||||
|
||||
/// An error that occurs during launch.
|
||||
///
|
||||
/// An `Error` is returned by [`launch()`](crate::Rocket::launch()) when
|
||||
/// launching an application fails or, more rarely, when the runtime fails after
|
||||
/// lauching.
|
||||
/// An `Error` is returned by [`launch()`](Rocket::launch()) when launching an
|
||||
/// application fails or, more rarely, when the runtime fails after lauching.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
|
@ -76,8 +78,6 @@ pub enum ErrorKind {
|
|||
Bind(io::Error),
|
||||
/// An I/O error occurred during launch.
|
||||
Io(io::Error),
|
||||
/// An I/O error occurred in the runtime.
|
||||
Runtime(Box<dyn std::error::Error + Send + Sync>),
|
||||
/// A valid [`Config`](crate::Config) could not be extracted from the
|
||||
/// configured figment.
|
||||
Config(figment::Error),
|
||||
|
@ -89,6 +89,13 @@ pub enum ErrorKind {
|
|||
SentinelAborts(Vec<crate::sentinel::Sentry>),
|
||||
/// The configuration profile is not debug but not secret key is configured.
|
||||
InsecureSecretKey(Profile),
|
||||
/// Shutdown failed.
|
||||
Shutdown(
|
||||
/// The instance of Rocket that failed to shutdown.
|
||||
Arc<Rocket<Orbit>>,
|
||||
/// The error that occurred during shutdown, if any.
|
||||
Option<Box<dyn StdError + Send + Sync>>
|
||||
),
|
||||
}
|
||||
|
||||
impl From<ErrorKind> for Error {
|
||||
|
@ -103,6 +110,14 @@ impl Error {
|
|||
Error { handled: AtomicBool::new(false), kind }
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) fn shutdown<E>(rocket: Arc<Rocket<Orbit>>, error: E) -> Error
|
||||
where E: Into<Option<crate::http::hyper::Error>>
|
||||
{
|
||||
let error = error.into().map(|e| Box::new(e) as Box<dyn StdError + Sync + Send>);
|
||||
Error::new(ErrorKind::Shutdown(rocket, error))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn was_handled(&self) -> bool {
|
||||
self.handled.load(Ordering::Acquire)
|
||||
|
@ -146,10 +161,11 @@ impl fmt::Display for ErrorKind {
|
|||
ErrorKind::Io(e) => write!(f, "I/O error: {}", e),
|
||||
ErrorKind::Collisions(_) => "collisions detected".fmt(f),
|
||||
ErrorKind::FailedFairings(_) => "launch fairing(s) failed".fmt(f),
|
||||
ErrorKind::Runtime(e) => write!(f, "runtime error: {}", e),
|
||||
ErrorKind::InsecureSecretKey(_) => "insecure secret key config".fmt(f),
|
||||
ErrorKind::Config(_) => "failed to extract configuration".fmt(f),
|
||||
ErrorKind::SentinelAborts(_) => "sentinel(s) aborted".fmt(f),
|
||||
ErrorKind::Shutdown(_, Some(e)) => write!(f, "shutdown failed: {}", e),
|
||||
ErrorKind::Shutdown(_, None) => "shutdown failed".fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -212,11 +228,6 @@ impl Drop for Error {
|
|||
|
||||
panic!("aborting due to fairing failure(s)");
|
||||
}
|
||||
ErrorKind::Runtime(ref err) => {
|
||||
error!("An error occurred in the runtime:");
|
||||
info_!("{}", err);
|
||||
panic!("aborting due to runtime failure");
|
||||
}
|
||||
ErrorKind::InsecureSecretKey(profile) => {
|
||||
error!("secrets enabled in non-debug without `secret_key`");
|
||||
info_!("selected profile: {}", Paint::default(profile).bold());
|
||||
|
@ -237,6 +248,14 @@ impl Drop for Error {
|
|||
|
||||
panic!("aborting due to sentinel-triggered abort(s)");
|
||||
}
|
||||
ErrorKind::Shutdown(_, error) => {
|
||||
error!("Rocket failed to shutdown gracefully.");
|
||||
if let Some(e) = error {
|
||||
info_!("{}", e);
|
||||
}
|
||||
|
||||
panic!("aborting due to failed shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ use crate::form::{FromFormField, ValueField, DataField, error::Errors};
|
|||
use crate::outcome::IntoOutcome;
|
||||
use crate::fs::FileName;
|
||||
|
||||
use tokio::task;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tempfile::{NamedTempFile, TempPath};
|
||||
|
@ -171,8 +172,7 @@ impl<'v> TempFile<'v> {
|
|||
let path = mem::replace(either, Either::Right(new_path.clone()));
|
||||
match path {
|
||||
Either::Left(temp) => {
|
||||
let result = tokio::task::spawn_blocking(move || temp.persist(new_path))
|
||||
.await
|
||||
let result = task::spawn_blocking(move || temp.persist(new_path)).await
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "spawn_block"))?;
|
||||
|
||||
if let Err(e) = result {
|
||||
|
@ -242,8 +242,7 @@ impl<'v> TempFile<'v> {
|
|||
let old_path = mem::replace(either, Either::Right(either.to_path_buf()));
|
||||
match old_path {
|
||||
Either::Left(temp) => {
|
||||
let result = tokio::task::spawn_blocking(move || temp.keep())
|
||||
.await
|
||||
let result = task::spawn_blocking(move || temp.keep()).await
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "spawn_block"))?;
|
||||
|
||||
if let Err(e) = result {
|
||||
|
@ -452,15 +451,13 @@ impl<'v> TempFile<'v> {
|
|||
.unwrap_or(Limits::FILE);
|
||||
|
||||
let temp_dir = req.rocket().config().temp_dir.relative();
|
||||
let file = tokio::task::spawn_blocking(move || {
|
||||
NamedTempFile::new_in(temp_dir)
|
||||
}).await.map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::BrokenPipe, "spawn_block panic")
|
||||
})??;
|
||||
|
||||
let file = task::spawn_blocking(move || NamedTempFile::new_in(temp_dir)).await;
|
||||
let file = file.map_err(|_| io::Error::new(io::ErrorKind::Other, "spawn_block panic"))??;
|
||||
let (file, temp_path) = file.into_parts();
|
||||
|
||||
let mut file = File::from_std(file);
|
||||
let n = data.open(limit).stream_to(tokio::io::BufWriter::new(&mut file)).await?;
|
||||
let fut = data.open(limit).stream_to(tokio::io::BufWriter::new(&mut file));
|
||||
let n = fut.await?;
|
||||
let temp_file = TempFile::File {
|
||||
content_type, file_name,
|
||||
path: Either::Left(temp_path),
|
||||
|
|
|
@ -141,9 +141,21 @@ pub mod http {
|
|||
//! This module exports types that map to HTTP concepts or to the underlying
|
||||
//! HTTP library when needed.
|
||||
|
||||
#[doc(inline)]
|
||||
#[doc(hidden)]
|
||||
pub use rocket_http::*;
|
||||
|
||||
/// Re-exported hyper HTTP library types.
|
||||
///
|
||||
/// All types that are re-exported from Hyper reside inside of this module.
|
||||
/// These types will, with certainty, be removed with time, but they reside here
|
||||
/// while necessary.
|
||||
pub mod hyper {
|
||||
#[doc(hidden)]
|
||||
pub use rocket_http::hyper::*;
|
||||
|
||||
pub use rocket_http::hyper::header;
|
||||
}
|
||||
|
||||
#[doc(inline)]
|
||||
pub use crate::cookies::*;
|
||||
}
|
||||
|
@ -221,29 +233,36 @@ pub use async_trait::async_trait;
|
|||
|
||||
/// WARNING: This is unstable! Do not use this method outside of Rocket!
|
||||
#[doc(hidden)]
|
||||
pub fn async_test<R>(fut: impl std::future::Future<Output = R>) -> R {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
// NOTE: graceful shutdown depends on the "rocket-worker" prefix.
|
||||
.thread_name("rocket-worker-test-thread")
|
||||
.worker_threads(1)
|
||||
pub fn async_run<F, R>(fut: F, workers: usize, force_end: bool, name: &str) -> R
|
||||
where F: std::future::Future<Output = R>
|
||||
{
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name(name)
|
||||
.worker_threads(workers)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("create tokio runtime")
|
||||
.block_on(fut)
|
||||
.expect("create tokio runtime");
|
||||
|
||||
let result = runtime.block_on(fut);
|
||||
if force_end {
|
||||
runtime.shutdown_timeout(std::time::Duration::from_millis(500));
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// WARNING: This is unstable! Do not use this method outside of Rocket!
|
||||
#[doc(hidden)]
|
||||
pub fn async_test<R>(fut: impl std::future::Future<Output = R>) -> R {
|
||||
async_run(fut, 1, true, "rocket-worker-test-thread")
|
||||
}
|
||||
|
||||
/// WARNING: This is unstable! Do not use this method outside of Rocket!
|
||||
#[doc(hidden)]
|
||||
pub fn async_main<R>(fut: impl std::future::Future<Output = R> + Send) -> R {
|
||||
// FIXME: The `workers` value won't reflect swaps of `Rocket` in attach
|
||||
// FIXME: These config values won't reflect swaps of `Rocket` in attach
|
||||
// fairings with different config values, or values from non-Rocket configs.
|
||||
// See tokio-rs/tokio#3329 for a necessary solution in `tokio`.
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(Config::from(Config::figment()).workers)
|
||||
// NOTE: graceful shutdown depends on the "rocket-worker" prefix.
|
||||
.thread_name("rocket-worker-thread")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("create tokio runtime")
|
||||
.block_on(fut)
|
||||
let config = Config::from(Config::figment());
|
||||
async_run(fut, config.workers, config.shutdown.force, "rocket-worker-thread")
|
||||
}
|
||||
|
|
|
@ -80,9 +80,11 @@ use crate::log::PaintExt;
|
|||
/// ```rust,no_run
|
||||
/// #[rocket::main]
|
||||
/// async fn main() -> Result<(), rocket::Error> {
|
||||
/// rocket::build()
|
||||
/// let _rocket = rocket::build()
|
||||
/// .ignite().await?
|
||||
/// .launch().await
|
||||
/// .launch().await?;
|
||||
///
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
|
@ -92,7 +94,8 @@ use crate::log::PaintExt;
|
|||
/// ```rust,no_run
|
||||
/// #[rocket::main]
|
||||
/// async fn main() -> Result<(), rocket::Error> {
|
||||
/// rocket::build().launch().await
|
||||
/// let _rocket = rocket::build().launch().await?;
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
|
@ -642,22 +645,36 @@ impl Rocket<Ignite> {
|
|||
rocket
|
||||
}
|
||||
|
||||
async fn _launch(self) -> Result<(), Error> {
|
||||
self.into_orbit().default_tcp_http_server(|rkt| Box::pin(async move {
|
||||
rkt.fairings.handle_liftoff(&rkt).await;
|
||||
async fn _launch(self) -> Result<Rocket<Ignite>, Error> {
|
||||
self.into_orbit()
|
||||
.default_tcp_http_server(|rkt| Box::pin(async move {
|
||||
rkt.fairings.handle_liftoff(&rkt).await;
|
||||
|
||||
let proto = rkt.config.tls_enabled().then(|| "https").unwrap_or("http");
|
||||
let socket_addr = SocketAddr::new(rkt.config.address, rkt.config.port);
|
||||
let addr = format!("{}://{}", proto, socket_addr);
|
||||
launch_info!("{}{} {}",
|
||||
Paint::emoji("🚀 "),
|
||||
Paint::default("Rocket has launched from").bold(),
|
||||
Paint::default(addr).bold().underline());
|
||||
})).await
|
||||
let proto = rkt.config.tls_enabled().then(|| "https").unwrap_or("http");
|
||||
let socket_addr = SocketAddr::new(rkt.config.address, rkt.config.port);
|
||||
let addr = format!("{}://{}", proto, socket_addr);
|
||||
launch_info!("{}{} {}",
|
||||
Paint::emoji("🚀 "),
|
||||
Paint::default("Rocket has launched from").bold(),
|
||||
Paint::default(addr).bold().underline());
|
||||
}))
|
||||
.await
|
||||
.map(|rocket| rocket.into_ignite())
|
||||
}
|
||||
}
|
||||
|
||||
impl Rocket<Orbit> {
|
||||
fn into_ignite(self) -> Rocket<Ignite> {
|
||||
Rocket(Igniting {
|
||||
router: self.0.router,
|
||||
fairings: self.0.fairings,
|
||||
figment: self.0.figment,
|
||||
config: self.0.config,
|
||||
state: self.0.state,
|
||||
shutdown: self.0.shutdown,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the finalized, active configuration. This is guaranteed to
|
||||
/// remain stable after [`Rocket::ignite()`], through ignition and into
|
||||
/// orbit.
|
||||
|
@ -845,14 +862,16 @@ impl<P: Phase> Rocket<P> {
|
|||
///
|
||||
/// * graceful shutdown via [`Shutdown::notify()`] completes.
|
||||
///
|
||||
/// The returned value on `Ok(())` is previously running instance.
|
||||
///
|
||||
/// The `Future` does not resolve otherwise.
|
||||
///
|
||||
/// # Error
|
||||
///
|
||||
/// If there is a problem starting the application, an [`Error`] is
|
||||
/// returned. Note that a value of type `Error` panics if dropped without
|
||||
/// first being inspected. See the [`Error`] documentation for more
|
||||
/// information.
|
||||
/// If there is a problem starting the application or the application fails
|
||||
/// unexpectedly while running, an [`Error`] is returned. Note that a value
|
||||
/// of type `Error` panics if dropped without first being inspected. See the
|
||||
/// [`Error`] documentation for more information.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
|
@ -865,11 +884,11 @@ impl<P: Phase> Rocket<P> {
|
|||
/// println!("Rocket: deorbit.");
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn launch(self) -> Result<(), Error> {
|
||||
pub async fn launch(self) -> Result<Rocket<Ignite>, Error> {
|
||||
match self.0.into_state() {
|
||||
State::Build(s) => Rocket::from(s).ignite().await?._launch().await,
|
||||
State::Ignite(s) => Rocket::from(s)._launch().await,
|
||||
State::Orbit(_) => Ok(())
|
||||
State::Orbit(s) => Ok(Rocket::from(s).into_ignite())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,8 +4,9 @@ use std::time::Duration;
|
|||
|
||||
use yansi::Paint;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::sleep;
|
||||
use futures::stream::StreamExt;
|
||||
use futures::future::{self, FutureExt, Future, TryFutureExt, BoxFuture};
|
||||
use futures::future::{FutureExt, Future, BoxFuture};
|
||||
|
||||
use crate::{route, Rocket, Orbit, Request, Response, Data, Config};
|
||||
use crate::form::Form;
|
||||
|
@ -355,7 +356,7 @@ impl Rocket<Orbit> {
|
|||
crate::catcher::default_handler(Status::InternalServerError, req)
|
||||
}
|
||||
|
||||
pub(crate) async fn default_tcp_http_server<C>(mut self, ready: C) -> Result<(), Error>
|
||||
pub(crate) async fn default_tcp_http_server<C>(mut self, ready: C) -> Result<Self, Error>
|
||||
where C: for<'a> Fn(&'a Self) -> BoxFuture<'a, ()>
|
||||
{
|
||||
use std::net::ToSocketAddrs;
|
||||
|
@ -390,7 +391,7 @@ impl Rocket<Orbit> {
|
|||
}
|
||||
|
||||
// TODO.async: Solidify the Listener APIs and make this function public
|
||||
pub(crate) async fn http_server<L>(self, listener: L) -> Result<(), Error>
|
||||
pub(crate) async fn http_server<L>(self, listener: L) -> Result<Self, Error>
|
||||
where L: Listener + Send, <L as Listener>::Connection: Send + Unpin + 'static
|
||||
{
|
||||
// Emit a warning if we're not running inside of Rocket's async runtime.
|
||||
|
@ -411,7 +412,6 @@ impl Rocket<Orbit> {
|
|||
// received, results in triggering the notify.
|
||||
let shutdown = self.shutdown();
|
||||
let sig_stream = self.config.shutdown.signal_stream();
|
||||
let force_shutdown = self.config.shutdown.force;
|
||||
let grace = self.config.shutdown.grace as u64;
|
||||
let mercy = self.config.shutdown.mercy as u64;
|
||||
|
||||
|
@ -436,7 +436,7 @@ impl Rocket<Orbit> {
|
|||
|
||||
// Create the Hyper `Service`.
|
||||
let rocket = Arc::new(self);
|
||||
let service_fn = move |conn: &CancellableIo<_, L::Connection>| {
|
||||
let service_fn = |conn: &CancellableIo<_, L::Connection>| {
|
||||
let rocket = rocket.clone();
|
||||
let connection = ConnectionMeta {
|
||||
remote: conn.peer_address(),
|
||||
|
@ -452,7 +452,7 @@ impl Rocket<Orbit> {
|
|||
|
||||
// NOTE: `hyper` uses `tokio::spawn()` as the default executor.
|
||||
let listener = CancellableListener::new(shutdown.clone(), listener, grace, mercy);
|
||||
let builder = hyper::Server::builder(Incoming::new(listener).nodelay(true));
|
||||
let builder = hyper::server::Server::builder(Incoming::new(listener).nodelay(true));
|
||||
|
||||
#[cfg(feature = "http2")]
|
||||
let builder = builder.http2_keep_alive_interval(match keep_alive {
|
||||
|
@ -464,42 +464,90 @@ impl Rocket<Orbit> {
|
|||
.http1_keepalive(keep_alive != 0)
|
||||
.http1_preserve_header_case(true)
|
||||
.serve(hyper::service::make_service_fn(service_fn))
|
||||
.with_graceful_shutdown(shutdown.clone())
|
||||
.map_err(|e| Error::new(ErrorKind::Runtime(Box::new(e))));
|
||||
.with_graceful_shutdown(shutdown.clone());
|
||||
|
||||
// Wait for a shutdown notification or for the server to somehow fail.
|
||||
// This deserves some exaplanation.
|
||||
//
|
||||
// This is largely to deal with Hyper's dreadful and largely nonexistent
|
||||
// handling of shutdown, in general, nevermind graceful.
|
||||
//
|
||||
// When Hyper receives a "graceful shutdown" request, it stops accepting
|
||||
// new requests. That's it. It continues to process existing requests
|
||||
// and outgoing responses forever and never cancels them. As a result,
|
||||
// Rocket must take it upon itself to cancel any existing I/O.
|
||||
//
|
||||
// To do so, Rocket wraps all connections in a `CancellableIo` struct,
|
||||
// an internal structure that gracefully closes I/O when it receives a
|
||||
// signal. That signal is the `shutdown` future. When the future
|
||||
// resolves, `CancellableIo` begins to terminate in grace, mercy, and
|
||||
// finally force close phases. Since all connections are wrapped in
|
||||
// `CancellableIo`, this eventually ends all I/O.
|
||||
//
|
||||
// At that point, unless a user spawned an infinite, stand-alone task
|
||||
// that isn't monitoring `Shutdown`, all tasks should resolve. This
|
||||
// means that all instances of the shared `Arc<Rocket>` are dropped and
|
||||
// we can return the owned instance of `Rocket`.
|
||||
//
|
||||
// Unfortunately, the Hyper `server` future resolves as soon as it has
|
||||
// finishes processing requests without respect for ongoing responses.
|
||||
// That is, `server` resolves even when there are running tasks that are
|
||||
// generating a response. So, `server` resolving implies little to
|
||||
// nothing about the state of connections. As a result, we depend on the
|
||||
// timing of grace + mercy + some buffer to determine when all
|
||||
// connections should be closed, thus all tasks should be complete, thus
|
||||
// all references to `Arc<Rocket>` should be dropped and we can get a
|
||||
// unique reference.
|
||||
tokio::pin!(server);
|
||||
match future::select(shutdown, server).await {
|
||||
future::Either::Left((_, server)) => {
|
||||
// If a task has some runaway I/O, like an infinite loop, the
|
||||
// runtime will block indefinitely when it is dropped. To
|
||||
// subvert, we start a ticking process-exit time bomb here.
|
||||
if force_shutdown {
|
||||
// Only a worker thread will have the specified thread name.
|
||||
tokio::task::spawn_blocking(move || {
|
||||
// We only hit our `exit()` if the process doesn't
|
||||
// otherwise exit since this `spawn()` won't block.
|
||||
let this = std::thread::current();
|
||||
std::thread::spawn(move || {
|
||||
std::thread::sleep(Duration::from_secs(grace + mercy));
|
||||
std::thread::sleep(Duration::from_millis(500));
|
||||
if this.name().map_or(false, |s| s.starts_with("rocket-worker")) {
|
||||
error!("Server failed to shutdown cooperatively. Terminating.");
|
||||
std::process::exit(1);
|
||||
} else {
|
||||
warn!("Server failed to shutdown cooperatively.");
|
||||
warn_!("Server is executing inside of a custom runtime.");
|
||||
info_!("Rocket's runtime is `#[rocket::main]` or `#[launch]`.");
|
||||
warn_!("Refusing to terminate runaway custom runtime.");
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
info!("Received shutdown request. Waiting for pending I/O...");
|
||||
server.await
|
||||
_ = shutdown => {
|
||||
info!("Shutdown requested. Waiting for pending I/O...");
|
||||
let grace_timer = sleep(Duration::from_secs(grace));
|
||||
let mercy_timer = sleep(Duration::from_secs(grace + mercy));
|
||||
let shutdown_timer = sleep(Duration::from_secs(grace + mercy + 1));
|
||||
tokio::pin!(grace_timer, mercy_timer, shutdown_timer);
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
result = &mut server => {
|
||||
if let Err(e) = result {
|
||||
warn!("Server failed while shutting down: {}", e);
|
||||
return Err(Error::shutdown(rocket.clone(), e));
|
||||
}
|
||||
|
||||
if Arc::strong_count(&rocket) != 1 { grace_timer.await; }
|
||||
if Arc::strong_count(&rocket) != 1 { mercy_timer.await; }
|
||||
if Arc::strong_count(&rocket) != 1 { shutdown_timer.await; }
|
||||
match Arc::try_unwrap(rocket) {
|
||||
Ok(rocket) => {
|
||||
info!("Graceful shutdown completed successfully.");
|
||||
Ok(rocket)
|
||||
}
|
||||
Err(rocket) => {
|
||||
warn!("Server failed to shutdown cooperatively.");
|
||||
Err(Error::shutdown(rocket, None))
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = &mut shutdown_timer => {
|
||||
warn!("Server failed to shutdown cooperatively.");
|
||||
return Err(Error::shutdown(rocket.clone(), None));
|
||||
},
|
||||
}
|
||||
}
|
||||
result = &mut server => {
|
||||
match result {
|
||||
Ok(()) => {
|
||||
info!("Server shutdown nominally.");
|
||||
Ok(Arc::try_unwrap(rocket).map_err(|r| Error::shutdown(r, None))?)
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Server failed prior to shutdown: {}:", e);
|
||||
Err(Error::shutdown(rocket.clone(), e))
|
||||
}
|
||||
}
|
||||
}
|
||||
future::Either::Right((result, _)) => result,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -210,10 +210,12 @@ runtime but unlike `#[launch]`, allows _you_ to start the server:
|
|||
|
||||
#[rocket::main]
|
||||
async fn main() -> Result<(), rocket::Error> {
|
||||
rocket::build()
|
||||
let _rocket = rocket::build()
|
||||
.mount("/hello", routes![world])
|
||||
.launch()
|
||||
.await
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ values:
|
|||
|----------------|-------------------|-------------------------------------------------|-------------------------|
|
||||
| `address` | `IpAddr` | IP address to serve on | `127.0.0.1` |
|
||||
| `port` | `u16` | Port to serve on. | `8000` |
|
||||
| `workers` | `usize` | Number of threads to use for executing futures. | cpu core count |
|
||||
| `workers`* | `usize` | Number of threads to use for executing futures. | cpu core count |
|
||||
| `ident` | `string`, `false` | If and how to identify via the `Server` header. | `"Rocket"` |
|
||||
| `keep_alive` | `u32` | Keep-alive timeout seconds; disabled when `0`. | `5` |
|
||||
| `log_level` | [`LogLevel`] | Max level to log. (off/normal/debug/critical) | `normal`/`critical` |
|
||||
|
@ -31,7 +31,10 @@ values:
|
|||
| `limits` | [`Limits`] | Streaming read size limits. | [`Limits::default()`] |
|
||||
| `limits.$name` | `&str`/`uint` | Read limit for `$name`. | form = "32KiB" |
|
||||
| `ctrlc` | `bool` | Whether `ctrl-c` initiates a server shutdown. | `true` |
|
||||
| `shutdown` | [`Shutdown`] | Graceful shutdown configuration. | [`Shutdown::default()`] |
|
||||
| `shutdown`* | [`Shutdown`] | Graceful shutdown configuration. | [`Shutdown::default()`] |
|
||||
|
||||
<small>* Note: the `workers` and `shutdown.force` configuration parameters are
|
||||
only read from the [default provider](#default_provider).</small>
|
||||
|
||||
### Profiles
|
||||
|
||||
|
@ -182,6 +185,8 @@ ROCKET_TLS={certs="abc",key="foo/bar"}
|
|||
ROCKET_LIMITS={form="64 KiB"}
|
||||
```
|
||||
|
||||
## Configuration Parameters
|
||||
|
||||
### Secret Key
|
||||
|
||||
The `secret_key` parameter configures a cryptographic key to use when encrypting
|
||||
|
|
Loading…
Reference in New Issue