From 078cf1725f5c7e50b45beeb1ec3e5e0e3a2cb0dc Mon Sep 17 00:00:00 2001 From: Jeb Rosen Date: Fri, 23 Apr 2021 07:55:55 -0700 Subject: [PATCH] Init sync DB pools inside of 'spawn_blocking'. One situation where this is noticeable is when initialization failure leads to `Drop`; unlike in a successful initialization, `postgres` detects and panics when `Drop` is called from within asynchronous code. Several other database pools do not panic in this same situation, but would still block the current thread. Also sets the minimum version of `tokio` to 1.4 in `rocket_contrib`, which is the version where `Handle::block_on` (used in `Drop` impls) was introduced. Fixes #1610. --- contrib/lib/Cargo.toml | 2 +- contrib/lib/src/databases/connection.rs | 44 +++++++++++++++---------- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/contrib/lib/Cargo.toml b/contrib/lib/Cargo.toml index 89fa6887..b8d144b1 100644 --- a/contrib/lib/Cargo.toml +++ b/contrib/lib/Cargo.toml @@ -42,7 +42,7 @@ diesel_mysql_pool = ["databases", "diesel/mysql", "diesel/r2d2"] [dependencies] # Global dependencies. -tokio = { version = "1.0", optional = true } +tokio = { version = "1.4", optional = true } rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true } rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false } log = "0.4" diff --git a/contrib/lib/src/databases/connection.rs b/contrib/lib/src/databases/connection.rs index 0f698701..8d321b61 100644 --- a/contrib/lib/src/databases/connection.rs +++ b/contrib/lib/src/databases/connection.rs @@ -71,23 +71,25 @@ macro_rules! dberr { impl ConnectionPool { pub fn fairing(fairing_name: &'static str, db: &'static str) -> impl Fairing { AdHoc::try_on_ignite(fairing_name, move |rocket| async move { - let config = match Config::from(db, &rocket) { - Ok(config) => config, - Err(e) => dberr!("config", db, "{}", e, rocket), - }; + run_blocking(move || { + let config = match Config::from(db, &rocket) { + Ok(config) => config, + Err(e) => dberr!("config", db, "{}", e, rocket), + }; - let pool_size = config.pool_size; - match C::pool(db, &rocket) { - Ok(pool) => Ok(rocket.manage(ConnectionPool:: { - config, - pool: Some(pool), - semaphore: Arc::new(Semaphore::new(pool_size as usize)), - _marker: PhantomData, - })), - Err(Error::Config(e)) => dberr!("config", db, "{}", e, rocket), - Err(Error::Pool(e)) => dberr!("pool init", db, "{}", e, rocket), - Err(Error::Custom(e)) => dberr!("pool manager", db, "{:?}", e, rocket), - } + let pool_size = config.pool_size; + match C::pool(db, &rocket) { + Ok(pool) => Ok(rocket.manage(ConnectionPool:: { + config, + pool: Some(pool), + semaphore: Arc::new(Semaphore::new(pool_size as usize)), + _marker: PhantomData, + })), + Err(Error::Config(e)) => dberr!("config", db, "{}", e, rocket), + Err(Error::Pool(e)) => dberr!("pool init", db, "{}", e, rocket), + Err(Error::Custom(e)) => dberr!("pool manager", db, "{:?}", e, rocket), + } + }).await }) } @@ -158,7 +160,10 @@ impl Connection { run_blocking(move || { // And then re-enter the runtime to wait on the async mutex, but in // a blocking fashion. - let mut connection = tokio::runtime::Handle::current().block_on(async { connection.lock_owned().await }); + let mut connection = tokio::runtime::Handle::current().block_on(async { + connection.lock_owned().await + }); + let conn = connection.as_mut() .expect("internal invariant broken: self.connection is Some"); f(conn) @@ -173,7 +178,10 @@ impl Drop for Connection { // See same motivation above for this arrangement of spawn_blocking/block_on tokio::task::spawn_blocking(move || { - let mut connection = tokio::runtime::Handle::current().block_on(async { connection.lock_owned().await }); + let mut connection = tokio::runtime::Handle::current().block_on(async { + connection.lock_owned().await + }); + if let Some(conn) = connection.take() { drop(conn); }