diff --git a/core/lib/Cargo.toml b/core/lib/Cargo.toml index 0fa349ef..1b57d6c7 100644 --- a/core/lib/Cargo.toml +++ b/core/lib/Cargo.toml @@ -55,6 +55,7 @@ tempfile = "3" async-trait = "0.1.43" async-stream = "0.3.2" multer = { version = "2", features = ["tokio-io"] } +tokio-stream = { version = "0.1.6", features = ["signal"] } [dependencies.state] git = "https://github.com/SergioBenitez/state.git" diff --git a/core/lib/src/config/shutdown.rs b/core/lib/src/config/shutdown.rs index a8b0dd5f..ad6bcbf8 100644 --- a/core/lib/src/config/shutdown.rs +++ b/core/lib/src/config/shutdown.rs @@ -1,10 +1,9 @@ use std::fmt; -use std::future::Future; #[cfg(unix)] use std::collections::HashSet; -use futures::future::{Either, pending}; +use futures::stream::Stream; use serde::{Deserialize, Serialize}; /// A Unix signal for triggering graceful shutdown. @@ -17,8 +16,6 @@ use serde::{Deserialize, Serialize}; /// A `Sig` variant serializes and deserializes as a lowercase string equal to /// the name of the variant: `"alrm"` for [`Sig::Alrm`], `"chld"` for /// [`Sig::Chld`], and so on. -#[cfg(unix)] -#[cfg_attr(nightly, doc(cfg(unix)))] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum Sig { @@ -44,8 +41,6 @@ pub enum Sig { Usr2 } -#[cfg(unix)] -#[cfg_attr(nightly, doc(cfg(unix)))] impl fmt::Display for Sig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match self { @@ -110,6 +105,28 @@ impl fmt::Display for Sig { /// proceed nominally. Rocket waits at most `mercy` seconds for connections to /// shutdown before forcefully terminating all connections. /// +/// # Runaway I/O +/// +/// If tasks are _still_ executing after both periods _and_ a Rocket configured +/// async runtime is in use, Rocket waits an unspecified amount of time (not to +/// exceed 1s) and forcefully exits the current process with an exit code of +/// `1`. This guarantees that the server process terminates, prohibiting +/// uncooperative, runaway I/O from preventing shutdown altogether. +/// +/// A "Rocket configured runtime" is one started by the `#[rocket::main]` and +/// `#[launch]` attributes. Rocket _never_ forcefully terminates a server that +/// is running inside of a custom runtime. A server that creates its own async +/// runtime must take care to terminate itself if tasks it spawns fail to +/// cooperate. +/// +/// Under normal circumstances, forced termination should never occur. No use of +/// "normal" cooperative I/O (that is, via `.await` or `task::spawn()`) should +/// trigger abrupt termination. Instead, forced cancellation is intended to +/// prevent _buggy_ code, such as an unintended infinite loop or unknown use of +/// blocking I/O, from preventing shutdown. +/// +/// This behavior can be disabled by setting [`Shutdown::force`] to `false`. +/// /// # Example /// /// As with all Rocket configuration options, when using the default @@ -129,6 +146,7 @@ impl fmt::Display for Sig { /// signals = ["term", "hup"] /// grace = 10 /// mercy = 5 +/// # force = false /// # "#).nested(); /// /// // The config parses as follows: @@ -136,6 +154,7 @@ impl fmt::Display for Sig { /// assert_eq!(config.shutdown.ctrlc, false); /// assert_eq!(config.shutdown.grace, 10); /// assert_eq!(config.shutdown.mercy, 5); +/// # assert_eq!(config.shutdown.force, false); /// /// # #[cfg(unix)] { /// use rocket::config::Sig; @@ -168,6 +187,7 @@ impl fmt::Display for Sig { /// }, /// grace: 10, /// mercy: 5, +/// force: true, /// }, /// ..Config::default() /// }; @@ -175,6 +195,7 @@ impl fmt::Display for Sig { /// assert_eq!(config.shutdown.ctrlc, false); /// assert_eq!(config.shutdown.grace, 10); /// assert_eq!(config.shutdown.mercy, 5); +/// assert_eq!(config.shutdown.force, true); /// /// #[cfg(unix)] { /// assert_eq!(config.shutdown.signals.len(), 2); @@ -206,11 +227,21 @@ pub struct Shutdown { /// /// **default: `3`** pub mercy: u32, + /// Whether to force termination of a process that refuses to cooperatively + /// shutdown. + /// + /// Rocket _never_ forcefully terminates a server that is running inside of + /// a custom runtime irrespective of this value. A server that creates its + /// own async runtime must take care to terminate itself if it fails to + /// cooperate. + /// + /// **default: `true`** + pub force: bool, } impl fmt::Display for Shutdown { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "ctrlc = {}, ", self.ctrlc)?; + write!(f, "ctrlc = {}, force = {}, ", self.ctrlc, self.force)?; #[cfg(unix)] { write!(f, "signals = [")?; @@ -234,18 +265,19 @@ impl Default for Shutdown { signals: { let mut set = HashSet::new(); set.insert(Sig::Term); set }, grace: 2, mercy: 3, + force: true, } } } impl Shutdown { #[cfg(unix)] - pub(crate) fn collective_signal(&self) -> impl Future { - use futures::future::{FutureExt, select_all}; + pub(crate) fn signal_stream(&self) -> Option> { + use tokio_stream::{StreamExt, StreamMap, wrappers::SignalStream}; use tokio::signal::unix::{signal, SignalKind}; if !self.ctrlc && self.signals.is_empty() { - return Either::Right(pending()); + return None; } let mut signals = self.signals.clone(); @@ -253,7 +285,7 @@ impl Shutdown { signals.insert(Sig::Int); } - let mut sigfuts = vec![]; + let mut map = StreamMap::new(); for sig in signals { let sigkind = match sig { Sig::Alrm => SignalKind::alarm(), @@ -268,36 +300,26 @@ impl Shutdown { Sig::Usr2 => SignalKind::user_defined2() }; - let sigfut = match signal(sigkind) { - Ok(mut signal) => Box::pin(async move { - signal.recv().await; - warn!("Received {} signal. Requesting shutdown.", sig); - }), - Err(e) => { - warn!("Failed to enable `{}` shutdown signal.", sig); - info_!("Error: {}", e); - continue - } - }; - - sigfuts.push(sigfut); + match signal(sigkind) { + Ok(signal) => { map.insert(sig, SignalStream::new(signal)); }, + Err(e) => warn!("Failed to enable `{}` shutdown signal: {}", sig, e), + } } - Either::Left(select_all(sigfuts).map(|_| ())) + Some(map.map(|(k, _)| k)) } #[cfg(not(unix))] - pub(crate) fn collective_signal(&self) -> impl Future { - use futures::future::FutureExt; + pub(crate) fn signal_stream(&self) -> Option> { + use tokio_stream::StreamExt; + use futures::stream::once; - match self.ctrlc { - true => Either::Left(tokio::signal::ctrl_c().map(|result| { - if let Err(e) = result { - warn!("Failed to enable `ctrl-c` shutdown signal."); - info_!("Error: {}", e); - } - })), - false => Either::Right(pending()), - } + self.ctrlc.then(|| tokio::signal::ctrl_c()) + .map(|signal| once(Box::pin(signal))) + .map(|stream| stream.filter_map(|result| { + result.map(|_| Sig::Int) + .map_err(|e| warn!("Failed to enable `ctrl-c` shutdown signal: {}", e)) + .ok() + })) } } diff --git a/core/lib/src/lib.rs b/core/lib/src/lib.rs index a0cb46dd..04322086 100644 --- a/core/lib/src/lib.rs +++ b/core/lib/src/lib.rs @@ -208,7 +208,8 @@ pub use async_trait::async_trait; #[doc(hidden)] pub fn async_test(fut: impl std::future::Future) -> R { tokio::runtime::Builder::new_multi_thread() - .thread_name("rocket-test-worker-thread") + // NOTE: graceful shutdown depends on the "rocket-worker" prefix. + .thread_name("rocket-worker-test-thread") .worker_threads(1) .enable_all() .build() @@ -224,6 +225,7 @@ pub fn async_main(fut: impl std::future::Future + Send) -> R { // See tokio-rs/tokio#3329 for a necessary solution in `tokio`. tokio::runtime::Builder::new_multi_thread() .worker_threads(Config::from(Config::figment()).workers) + // NOTE: graceful shutdown depends on the "rocket-worker" prefix. .thread_name("rocket-worker-thread") .enable_all() .build() diff --git a/core/lib/src/server.rs b/core/lib/src/server.rs index 3dd6820c..ebed03ac 100644 --- a/core/lib/src/server.rs +++ b/core/lib/src/server.rs @@ -1,10 +1,11 @@ use std::io; use std::sync::Arc; +use std::time::Duration; +use yansi::Paint; +use tokio::sync::oneshot; use futures::stream::StreamExt; use futures::future::{self, FutureExt, Future, TryFutureExt, BoxFuture}; -use tokio::sync::oneshot; -use yansi::Paint; use crate::{Rocket, Orbit, Request, Response, Data, route}; use crate::form::Form; @@ -398,7 +399,7 @@ impl Rocket { let http1_keepalive = self.config.keep_alive != 0; let http2_keep_alive = match self.config.keep_alive { 0 => None, - n => Some(std::time::Duration::from_secs(n as u64)) + n => Some(Duration::from_secs(n as u64)) }; // Set up cancellable I/O from the given listener. Shutdown occurs when @@ -406,7 +407,8 @@ impl Rocket { // notification or indirectly through an external signal which, when // received, results in triggering the notify. let shutdown = self.shutdown(); - let external_shutdown = self.config.shutdown.collective_signal(); + let sig_stream = self.config.shutdown.signal_stream(); + let force_shutdown = self.config.shutdown.force; let grace = self.config.shutdown.grace as u64; let mercy = self.config.shutdown.mercy as u64; @@ -430,15 +432,59 @@ impl Rocket { .with_graceful_shutdown(shutdown.clone()) .map_err(|e| Error::new(ErrorKind::Runtime(Box::new(e)))); - tokio::pin!(server, external_shutdown); - let selecter = future::select(external_shutdown, server); - match selecter.await { + // Start a task that listens for external signals and notifies shutdown. + if let Some(mut stream) = sig_stream { + let shutdown = shutdown.clone(); + tokio::spawn(async move { + while let Some(sig) = stream.next().await { + if shutdown.0.tripped() { + warn!("Received {}. Shutdown already in progress.", sig); + } else { + warn!("Received {}. Requesting shutdown.", sig); + } + + shutdown.0.trip(); + } + }); + } + + // Wait for a shutdown notification or for the server to somehow fail. + tokio::pin!(server); + match future::select(shutdown, server).await { future::Either::Left((_, server)) => { - // External signal received. Request shutdown, wait for server. - shutdown.notify(); + // If a task has some runaway I/O, like an infinite loop, the + // runtime will block indefinitely when it is dropped. To + // subvert, we start a ticking process-exit time bomb here. + if force_shutdown { + use std::thread; + + // Only a worker thread will have the specified thread name. + tokio::task::spawn_blocking(move || { + let this = thread::current(); + let is_rocket_runtime = this.name() + .map_or(false, |s| s.starts_with("rocket-worker")); + + // We only hit our `exit()` if the process doesn't + // otherwise exit since this `spawn()` won't block. + thread::spawn(move || { + thread::sleep(Duration::from_secs(grace + mercy)); + thread::sleep(Duration::from_millis(500)); + if is_rocket_runtime { + error!("Server failed to shutdown cooperatively. Terminating."); + std::process::exit(1); + } else { + warn!("Server failed to shutdown cooperatively."); + warn_!("Server is executing inside of a custom runtime."); + info_!("Rocket's runtime is `#[rocket::main]` or `#[launch]`."); + warn_!("Refusing to terminate runaway custom runtime."); + } + }); + }); + } + + info!("Received shutdown request. Waiting for pending I/O..."); server.await } - // Internal shutdown or server error. Return the result. future::Either::Right((result, _)) => result, } } diff --git a/core/lib/src/shutdown.rs b/core/lib/src/shutdown.rs index b16dc022..490114f5 100644 --- a/core/lib/src/shutdown.rs +++ b/core/lib/src/shutdown.rs @@ -88,7 +88,6 @@ impl Shutdown { #[inline] pub fn notify(self) { self.0.trip(); - info!("Shutdown requested. Waiting for pending I/O to finish..."); } } diff --git a/core/lib/src/trip_wire.rs b/core/lib/src/trip_wire.rs index 74329921..c4d649bf 100644 --- a/core/lib/src/trip_wire.rs +++ b/core/lib/src/trip_wire.rs @@ -101,6 +101,11 @@ impl TripWire { self.notify.notify_waiters(); self.notify.notify_one(); } + + #[inline(always)] + pub fn tripped(&self) -> bool { + self.tripped.load(Ordering::Acquire) + } } #[cfg(test)]