Protect graceful shutdown against runaway I/O.

This commit is contained in:
Sergio Benitez 2021-05-31 23:47:52 -07:00
parent 735bd99549
commit 3a3d0ce518
6 changed files with 123 additions and 48 deletions

View File

@ -55,6 +55,7 @@ tempfile = "3"
async-trait = "0.1.43" async-trait = "0.1.43"
async-stream = "0.3.2" async-stream = "0.3.2"
multer = { version = "2", features = ["tokio-io"] } multer = { version = "2", features = ["tokio-io"] }
tokio-stream = { version = "0.1.6", features = ["signal"] }
[dependencies.state] [dependencies.state]
git = "https://github.com/SergioBenitez/state.git" git = "https://github.com/SergioBenitez/state.git"

View File

@ -1,10 +1,9 @@
use std::fmt; use std::fmt;
use std::future::Future;
#[cfg(unix)] #[cfg(unix)]
use std::collections::HashSet; use std::collections::HashSet;
use futures::future::{Either, pending}; use futures::stream::Stream;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// A Unix signal for triggering graceful shutdown. /// A Unix signal for triggering graceful shutdown.
@ -17,8 +16,6 @@ use serde::{Deserialize, Serialize};
/// A `Sig` variant serializes and deserializes as a lowercase string equal to /// A `Sig` variant serializes and deserializes as a lowercase string equal to
/// the name of the variant: `"alrm"` for [`Sig::Alrm`], `"chld"` for /// the name of the variant: `"alrm"` for [`Sig::Alrm`], `"chld"` for
/// [`Sig::Chld`], and so on. /// [`Sig::Chld`], and so on.
#[cfg(unix)]
#[cfg_attr(nightly, doc(cfg(unix)))]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum Sig { pub enum Sig {
@ -44,8 +41,6 @@ pub enum Sig {
Usr2 Usr2
} }
#[cfg(unix)]
#[cfg_attr(nightly, doc(cfg(unix)))]
impl fmt::Display for Sig { impl fmt::Display for Sig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self { let s = match self {
@ -110,6 +105,28 @@ impl fmt::Display for Sig {
/// proceed nominally. Rocket waits at most `mercy` seconds for connections to /// proceed nominally. Rocket waits at most `mercy` seconds for connections to
/// shutdown before forcefully terminating all connections. /// shutdown before forcefully terminating all connections.
/// ///
/// # Runaway I/O
///
/// If tasks are _still_ executing after both periods _and_ a Rocket configured
/// async runtime is in use, Rocket waits an unspecified amount of time (not to
/// exceed 1s) and forcefully exits the current process with an exit code of
/// `1`. This guarantees that the server process terminates, prohibiting
/// uncooperative, runaway I/O from preventing shutdown altogether.
///
/// A "Rocket configured runtime" is one started by the `#[rocket::main]` and
/// `#[launch]` attributes. Rocket _never_ forcefully terminates a server that
/// is running inside of a custom runtime. A server that creates its own async
/// runtime must take care to terminate itself if tasks it spawns fail to
/// cooperate.
///
/// Under normal circumstances, forced termination should never occur. No use of
/// "normal" cooperative I/O (that is, via `.await` or `task::spawn()`) should
/// trigger abrupt termination. Instead, forced cancellation is intended to
/// prevent _buggy_ code, such as an unintended infinite loop or unknown use of
/// blocking I/O, from preventing shutdown.
///
/// This behavior can be disabled by setting [`Shutdown::force`] to `false`.
///
/// # Example /// # Example
/// ///
/// As with all Rocket configuration options, when using the default /// As with all Rocket configuration options, when using the default
@ -129,6 +146,7 @@ impl fmt::Display for Sig {
/// signals = ["term", "hup"] /// signals = ["term", "hup"]
/// grace = 10 /// grace = 10
/// mercy = 5 /// mercy = 5
/// # force = false
/// # "#).nested(); /// # "#).nested();
/// ///
/// // The config parses as follows: /// // The config parses as follows:
@ -136,6 +154,7 @@ impl fmt::Display for Sig {
/// assert_eq!(config.shutdown.ctrlc, false); /// assert_eq!(config.shutdown.ctrlc, false);
/// assert_eq!(config.shutdown.grace, 10); /// assert_eq!(config.shutdown.grace, 10);
/// assert_eq!(config.shutdown.mercy, 5); /// assert_eq!(config.shutdown.mercy, 5);
/// # assert_eq!(config.shutdown.force, false);
/// ///
/// # #[cfg(unix)] { /// # #[cfg(unix)] {
/// use rocket::config::Sig; /// use rocket::config::Sig;
@ -168,6 +187,7 @@ impl fmt::Display for Sig {
/// }, /// },
/// grace: 10, /// grace: 10,
/// mercy: 5, /// mercy: 5,
/// force: true,
/// }, /// },
/// ..Config::default() /// ..Config::default()
/// }; /// };
@ -175,6 +195,7 @@ impl fmt::Display for Sig {
/// assert_eq!(config.shutdown.ctrlc, false); /// assert_eq!(config.shutdown.ctrlc, false);
/// assert_eq!(config.shutdown.grace, 10); /// assert_eq!(config.shutdown.grace, 10);
/// assert_eq!(config.shutdown.mercy, 5); /// assert_eq!(config.shutdown.mercy, 5);
/// assert_eq!(config.shutdown.force, true);
/// ///
/// #[cfg(unix)] { /// #[cfg(unix)] {
/// assert_eq!(config.shutdown.signals.len(), 2); /// assert_eq!(config.shutdown.signals.len(), 2);
@ -206,11 +227,21 @@ pub struct Shutdown {
/// ///
/// **default: `3`** /// **default: `3`**
pub mercy: u32, pub mercy: u32,
/// Whether to force termination of a process that refuses to cooperatively
/// shutdown.
///
/// Rocket _never_ forcefully terminates a server that is running inside of
/// a custom runtime irrespective of this value. A server that creates its
/// own async runtime must take care to terminate itself if it fails to
/// cooperate.
///
/// **default: `true`**
pub force: bool,
} }
impl fmt::Display for Shutdown { impl fmt::Display for Shutdown {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ctrlc = {}, ", self.ctrlc)?; write!(f, "ctrlc = {}, force = {}, ", self.ctrlc, self.force)?;
#[cfg(unix)] { #[cfg(unix)] {
write!(f, "signals = [")?; write!(f, "signals = [")?;
@ -234,18 +265,19 @@ impl Default for Shutdown {
signals: { let mut set = HashSet::new(); set.insert(Sig::Term); set }, signals: { let mut set = HashSet::new(); set.insert(Sig::Term); set },
grace: 2, grace: 2,
mercy: 3, mercy: 3,
force: true,
} }
} }
} }
impl Shutdown { impl Shutdown {
#[cfg(unix)] #[cfg(unix)]
pub(crate) fn collective_signal(&self) -> impl Future<Output = ()> { pub(crate) fn signal_stream(&self) -> Option<impl Stream<Item = Sig>> {
use futures::future::{FutureExt, select_all}; use tokio_stream::{StreamExt, StreamMap, wrappers::SignalStream};
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
if !self.ctrlc && self.signals.is_empty() { if !self.ctrlc && self.signals.is_empty() {
return Either::Right(pending()); return None;
} }
let mut signals = self.signals.clone(); let mut signals = self.signals.clone();
@ -253,7 +285,7 @@ impl Shutdown {
signals.insert(Sig::Int); signals.insert(Sig::Int);
} }
let mut sigfuts = vec![]; let mut map = StreamMap::new();
for sig in signals { for sig in signals {
let sigkind = match sig { let sigkind = match sig {
Sig::Alrm => SignalKind::alarm(), Sig::Alrm => SignalKind::alarm(),
@ -268,36 +300,26 @@ impl Shutdown {
Sig::Usr2 => SignalKind::user_defined2() Sig::Usr2 => SignalKind::user_defined2()
}; };
let sigfut = match signal(sigkind) { match signal(sigkind) {
Ok(mut signal) => Box::pin(async move { Ok(signal) => { map.insert(sig, SignalStream::new(signal)); },
signal.recv().await; Err(e) => warn!("Failed to enable `{}` shutdown signal: {}", sig, e),
warn!("Received {} signal. Requesting shutdown.", sig); }
}),
Err(e) => {
warn!("Failed to enable `{}` shutdown signal.", sig);
info_!("Error: {}", e);
continue
}
};
sigfuts.push(sigfut);
} }
Either::Left(select_all(sigfuts).map(|_| ())) Some(map.map(|(k, _)| k))
} }
#[cfg(not(unix))] #[cfg(not(unix))]
pub(crate) fn collective_signal(&self) -> impl Future<Output = ()> { pub(crate) fn signal_stream(&self) -> Option<impl Stream<Item = Sig>> {
use futures::future::FutureExt; use tokio_stream::StreamExt;
use futures::stream::once;
match self.ctrlc { self.ctrlc.then(|| tokio::signal::ctrl_c())
true => Either::Left(tokio::signal::ctrl_c().map(|result| { .map(|signal| once(Box::pin(signal)))
if let Err(e) = result { .map(|stream| stream.filter_map(|result| {
warn!("Failed to enable `ctrl-c` shutdown signal."); result.map(|_| Sig::Int)
info_!("Error: {}", e); .map_err(|e| warn!("Failed to enable `ctrl-c` shutdown signal: {}", e))
} .ok()
})), }))
false => Either::Right(pending()),
}
} }
} }

View File

@ -208,7 +208,8 @@ pub use async_trait::async_trait;
#[doc(hidden)] #[doc(hidden)]
pub fn async_test<R>(fut: impl std::future::Future<Output = R>) -> R { pub fn async_test<R>(fut: impl std::future::Future<Output = R>) -> R {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.thread_name("rocket-test-worker-thread") // NOTE: graceful shutdown depends on the "rocket-worker" prefix.
.thread_name("rocket-worker-test-thread")
.worker_threads(1) .worker_threads(1)
.enable_all() .enable_all()
.build() .build()
@ -224,6 +225,7 @@ pub fn async_main<R>(fut: impl std::future::Future<Output = R> + Send) -> R {
// See tokio-rs/tokio#3329 for a necessary solution in `tokio`. // See tokio-rs/tokio#3329 for a necessary solution in `tokio`.
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.worker_threads(Config::from(Config::figment()).workers) .worker_threads(Config::from(Config::figment()).workers)
// NOTE: graceful shutdown depends on the "rocket-worker" prefix.
.thread_name("rocket-worker-thread") .thread_name("rocket-worker-thread")
.enable_all() .enable_all()
.build() .build()

View File

@ -1,10 +1,11 @@
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use yansi::Paint;
use tokio::sync::oneshot;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures::future::{self, FutureExt, Future, TryFutureExt, BoxFuture}; use futures::future::{self, FutureExt, Future, TryFutureExt, BoxFuture};
use tokio::sync::oneshot;
use yansi::Paint;
use crate::{Rocket, Orbit, Request, Response, Data, route}; use crate::{Rocket, Orbit, Request, Response, Data, route};
use crate::form::Form; use crate::form::Form;
@ -398,7 +399,7 @@ impl Rocket<Orbit> {
let http1_keepalive = self.config.keep_alive != 0; let http1_keepalive = self.config.keep_alive != 0;
let http2_keep_alive = match self.config.keep_alive { let http2_keep_alive = match self.config.keep_alive {
0 => None, 0 => None,
n => Some(std::time::Duration::from_secs(n as u64)) n => Some(Duration::from_secs(n as u64))
}; };
// Set up cancellable I/O from the given listener. Shutdown occurs when // Set up cancellable I/O from the given listener. Shutdown occurs when
@ -406,7 +407,8 @@ impl Rocket<Orbit> {
// notification or indirectly through an external signal which, when // notification or indirectly through an external signal which, when
// received, results in triggering the notify. // received, results in triggering the notify.
let shutdown = self.shutdown(); let shutdown = self.shutdown();
let external_shutdown = self.config.shutdown.collective_signal(); let sig_stream = self.config.shutdown.signal_stream();
let force_shutdown = self.config.shutdown.force;
let grace = self.config.shutdown.grace as u64; let grace = self.config.shutdown.grace as u64;
let mercy = self.config.shutdown.mercy as u64; let mercy = self.config.shutdown.mercy as u64;
@ -430,15 +432,59 @@ impl Rocket<Orbit> {
.with_graceful_shutdown(shutdown.clone()) .with_graceful_shutdown(shutdown.clone())
.map_err(|e| Error::new(ErrorKind::Runtime(Box::new(e)))); .map_err(|e| Error::new(ErrorKind::Runtime(Box::new(e))));
tokio::pin!(server, external_shutdown); // Start a task that listens for external signals and notifies shutdown.
let selecter = future::select(external_shutdown, server); if let Some(mut stream) = sig_stream {
match selecter.await { let shutdown = shutdown.clone();
tokio::spawn(async move {
while let Some(sig) = stream.next().await {
if shutdown.0.tripped() {
warn!("Received {}. Shutdown already in progress.", sig);
} else {
warn!("Received {}. Requesting shutdown.", sig);
}
shutdown.0.trip();
}
});
}
// Wait for a shutdown notification or for the server to somehow fail.
tokio::pin!(server);
match future::select(shutdown, server).await {
future::Either::Left((_, server)) => { future::Either::Left((_, server)) => {
// External signal received. Request shutdown, wait for server. // If a task has some runaway I/O, like an infinite loop, the
shutdown.notify(); // runtime will block indefinitely when it is dropped. To
// subvert, we start a ticking process-exit time bomb here.
if force_shutdown {
use std::thread;
// Only a worker thread will have the specified thread name.
tokio::task::spawn_blocking(move || {
let this = thread::current();
let is_rocket_runtime = this.name()
.map_or(false, |s| s.starts_with("rocket-worker"));
// We only hit our `exit()` if the process doesn't
// otherwise exit since this `spawn()` won't block.
thread::spawn(move || {
thread::sleep(Duration::from_secs(grace + mercy));
thread::sleep(Duration::from_millis(500));
if is_rocket_runtime {
error!("Server failed to shutdown cooperatively. Terminating.");
std::process::exit(1);
} else {
warn!("Server failed to shutdown cooperatively.");
warn_!("Server is executing inside of a custom runtime.");
info_!("Rocket's runtime is `#[rocket::main]` or `#[launch]`.");
warn_!("Refusing to terminate runaway custom runtime.");
}
});
});
}
info!("Received shutdown request. Waiting for pending I/O...");
server.await server.await
} }
// Internal shutdown or server error. Return the result.
future::Either::Right((result, _)) => result, future::Either::Right((result, _)) => result,
} }
} }

View File

@ -88,7 +88,6 @@ impl Shutdown {
#[inline] #[inline]
pub fn notify(self) { pub fn notify(self) {
self.0.trip(); self.0.trip();
info!("Shutdown requested. Waiting for pending I/O to finish...");
} }
} }

View File

@ -101,6 +101,11 @@ impl TripWire {
self.notify.notify_waiters(); self.notify.notify_waiters();
self.notify.notify_one(); self.notify.notify_one();
} }
#[inline(always)]
pub fn tripped(&self) -> bool {
self.tripped.load(Ordering::Acquire)
}
} }
#[cfg(test)] #[cfg(test)]