Introduce 'mercy' connection shutdown period.

This improves graceful shutdown by allowing connection-level I/O to
shutdown gracefully within a 'mercy' period.
This commit is contained in:
Sergio Benitez 2021-04-29 19:19:07 -07:00
parent fe23eaebd1
commit 3a7559edce
3 changed files with 156 additions and 52 deletions

View File

@ -67,29 +67,49 @@ impl fmt::Display for Sig {
/// Graceful shutdown configuration.
///
/// # Summary
///
/// This structure configures when and how graceful shutdown occurs. The `ctrlc`
/// and `signals` properties control _when_ and the `grace` property controls
/// _how_.
/// and `signals` properties control _when_ and the `grace` and `mercy`
/// properties control _how_.
///
/// When a shutdown is triggered by an externally or internally initiated
/// [`Shutdown::notify()`], Rocket allows application I/O to make progress for
/// at most `grace` seconds before initiating connection-level shutdown.
/// Connection shutdown forcibly terminates _application_ I/O, but connections
/// are allowed an additional `mercy` seconds to shutdown before being
/// forcefully terminated. This implies that a _cooperating_ and active remote
/// client maintaining an open connection can stall shutdown for at most `grace`
/// seconds, while an _uncooperative_ remote client can stall shutdown for at
/// most `grace + mercy` seconds.
///
/// # Triggers
///
/// _All_ graceful shutdowns are initiated via
/// [`Shutdown::notify()`](crate::Shutdown::notify()). Rocket can be configured
/// to trigger shutdown automatically on certain conditions, specified via the
/// `ctrlc` and `signals` properties of this structure. More specifically, if
/// `ctrlc` is `true` (the default), `ctrl-c` (`SIGINT`) initiates a server
/// shutdown, and on Unix, `signals` specifies a list of IPC signals that
/// trigger a shutdown (`["term"]` by default).
/// _All_ graceful shutdowns are initiated via [`Shutdown::notify()`]. Rocket
/// can be configured to call [`Shutdown::notify()`] automatically on certain
/// conditions, specified via the `ctrlc` and `signals` properties of this
/// structure. More specifically, if `ctrlc` is `true` (the default), `ctrl-c`
/// (`SIGINT`) initiates a server shutdown, and on Unix, `signals` specifies a
/// list of IPC signals that trigger a shutdown (`["term"]` by default).
///
/// [`Shutdown::notify()`]: crate::Shutdown::notify()
///
/// # Grace Period
///
/// Once a shutdown is triggered, Rocket stops accepting new connections and
/// waits at most `grace` seconds before force-closing all outstanding I/O.
/// waits at most `grace` seconds before initiating connection shutdown.
/// Applications can `await` the [`Shutdown`](crate::Shutdown) future to detect
/// a shutdown and cancel any server-initiated I/O, such, as from [infinite
/// a shutdown and cancel any server-initiated I/O, such as from [infinite
/// responders](crate::response::stream#graceful-shutdown), to avoid abrupt I/O
/// cancellation.
///
/// # Mercy Period
///
/// After the grace period has elapsed, Rocket initiates connection shutdown,
/// allowing connection-level I/O termination such as TLS's `close_notify` to
/// proceed nominally. Rocket waits at most `mercy` seconds for connections to
/// shutdown before forcefully terminating all connections.
///
/// # Example
///
/// As with all Rocket configuration options, when using the default
@ -108,12 +128,14 @@ impl fmt::Display for Sig {
/// ctrlc = false
/// signals = ["term", "hup"]
/// grace = 10
/// mercy = 5
/// # "#).nested();
///
/// // The config parses as follows:
/// # let config = Config::from(Figment::from(Config::debug_default()).merge(toml));
/// assert_eq!(config.shutdown.ctrlc, false);
/// assert_eq!(config.shutdown.grace, 10);
/// assert_eq!(config.shutdown.mercy, 5);
///
/// # #[cfg(unix)] {
/// use rocket::config::Sig;
@ -144,13 +166,15 @@ impl fmt::Display for Sig {
/// set.insert(Sig::Hup);
/// set
/// },
/// grace: 10
/// grace: 10,
/// mercy: 5,
/// },
/// ..Config::default()
/// };
///
/// assert_eq!(config.shutdown.ctrlc, false);
/// assert_eq!(config.shutdown.grace, 10);
/// assert_eq!(config.shutdown.mercy, 5);
///
/// #[cfg(unix)] {
/// assert_eq!(config.shutdown.signals.len(), 2);
@ -172,11 +196,16 @@ pub struct Shutdown {
#[cfg(unix)]
#[cfg_attr(nightly, doc(cfg(unix)))]
pub signals: HashSet<Sig>,
/// The shutdown grace period: number of seconds to continue to try to
/// finish outstanding I/O for before forcibly terminating it.
/// The grace period: number of seconds to continue to try to finish
/// outstanding _server_ I/O for before forcibly terminating it.
///
/// **default: `5`**
/// **default: `2`**
pub grace: u32,
/// The mercy period: number of seconds to continue to try to finish
/// outstanding _connection_ I/O for before forcibly terminating it.
///
/// **default: `3`**
pub mercy: u32,
}
impl fmt::Display for Shutdown {
@ -192,7 +221,7 @@ impl fmt::Display for Shutdown {
write!(f, "], ")?;
}
write!(f, "grace = {}s", self.grace)?;
write!(f, "grace = {}s, mercy = {}s", self.grace, self.mercy)?;
Ok(())
}
}
@ -203,7 +232,8 @@ impl Default for Shutdown {
ctrlc: true,
#[cfg(unix)]
signals: { let mut set = HashSet::new(); set.insert(Sig::Term); set },
grace: 5,
grace: 2,
mercy: 3,
}
}
}

View File

@ -129,6 +129,20 @@ impl<T: AsyncRead, U: AsyncRead> AsyncRead for Chain<T, U> {
}
}
enum State {
/// I/O has not been cancelled. Proceed as normal.
Active,
/// I/O has been cancelled. See if we can finish before the timer expires.
Grace(Pin<Box<Sleep>>),
/// Grace period elapsed. Shutdown the connection, waiting for the timer
/// until we force close.
Mercy(Pin<Box<Sleep>>),
/// We failed to shutdown and are force-closing the connection.
Terminated,
/// We successfuly shutdown the connection.
Inactive,
}
pin_project! {
/// I/O that can be cancelled when a future `F` resolves.
#[must_use = "futures do nothing unless polled"]
@ -137,48 +151,109 @@ pin_project! {
io: I,
#[pin]
trigger: Fuse<F>,
sleep: Option<Pin<Box<Sleep>>>,
state: State,
grace: Duration,
mercy: Duration,
}
}
impl<F: Future, I> CancellableIo<F, I> {
pub fn new(trigger: F, io: I, grace: Duration) -> Self {
impl<F: Future, I: AsyncWrite> CancellableIo<F, I> {
pub fn new(trigger: F, io: I, grace: Duration, mercy: Duration) -> Self {
CancellableIo {
io, grace, mercy,
trigger: trigger.fuse(),
sleep: None,
io, grace,
state: State::Active
}
}
fn poll_trigger(
/// Returns `Ok(true)` if connection processing should continue.
fn poll_trigger_then<T>(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> io::Result<()> {
let me = self.project();
cx: &mut Context<'_>,
io: impl FnOnce(Pin<&mut I>, &mut Context<'_>) -> Poll<io::Result<T>>,
) -> Poll<io::Result<T>> {
let mut me = self.project();
if me.trigger.poll(cx).is_ready() {
*me.sleep = Some(Box::pin(sleep(*me.grace)));
}
// CORRECTNESS: _EVERY_ branch must reset `state`! If `state` is
// unchanged in a branch, that branch _must_ `break`! No `return`!
let mut state = std::mem::replace(me.state, State::Active);
let result = loop {
match state {
State::Active => {
if me.trigger.as_mut().poll(cx).is_ready() {
state = State::Grace(Box::pin(sleep(*me.grace)));
} else {
state = State::Active;
break io(me.io, cx);
}
}
State::Grace(mut sleep) => {
if sleep.as_mut().poll(cx).is_ready() {
if let Some(deadline) = sleep.deadline().checked_add(*me.mercy) {
sleep.as_mut().reset(deadline);
state = State::Mercy(sleep);
} else {
state = State::Terminated;
}
} else {
state = State::Grace(sleep);
break io(me.io, cx);
}
},
State::Mercy(mut sleep) => {
if sleep.as_mut().poll(cx).is_ready() {
state = State::Terminated;
continue;
}
if let Some(sleep) = me.sleep {
if sleep.as_mut().poll(cx).is_ready() {
return Err(io::Error::new(io::ErrorKind::TimedOut, "..."));
match me.io.as_mut().poll_shutdown(cx) {
Poll::Ready(Err(e)) => {
state = State::Terminated;
break Poll::Ready(Err(e));
}
Poll::Ready(Ok(())) => {
state = State::Inactive;
break Poll::Ready(Err(gone()));
}
Poll::Pending => {
state = State::Mercy(sleep);
break Poll::Pending;
}
}
},
State::Terminated => {
// Just in case, as a last ditch effort. Ignore pending.
state = State::Terminated;
let _ = me.io.as_mut().poll_shutdown(cx);
break Poll::Ready(Err(time_out()));
},
State::Inactive => {
state = State::Inactive;
break Poll::Ready(Err(gone()));
}
}
}
};
Ok(())
*me.state = state;
result
}
}
impl<F: Future, I: AsyncRead> AsyncRead for CancellableIo<F, I> {
fn time_out() -> io::Error {
io::Error::new(io::ErrorKind::TimedOut, "Shutdown grace timed out")
}
fn gone() -> io::Error {
io::Error::new(io::ErrorKind::BrokenPipe, "IO driver has terminated")
}
impl<F: Future, I: AsyncRead + AsyncWrite> AsyncRead for CancellableIo<F, I> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.as_mut().poll_trigger(cx)?;
self.as_mut().project().io.poll_read(cx, buf)
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_read(cx, buf))
}
}
@ -187,34 +262,30 @@ impl<F: Future, I: AsyncWrite> AsyncWrite for CancellableIo<F, I> {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.as_mut().poll_trigger(cx)?;
self.as_mut().project().io.poll_write(cx, buf)
) -> Poll<io::Result<usize>> {
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_write(cx, buf))
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<io::Result<()>> {
self.as_mut().poll_trigger(cx)?;
self.as_mut().project().io.poll_flush(cx)
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_flush(cx))
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), io::Error>> {
self.as_mut().poll_trigger(cx)?;
self.as_mut().project().io.poll_shutdown(cx)
) -> Poll<io::Result<()>> {
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_shutdown(cx))
}
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
self.as_mut().poll_trigger(cx)?;
self.as_mut().project().io.poll_write_vectored(cx, bufs)
) -> Poll<io::Result<usize>> {
self.as_mut().poll_trigger_then(cx, |io, cx| io.poll_write_vectored(cx, bufs))
}
fn is_write_vectored(&self) -> bool {
@ -236,12 +307,14 @@ pin_project! {
#[pin]
pub listener: L,
pub grace: Duration,
pub mercy: Duration,
}
}
impl<F, L> CancellableListener<F, L> {
pub fn new(trigger: F, listener: L, grace: u64) -> Self {
CancellableListener { trigger, listener, grace: Duration::from_secs(grace) }
pub fn new(trigger: F, listener: L, grace: u64, mercy: u64) -> Self {
let (grace, mercy) = (Duration::from_secs(grace), Duration::from_secs(mercy));
CancellableListener { trigger, listener, grace, mercy }
}
}
@ -259,7 +332,7 @@ impl<L: Listener, F: Future + Clone> Listener for CancellableListener<F, L> {
self.as_mut().project().listener
.poll_accept(cx)
.map(|res| res.map(|conn| {
CancellableIo::new(self.trigger.clone(), conn, self.grace)
CancellableIo::new(self.trigger.clone(), conn, self.grace, self.mercy)
}))
}
}

View File

@ -408,6 +408,7 @@ impl Rocket<Orbit> {
let shutdown = self.shutdown();
let external_shutdown = self.config.shutdown.collective_signal();
let grace = self.config.shutdown.grace as u64;
let mercy = self.config.shutdown.mercy as u64;
let rocket = Arc::new(self);
let service_fn = move |conn: &CancellableIo<_, L::Connection>| {
@ -421,7 +422,7 @@ impl Rocket<Orbit> {
};
// NOTE: `hyper` uses `tokio::spawn()` as the default executor.
let listener = CancellableListener::new(shutdown.clone(), listener, grace);
let listener = CancellableListener::new(shutdown.clone(), listener, grace, mercy);
let server = hyper::Server::builder(Incoming::new(listener))
.http1_keepalive(http1_keepalive)
.http2_keep_alive_interval(http2_keep_alive)