Ensure synchronous database connections are not dropped from inside async tasks.

Fixes #1610.
This commit is contained in:
Jeb Rosen 2021-04-19 21:03:27 -07:00
parent c858bd2873
commit e1307ddf48
1 changed files with 22 additions and 11 deletions

View File

@ -146,8 +146,19 @@ impl<K: 'static, C: Poolable> Connection<K, C> {
where F: FnOnce(&mut C) -> R + Send + 'static,
R: Send + 'static,
{
let mut connection = self.connection.clone().lock_owned().await;
// It is important that this inner Arc<Mutex<>> (or the OwnedMutexGuard
// derived from it) never be a variable on the stack at an await point,
// where Drop might be called at any time. This causes (synchronous)
// Drop to be called from asynchronous code, which some database
// wrappers do not or can not handle.
let connection = self.connection.clone();
// Since connection can't be on the stack in an async fn during an
// await, we have to spawn a new blocking-safe thread...
run_blocking(move || {
// And then re-enter the runtime to wait on the async mutex, but in
// a blocking fashion.
let mut connection = tokio::runtime::Handle::current().block_on(async { connection.lock_owned().await });
let conn = connection.as_mut()
.expect("internal invariant broken: self.connection is Some");
f(conn)
@ -159,17 +170,17 @@ impl<K, C: Poolable> Drop for Connection<K, C> {
fn drop(&mut self) {
let connection = self.connection.clone();
let permit = self.permit.take();
tokio::spawn(async move {
let mut connection = connection.lock_owned().await;
tokio::task::spawn_blocking(move || {
if let Some(conn) = connection.take() {
drop(conn);
}
// Explicitly dropping the permit here so that it's only
// released after the connection is.
drop(permit);
})
// See same motivation above for this arrangement of spawn_blocking/block_on
tokio::task::spawn_blocking(move || {
let mut connection = tokio::runtime::Handle::current().block_on(async { connection.lock_owned().await });
if let Some(conn) = connection.take() {
drop(conn);
}
// Explicitly dropping the permit here so that it's only
// released after the connection is.
drop(permit);
});
}
}