From a933e7234d95868fd2bee84069a8c085a589bd20 Mon Sep 17 00:00:00 2001 From: Sergio Benitez Date: Wed, 13 Jul 2022 14:16:14 -0700 Subject: [PATCH] Gracefully shutdown database pools in 'db_pools'. --- contrib/db_pools/lib/src/database.rs | 10 ++++++++-- contrib/db_pools/lib/src/lib.rs | 14 ++++++++++++-- contrib/db_pools/lib/src/pool.rs | 29 ++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/contrib/db_pools/lib/src/database.rs b/contrib/db_pools/lib/src/database.rs index 8b67706f..61b10ced 100644 --- a/contrib/db_pools/lib/src/database.rs +++ b/contrib/db_pools/lib/src/database.rs @@ -1,7 +1,7 @@ use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; -use rocket::{error, info_, Build, Ignite, Phase, Rocket, Sentinel}; +use rocket::{error, info_, Build, Ignite, Phase, Rocket, Sentinel, Orbit}; use rocket::fairing::{self, Fairing, Info, Kind}; use rocket::request::{FromRequest, Outcome, Request}; use rocket::http::Status; @@ -250,7 +250,7 @@ impl Fairing for Initializer { fn info(&self) -> Info { Info { name: self.0.unwrap_or(std::any::type_name::()), - kind: Kind::Ignite, + kind: Kind::Ignite | Kind::Shutdown, } } @@ -272,6 +272,12 @@ impl Fairing for Initializer { } } } + + async fn on_shutdown(&self, rocket: &Rocket) { + if let Some(db) = D::fetch(rocket) { + db.close().await; + } + } } #[rocket::async_trait] diff --git a/contrib/db_pools/lib/src/lib.rs b/contrib/db_pools/lib/src/lib.rs index afd81ae5..5c68df94 100644 --- a/contrib/db_pools/lib/src/lib.rs +++ b/contrib/db_pools/lib/src/lib.rs @@ -98,8 +98,10 @@ //! //! # Supported Drivers //! -//! At present, this crate supports _three_ drivers: [`deadpool`], [`sqlx`], -//! and [`mongodb`]. Each driver may support multiple databases. +//! At present, this crate supports _three_ drivers: [`deadpool`], [`sqlx`], and +//! [`mongodb`]. Each driver may support multiple databases. Drivers have a +//! varying degree of support for graceful shutdown, affected by the +//! `Type::init()` fairing on Rocket shutdown. //! //! ## `deadpool` (v0.9) //! @@ -108,6 +110,9 @@ //! | Postgres | `deadpool_postgres` | [`deadpool_postgres::Pool`] | [`deadpool_postgres::ClientWrapper`] | //! | Redis | `deadpool_redis` | [`deadpool_redis::Pool`] | [`deadpool_redis::Connection`] | //! +//! On shutdown, new connections are denied. Shutdown _does not_ wait for +//! connections to be returned. +//! //! ## `sqlx` (v0.5) //! //! | Database | Feature | [`Pool`] Type | [`Connection`] Deref | @@ -126,12 +131,17 @@ //! [`sqlx::PoolConnection`]: https://docs.rs/sqlx/0.5/sqlx/pool/struct.PoolConnection.html //! [`sqlx::PoolConnection`]: https://docs.rs/sqlx/0.5/sqlx/pool/struct.PoolConnection.html //! +//! On shutdown, new connections are denied. Shutdown waits for connections to +//! be returned. +//! //! ## `mongodb` (v2) //! //! | Database | Feature | [`Pool`] Type and [`Connection`] Deref | //! |----------|-----------|----------------------------------------| //! | MongoDB | `mongodb` | [`mongodb::Client`] | //! +//! Graceful shutdown is not supported. +//! //! ## Enabling Additional Driver Features //! //! Only the minimal features for each driver crate are enabled by diff --git a/contrib/db_pools/lib/src/pool.rs b/contrib/db_pools/lib/src/pool.rs index 6acae001..60270fe5 100644 --- a/contrib/db_pools/lib/src/pool.rs +++ b/contrib/db_pools/lib/src/pool.rs @@ -39,6 +39,10 @@ use {std::time::Duration, crate::{Error, Config}}; /// async fn get(&self) -> Result { /// todo!("fetch one connection from the pool"); /// } +/// +/// async fn close(&self) { +/// todo!("gracefully shutdown connection pool"); +/// } /// } /// ``` /// @@ -76,6 +80,8 @@ use {std::time::Duration, crate::{Error, Config}}; /// # fn acquire(&self) -> Result { /// # Ok(()) /// # } +/// # +/// # async fn shutdown(&self) { } /// # } /// /// #[rocket::async_trait] @@ -101,6 +107,10 @@ use {std::time::Duration, crate::{Error, Config}}; /// // Map errors of type `GetError` to `Error<_, GetError>`. /// self.acquire().map_err(Error::Get) /// } +/// +/// async fn close(&self) { +/// self.shutdown().await; +/// } /// } /// ``` #[rocket::async_trait] @@ -133,6 +143,13 @@ pub trait Pool: Sized + Send + Sync + 'static { /// such as a preconfigured timeout elapsing or when the database server is /// unavailable. async fn get(&self) -> Result; + + /// Shutdown the connection pool, disallowing any new connections from being + /// retrieved and waking up any tasks with active connections. + /// + /// The returned future may either resolve when all connections are known to + /// have closed or at any point prior. Details are implementation specific. + async fn close(&self); } #[cfg(feature = "deadpool")] @@ -183,6 +200,10 @@ mod deadpool_postgres { async fn get(&self) -> Result { self.get().await.map_err(Error::Get) } + + async fn close(&self) { + >::close(self) + } } } @@ -236,6 +257,10 @@ mod sqlx { async fn get(&self) -> Result { self.acquire().await.map_err(Error::Get) } + + async fn close(&self) { + >::close(self).await; + } } } @@ -264,5 +289,9 @@ mod mongodb { async fn get(&self) -> Result { Ok(self.clone()) } + + async fn close(&self) { + // nothing to do for mongodb + } } }