Gracefully shutdown database pools in 'db_pools'.

This commit is contained in:
Sergio Benitez 2022-07-13 14:16:14 -07:00
parent 7275df9fdf
commit a933e7234d
3 changed files with 49 additions and 4 deletions

View File

@ -1,7 +1,7 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use rocket::{error, info_, Build, Ignite, Phase, Rocket, Sentinel}; use rocket::{error, info_, Build, Ignite, Phase, Rocket, Sentinel, Orbit};
use rocket::fairing::{self, Fairing, Info, Kind}; use rocket::fairing::{self, Fairing, Info, Kind};
use rocket::request::{FromRequest, Outcome, Request}; use rocket::request::{FromRequest, Outcome, Request};
use rocket::http::Status; use rocket::http::Status;
@ -250,7 +250,7 @@ impl<D: Database> Fairing for Initializer<D> {
fn info(&self) -> Info { fn info(&self) -> Info {
Info { Info {
name: self.0.unwrap_or(std::any::type_name::<Self>()), name: self.0.unwrap_or(std::any::type_name::<Self>()),
kind: Kind::Ignite, kind: Kind::Ignite | Kind::Shutdown,
} }
} }
@ -272,6 +272,12 @@ impl<D: Database> Fairing for Initializer<D> {
} }
} }
} }
async fn on_shutdown(&self, rocket: &Rocket<Orbit>) {
if let Some(db) = D::fetch(rocket) {
db.close().await;
}
}
} }
#[rocket::async_trait] #[rocket::async_trait]

View File

@ -98,8 +98,10 @@
//! //!
//! # Supported Drivers //! # Supported Drivers
//! //!
//! At present, this crate supports _three_ drivers: [`deadpool`], [`sqlx`], //! At present, this crate supports _three_ drivers: [`deadpool`], [`sqlx`], and
//! and [`mongodb`]. Each driver may support multiple databases. //! [`mongodb`]. Each driver may support multiple databases. Drivers have a
//! varying degree of support for graceful shutdown, affected by the
//! `Type::init()` fairing on Rocket shutdown.
//! //!
//! ## `deadpool` (v0.9) //! ## `deadpool` (v0.9)
//! //!
@ -108,6 +110,9 @@
//! | Postgres | `deadpool_postgres` | [`deadpool_postgres::Pool`] | [`deadpool_postgres::ClientWrapper`] | //! | Postgres | `deadpool_postgres` | [`deadpool_postgres::Pool`] | [`deadpool_postgres::ClientWrapper`] |
//! | Redis | `deadpool_redis` | [`deadpool_redis::Pool`] | [`deadpool_redis::Connection`] | //! | Redis | `deadpool_redis` | [`deadpool_redis::Pool`] | [`deadpool_redis::Connection`] |
//! //!
//! On shutdown, new connections are denied. Shutdown _does not_ wait for
//! connections to be returned.
//!
//! ## `sqlx` (v0.5) //! ## `sqlx` (v0.5)
//! //!
//! | Database | Feature | [`Pool`] Type | [`Connection`] Deref | //! | Database | Feature | [`Pool`] Type | [`Connection`] Deref |
@ -126,12 +131,17 @@
//! [`sqlx::PoolConnection<Sqlite>`]: https://docs.rs/sqlx/0.5/sqlx/pool/struct.PoolConnection.html //! [`sqlx::PoolConnection<Sqlite>`]: https://docs.rs/sqlx/0.5/sqlx/pool/struct.PoolConnection.html
//! [`sqlx::PoolConnection<Mssql>`]: https://docs.rs/sqlx/0.5/sqlx/pool/struct.PoolConnection.html //! [`sqlx::PoolConnection<Mssql>`]: https://docs.rs/sqlx/0.5/sqlx/pool/struct.PoolConnection.html
//! //!
//! On shutdown, new connections are denied. Shutdown waits for connections to
//! be returned.
//!
//! ## `mongodb` (v2) //! ## `mongodb` (v2)
//! //!
//! | Database | Feature | [`Pool`] Type and [`Connection`] Deref | //! | Database | Feature | [`Pool`] Type and [`Connection`] Deref |
//! |----------|-----------|----------------------------------------| //! |----------|-----------|----------------------------------------|
//! | MongoDB | `mongodb` | [`mongodb::Client`] | //! | MongoDB | `mongodb` | [`mongodb::Client`] |
//! //!
//! Graceful shutdown is not supported.
//!
//! ## Enabling Additional Driver Features //! ## Enabling Additional Driver Features
//! //!
//! Only the minimal features for each driver crate are enabled by //! Only the minimal features for each driver crate are enabled by

View File

@ -39,6 +39,10 @@ use {std::time::Duration, crate::{Error, Config}};
/// async fn get(&self) -> Result<Self::Connection, Self::Error> { /// async fn get(&self) -> Result<Self::Connection, Self::Error> {
/// todo!("fetch one connection from the pool"); /// todo!("fetch one connection from the pool");
/// } /// }
///
/// async fn close(&self) {
/// todo!("gracefully shutdown connection pool");
/// }
/// } /// }
/// ``` /// ```
/// ///
@ -76,6 +80,8 @@ use {std::time::Duration, crate::{Error, Config}};
/// # fn acquire(&self) -> Result<Connection, GetError> { /// # fn acquire(&self) -> Result<Connection, GetError> {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// #
/// # async fn shutdown(&self) { }
/// # } /// # }
/// ///
/// #[rocket::async_trait] /// #[rocket::async_trait]
@ -101,6 +107,10 @@ use {std::time::Duration, crate::{Error, Config}};
/// // Map errors of type `GetError` to `Error<_, GetError>`. /// // Map errors of type `GetError` to `Error<_, GetError>`.
/// self.acquire().map_err(Error::Get) /// self.acquire().map_err(Error::Get)
/// } /// }
///
/// async fn close(&self) {
/// self.shutdown().await;
/// }
/// } /// }
/// ``` /// ```
#[rocket::async_trait] #[rocket::async_trait]
@ -133,6 +143,13 @@ pub trait Pool: Sized + Send + Sync + 'static {
/// such as a preconfigured timeout elapsing or when the database server is /// such as a preconfigured timeout elapsing or when the database server is
/// unavailable. /// unavailable.
async fn get(&self) -> Result<Self::Connection, Self::Error>; async fn get(&self) -> Result<Self::Connection, Self::Error>;
/// Shutdown the connection pool, disallowing any new connections from being
/// retrieved and waking up any tasks with active connections.
///
/// The returned future may either resolve when all connections are known to
/// have closed or at any point prior. Details are implementation specific.
async fn close(&self);
} }
#[cfg(feature = "deadpool")] #[cfg(feature = "deadpool")]
@ -183,6 +200,10 @@ mod deadpool_postgres {
async fn get(&self) -> Result<Self::Connection, Self::Error> { async fn get(&self) -> Result<Self::Connection, Self::Error> {
self.get().await.map_err(Error::Get) self.get().await.map_err(Error::Get)
} }
async fn close(&self) {
<Pool<M, C>>::close(self)
}
} }
} }
@ -236,6 +257,10 @@ mod sqlx {
async fn get(&self) -> Result<Self::Connection, Self::Error> { async fn get(&self) -> Result<Self::Connection, Self::Error> {
self.acquire().await.map_err(Error::Get) self.acquire().await.map_err(Error::Get)
} }
async fn close(&self) {
<sqlx::Pool<D>>::close(self).await;
}
} }
} }
@ -264,5 +289,9 @@ mod mongodb {
async fn get(&self) -> Result<Self::Connection, Self::Error> { async fn get(&self) -> Result<Self::Connection, Self::Error> {
Ok(self.clone()) Ok(self.clone())
} }
async fn close(&self) {
// nothing to do for mongodb
}
} }
} }