From 26409607063bfbd6f1a5a5810c4b244c9df64d6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Mon, 16 Sep 2024 18:03:36 +0200 Subject: [PATCH] WorkerThreadPool: Improve logic - The main thread function and the collaborative wait functions have a much more similar structure than earlier, which yields (pun intended) better maintainability. - Also, there are not assertions anymore about the reason for ending a wait being valid, because spurious awakes can happen and so the assert would fail without that indicating an issue. --- core/object/worker_thread_pool.cpp | 65 +++++++++++++++++------------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 09d6c9a30d6..91e010b2880 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -180,13 +180,16 @@ void WorkerThreadPool::_process_task(Task *p_task) { void WorkerThreadPool::_thread_function(void *p_user) { ThreadData *thread_data = (ThreadData *)p_user; + while (true) { Task *task_to_process = nullptr; { MutexLock lock(singleton->task_mutex); - if (singleton->exit_threads) { - return; + + if (unlikely(singleton->exit_threads)) { + break; } + thread_data->signaled = false; if (singleton->task_queue.first()) { @@ -194,7 +197,6 @@ void WorkerThreadPool::_thread_function(void *p_user) { singleton->task_queue.remove(singleton->task_queue.first()); } else { thread_data->cond_var.wait(lock); - DEV_ASSERT(singleton->exit_threads || thread_data->signaled); } } @@ -442,22 +444,33 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() { void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) { // Keep processing tasks until the condition to stop waiting is met. -#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed) - while (true) { Task *task_to_process = nullptr; bool relock_unlockables = false; { MutexLock lock(task_mutex); + bool was_signaled = p_caller_pool_thread->signaled; p_caller_pool_thread->signaled = false; - if (IS_WAIT_OVER) { - if (unlikely(p_task == ThreadData::YIELDING)) { - p_caller_pool_thread->yield_is_over = false; - } + if (unlikely(exit_threads)) { + break; + } - if (!exit_threads && was_signaled) { + bool wait_is_over = false; + if (unlikely(p_task == ThreadData::YIELDING)) { + if (p_caller_pool_thread->yield_is_over) { + p_caller_pool_thread->yield_is_over = false; + wait_is_over = true; + } + } else { + if (p_task->completed) { + wait_is_over = true; + } + } + + if (wait_is_over) { + if (was_signaled) { // This thread was awaken for some additional reason, but it's about to exit. // Let's find out what may be pending and forward the requests. uint32_t to_process = task_queue.first() ? 1 : 0; @@ -472,28 +485,26 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T break; } - if (!exit_threads) { - if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { - if (_try_promote_low_priority_task()) { - _notify_threads(p_caller_pool_thread, 1, 0); - } + if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { + if (_try_promote_low_priority_task()) { + _notify_threads(p_caller_pool_thread, 1, 0); } + } - if (singleton->task_queue.first()) { - task_to_process = task_queue.first()->self(); - task_queue.remove(task_queue.first()); - } + if (singleton->task_queue.first()) { + task_to_process = task_queue.first()->self(); + task_queue.remove(task_queue.first()); + } - if (!task_to_process) { - p_caller_pool_thread->awaited_task = p_task; + if (!task_to_process) { + p_caller_pool_thread->awaited_task = p_task; - _unlock_unlockable_mutexes(); - relock_unlockables = true; - p_caller_pool_thread->cond_var.wait(lock); + _unlock_unlockable_mutexes(); + relock_unlockables = true; - DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER); - p_caller_pool_thread->awaited_task = nullptr; - } + p_caller_pool_thread->cond_var.wait(lock); + + p_caller_pool_thread->awaited_task = nullptr; } }