Init sync DB pools inside of 'spawn_blocking'.

One situation where this is noticeable is when initialization failure
leads to `Drop`; unlike in a successful initialization, `postgres`
detects and panics when `Drop` is called from within asynchronous code.
Several other database pools do not panic in this same situation, but
would still block the current thread.

Also sets the minimum version of `tokio` to 1.4 in `rocket_contrib`,
which is the version where `Handle::block_on` (used in `Drop` impls) was
introduced.

Fixes #1610.
This commit is contained in:
Jeb Rosen 2021-04-23 07:55:55 -07:00 committed by Sergio Benitez
parent 691d3f2d95
commit 078cf1725f
2 changed files with 27 additions and 19 deletions

View File

@ -42,7 +42,7 @@ diesel_mysql_pool = ["databases", "diesel/mysql", "diesel/r2d2"]
[dependencies]
# Global dependencies.
tokio = { version = "1.0", optional = true }
tokio = { version = "1.4", optional = true }
rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true }
rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false }
log = "0.4"

View File

@ -71,23 +71,25 @@ macro_rules! dberr {
impl<K: 'static, C: Poolable> ConnectionPool<K, C> {
pub fn fairing(fairing_name: &'static str, db: &'static str) -> impl Fairing {
AdHoc::try_on_ignite(fairing_name, move |rocket| async move {
let config = match Config::from(db, &rocket) {
Ok(config) => config,
Err(e) => dberr!("config", db, "{}", e, rocket),
};
run_blocking(move || {
let config = match Config::from(db, &rocket) {
Ok(config) => config,
Err(e) => dberr!("config", db, "{}", e, rocket),
};
let pool_size = config.pool_size;
match C::pool(db, &rocket) {
Ok(pool) => Ok(rocket.manage(ConnectionPool::<K, C> {
config,
pool: Some(pool),
semaphore: Arc::new(Semaphore::new(pool_size as usize)),
_marker: PhantomData,
})),
Err(Error::Config(e)) => dberr!("config", db, "{}", e, rocket),
Err(Error::Pool(e)) => dberr!("pool init", db, "{}", e, rocket),
Err(Error::Custom(e)) => dberr!("pool manager", db, "{:?}", e, rocket),
}
let pool_size = config.pool_size;
match C::pool(db, &rocket) {
Ok(pool) => Ok(rocket.manage(ConnectionPool::<K, C> {
config,
pool: Some(pool),
semaphore: Arc::new(Semaphore::new(pool_size as usize)),
_marker: PhantomData,
})),
Err(Error::Config(e)) => dberr!("config", db, "{}", e, rocket),
Err(Error::Pool(e)) => dberr!("pool init", db, "{}", e, rocket),
Err(Error::Custom(e)) => dberr!("pool manager", db, "{:?}", e, rocket),
}
}).await
})
}
@ -158,7 +160,10 @@ impl<K: 'static, C: Poolable> Connection<K, C> {
run_blocking(move || {
// And then re-enter the runtime to wait on the async mutex, but in
// a blocking fashion.
let mut connection = tokio::runtime::Handle::current().block_on(async { connection.lock_owned().await });
let mut connection = tokio::runtime::Handle::current().block_on(async {
connection.lock_owned().await
});
let conn = connection.as_mut()
.expect("internal invariant broken: self.connection is Some");
f(conn)
@ -173,7 +178,10 @@ impl<K, C: Poolable> Drop for Connection<K, C> {
// See same motivation above for this arrangement of spawn_blocking/block_on
tokio::task::spawn_blocking(move || {
let mut connection = tokio::runtime::Handle::current().block_on(async { connection.lock_owned().await });
let mut connection = tokio::runtime::Handle::current().block_on(async {
connection.lock_owned().await
});
if let Some(conn) = connection.take() {
drop(conn);
}