WorkerThreadPool: Improve logic

- The main thread function and the collaborative wait functions have a much more similar structure than earlier, which yields (pun intended) better maintainability.
- Also, there are not assertions anymore about the reason for ending a wait being valid, because spurious awakes can happen and so the assert would fail without that indicating an issue.
This commit is contained in:
Pedro J. Estébanez 2024-09-16 18:03:36 +02:00
parent e2fd88ed91
commit 2640960706
1 changed files with 38 additions and 27 deletions

View File

@ -180,13 +180,16 @@ void WorkerThreadPool::_process_task(Task *p_task) {
void WorkerThreadPool::_thread_function(void *p_user) { void WorkerThreadPool::_thread_function(void *p_user) {
ThreadData *thread_data = (ThreadData *)p_user; ThreadData *thread_data = (ThreadData *)p_user;
while (true) { while (true) {
Task *task_to_process = nullptr; Task *task_to_process = nullptr;
{ {
MutexLock lock(singleton->task_mutex); MutexLock lock(singleton->task_mutex);
if (singleton->exit_threads) {
return; if (unlikely(singleton->exit_threads)) {
break;
} }
thread_data->signaled = false; thread_data->signaled = false;
if (singleton->task_queue.first()) { if (singleton->task_queue.first()) {
@ -194,7 +197,6 @@ void WorkerThreadPool::_thread_function(void *p_user) {
singleton->task_queue.remove(singleton->task_queue.first()); singleton->task_queue.remove(singleton->task_queue.first());
} else { } else {
thread_data->cond_var.wait(lock); thread_data->cond_var.wait(lock);
DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
} }
} }
@ -442,22 +444,33 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() {
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) { void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
// Keep processing tasks until the condition to stop waiting is met. // Keep processing tasks until the condition to stop waiting is met.
#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)
while (true) { while (true) {
Task *task_to_process = nullptr; Task *task_to_process = nullptr;
bool relock_unlockables = false; bool relock_unlockables = false;
{ {
MutexLock lock(task_mutex); MutexLock lock(task_mutex);
bool was_signaled = p_caller_pool_thread->signaled; bool was_signaled = p_caller_pool_thread->signaled;
p_caller_pool_thread->signaled = false; p_caller_pool_thread->signaled = false;
if (IS_WAIT_OVER) { if (unlikely(exit_threads)) {
if (unlikely(p_task == ThreadData::YIELDING)) { break;
p_caller_pool_thread->yield_is_over = false; }
}
if (!exit_threads && was_signaled) { bool wait_is_over = false;
if (unlikely(p_task == ThreadData::YIELDING)) {
if (p_caller_pool_thread->yield_is_over) {
p_caller_pool_thread->yield_is_over = false;
wait_is_over = true;
}
} else {
if (p_task->completed) {
wait_is_over = true;
}
}
if (wait_is_over) {
if (was_signaled) {
// This thread was awaken for some additional reason, but it's about to exit. // This thread was awaken for some additional reason, but it's about to exit.
// Let's find out what may be pending and forward the requests. // Let's find out what may be pending and forward the requests.
uint32_t to_process = task_queue.first() ? 1 : 0; uint32_t to_process = task_queue.first() ? 1 : 0;
@ -472,28 +485,26 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
break; break;
} }
if (!exit_threads) { if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) { if (_try_promote_low_priority_task()) {
if (_try_promote_low_priority_task()) { _notify_threads(p_caller_pool_thread, 1, 0);
_notify_threads(p_caller_pool_thread, 1, 0);
}
} }
}
if (singleton->task_queue.first()) { if (singleton->task_queue.first()) {
task_to_process = task_queue.first()->self(); task_to_process = task_queue.first()->self();
task_queue.remove(task_queue.first()); task_queue.remove(task_queue.first());
} }
if (!task_to_process) { if (!task_to_process) {
p_caller_pool_thread->awaited_task = p_task; p_caller_pool_thread->awaited_task = p_task;
_unlock_unlockable_mutexes(); _unlock_unlockable_mutexes();
relock_unlockables = true; relock_unlockables = true;
p_caller_pool_thread->cond_var.wait(lock);
DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER); p_caller_pool_thread->cond_var.wait(lock);
p_caller_pool_thread->awaited_task = nullptr;
} p_caller_pool_thread->awaited_task = nullptr;
} }
} }