From 03d14e436b1e9ffaf839416aa4c7fb8a5e92098e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Wed, 19 Jun 2024 08:09:59 +0200 Subject: [PATCH] WorkerThreadPool: Refactor deadlock prevention collaboration into a generic mechanism This is strictly beyond a refactor because it also changes when the mutexes are relocked, but that's only for extra safety. --- core/object/worker_thread_pool.cpp | 91 +++++++++++++++++++++++------- core/object/worker_thread_pool.h | 24 ++++++-- core/templates/command_queue_mt.h | 4 +- 3 files changed, 92 insertions(+), 27 deletions(-) diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 9c9e0fa8991..aaf2e627368 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -33,7 +33,6 @@ #include "core/object/script_language.h" #include "core/os/os.h" #include "core/os/thread_safe.h" -#include "core/templates/command_queue_mt.h" WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1; @@ -46,7 +45,9 @@ void WorkerThreadPool::Task::free_template_userdata() { WorkerThreadPool *WorkerThreadPool::singleton = nullptr; -thread_local CommandQueueMT *WorkerThreadPool::flushing_cmd_queue = nullptr; +#ifdef THREADS_ENABLED +thread_local uintptr_t WorkerThreadPool::unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES] = {}; +#endif void WorkerThreadPool::_process_task(Task *p_task) { #ifdef THREADS_ENABLED @@ -416,6 +417,34 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { return OK; } +void WorkerThreadPool::_lock_unlockable_mutexes() { +#ifdef THREADS_ENABLED + for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) { + if (unlockable_mutexes[i]) { + if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) { + ((Mutex *)unlockable_mutexes[i])->lock(); + } else { + ((BinaryMutex *)unlockable_mutexes[i])->lock(); + } + } + } +#endif +} + +void WorkerThreadPool::_unlock_unlockable_mutexes() { +#ifdef THREADS_ENABLED + for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) { + if (unlockable_mutexes[i]) { + if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) { + ((Mutex *)unlockable_mutexes[i])->unlock(); + } else { + ((BinaryMutex *)unlockable_mutexes[i])->unlock(); + } + } + } +#endif +} + void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) { // Keep processing tasks until the condition to stop waiting is met. @@ -423,6 +452,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T while (true) { Task *task_to_process = nullptr; + bool relock_unlockables = false; { MutexLock lock(task_mutex); bool was_signaled = p_caller_pool_thread->signaled; @@ -460,13 +490,9 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T if (!task_to_process) { p_caller_pool_thread->awaited_task = p_task; - if (flushing_cmd_queue) { - flushing_cmd_queue->unlock(); - } + _unlock_unlockable_mutexes(); + relock_unlockables = true; p_caller_pool_thread->cond_var.wait(lock); - if (flushing_cmd_queue) { - flushing_cmd_queue->lock(); - } DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER); p_caller_pool_thread->awaited_task = nullptr; @@ -474,6 +500,10 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T } } + if (relock_unlockables) { + _lock_unlockable_mutexes(); + } + if (task_to_process) { _process_task(task_to_process); } @@ -600,13 +630,9 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) { { Group *group = *groupp; - if (flushing_cmd_queue) { - flushing_cmd_queue->unlock(); - } + _unlock_unlockable_mutexes(); group->done_semaphore.wait(); - if (flushing_cmd_queue) { - flushing_cmd_queue->lock(); - } + _lock_unlockable_mutexes(); uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment. uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later. @@ -630,16 +656,41 @@ int WorkerThreadPool::get_thread_index() { return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1; } -void WorkerThreadPool::thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue) { - ERR_FAIL_COND(flushing_cmd_queue != nullptr); - flushing_cmd_queue = p_queue; +#ifdef THREADS_ENABLED +uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) { + return _thread_enter_unlock_allowance_zone(p_mutex, false); } -void WorkerThreadPool::thread_exit_command_queue_mt_flush() { - ERR_FAIL_NULL(flushing_cmd_queue); - flushing_cmd_queue = nullptr; +uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex) { + return _thread_enter_unlock_allowance_zone(p_mutex, true); } +uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) { + for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) { + if (unlikely(unlockable_mutexes[i] == (uintptr_t)p_mutex)) { + // Already registered in the current thread. + return UINT32_MAX; + } + if (!unlockable_mutexes[i]) { + unlockable_mutexes[i] = (uintptr_t)p_mutex; + if (p_is_binary) { + unlockable_mutexes[i] |= 1; + } + return i; + } + } + ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable mutex slots available. Engine bug."); +} + +void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) { + if (p_zone_id == UINT32_MAX) { + return; + } + DEV_ASSERT(unlockable_mutexes[p_zone_id]); + unlockable_mutexes[p_zone_id] = 0; +} +#endif + void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) { ERR_FAIL_COND(threads.size() > 0); if (p_thread_count < 0) { diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index a9cf260a0f2..8774143abfe 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -41,8 +41,6 @@ #include "core/templates/rid.h" #include "core/templates/safe_refcount.h" -class CommandQueueMT; - class WorkerThreadPool : public Object { GDCLASS(WorkerThreadPool, Object) public: @@ -163,7 +161,10 @@ private: static WorkerThreadPool *singleton; - static thread_local CommandQueueMT *flushing_cmd_queue; +#ifdef THREADS_ENABLED + static const uint32_t MAX_UNLOCKABLE_MUTEXES = 2; + static thread_local uintptr_t unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES]; +#endif TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description); GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description); @@ -190,6 +191,13 @@ private: void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task); +#ifdef THREADS_ENABLED + static uint32_t _thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary); +#endif + + void _lock_unlockable_mutexes(); + void _unlock_unlockable_mutexes(); + protected: static void _bind_methods(); @@ -232,8 +240,14 @@ public: static WorkerThreadPool *get_singleton() { return singleton; } static int get_thread_index(); - static void thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue); - static void thread_exit_command_queue_mt_flush(); +#ifdef THREADS_ENABLED + static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex); + static uint32_t thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex); + static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id); +#else + static uint32_t thread_enter_unlock_allowance_zone(void *p_mutex) { return UINT32_MAX; } + static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {} +#endif void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3); void finish(); diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index 349404d75bd..0748e9cb837 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -364,7 +364,7 @@ class CommandQueueMT { lock(); - WorkerThreadPool::thread_enter_command_queue_mt_flush(this); + uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(&mutex); while (flush_read_ptr < command_mem.size()) { uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr]; flush_read_ptr += 8; @@ -383,7 +383,7 @@ class CommandQueueMT { flush_read_ptr += size; } - WorkerThreadPool::thread_exit_command_queue_mt_flush(); + WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); command_mem.clear(); flush_read_ptr = 0;