Merge pull request #77143 from RandomShaper/fix_wtp_deadlocks

Avoid multiple possibilities of deadlock in resource loading
Rémi Verschelde 2023-05-17 15:59:18 +02:00
commit 26f96aec9d
No known key found for this signature in database
GPG Key ID: C3336907360768E1
4 changed files with 198 additions and 61 deletions

core/io/resource_loader.cpp

@@ -476,9 +476,6 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
 	if (run_on_current_thread) {
 		load_task_ptr->thread_id = Thread::get_caller_id();
-		if (must_not_register) {
-			load_token->res_if_unregistered = load_task_ptr->resource;
-		}
 	} else {
 		load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr);
 	}
@@ -486,6 +483,9 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
 	if (run_on_current_thread) {
 		_thread_load_function(load_task_ptr);
+		if (must_not_register) {
+			load_token->res_if_unregistered = load_task_ptr->resource;
+		}
 	}
 
 	return load_token;
@@ -613,14 +613,33 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
 		return Ref<Resource>();
 	}
 
-	if (load_task.task_id != 0 && !load_task.awaited) {
-		// Loading thread is in the worker pool and still not awaited.
+	if (load_task.task_id != 0) {
+		// Loading thread is in the worker pool.
 		load_task.awaited = true;
 		thread_load_mutex.unlock();
-		WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
-		thread_load_mutex.lock();
+		Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
+		if (err == ERR_BUSY) {
+			// The WorkerThreadPool has scheduled tasks in a way that the current load depends on
+			// another one in a lower stack frame. Restart such load here. When the stack is eventually
+			// unrolled, the original load will have been notified to go on.
+#ifdef DEV_ENABLED
+			print_verbose("ResourceLoader: Load task happened to wait on another one deep in the call stack. Attempting to avoid deadlock by re-issuing the load now.");
+#endif
+			// CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's
+			// an ongoing load for that resource and wait for it again. This value forces a new load.
+			Ref<ResourceLoader::LoadToken> token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE);
+			Ref<Resource> resource = _load_complete(*token.ptr(), &err);
+			if (r_error) {
+				*r_error = err;
+			}
+			thread_load_mutex.lock();
+			return resource;
+		} else {
+			DEV_ASSERT(err == OK);
+			thread_load_mutex.lock();
+		}
 	} else {
-		// Loading thread is main or user thread, or in the worker pool, but already awaited by some other thread.
+		// Loading thread is main or user thread.
 		if (!load_task.cond_var) {
 			load_task.cond_var = memnew(ConditionVariable);
 		}

core/object/worker_thread_pool.cpp

@@ -51,6 +51,23 @@ void WorkerThreadPool::_process_task_queue() {
 void WorkerThreadPool::_process_task(Task *p_task) {
 	bool low_priority = p_task->low_priority;
+	int pool_thread_index = -1;
+	Task *prev_low_prio_task = nullptr; // In case this is recursively called.
+	if (!use_native_low_priority_threads) {
+		pool_thread_index = thread_ids[Thread::get_caller_id()];
+		ThreadData &curr_thread = threads[pool_thread_index];
+		task_mutex.lock();
+		p_task->pool_thread_index = pool_thread_index;
+		if (low_priority) {
+			low_priority_tasks_running++;
+			prev_low_prio_task = curr_thread.current_low_prio_task;
+			curr_thread.current_low_prio_task = p_task;
+		} else {
+			curr_thread.current_low_prio_task = nullptr;
+		}
+		task_mutex.unlock();
+	}
 
 	if (p_task->group) {
 		// Handling a group
@@ -126,21 +143,36 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 			p_task->callable.callp(nullptr, 0, ret, ce);
 		}
+		task_mutex.lock();
 		p_task->completed = true;
-		p_task->done_semaphore.post();
+		for (uint8_t i = 0; i < p_task->waiting; i++) {
+			p_task->done_semaphore.post();
+		}
+		if (!use_native_low_priority_threads) {
+			p_task->pool_thread_index = -1;
+		}
+		task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed.
 	}
 
-	if (!use_native_low_priority_threads && low_priority) {
-		// A low priority task was freed, so see if we can move a pending one to the high priority queue.
+	// Task may have been freed by now (all callers notified).
+	p_task = nullptr;
+
+	if (!use_native_low_priority_threads) {
 		bool post = false;
 		task_mutex.lock();
-		if (low_priority_task_queue.first()) {
-			Task *low_prio_task = low_priority_task_queue.first()->self();
-			low_priority_task_queue.remove(low_priority_task_queue.first());
-			task_queue.add_last(&low_prio_task->task_elem);
-			post = true;
-		} else {
+		ThreadData &curr_thread = threads[pool_thread_index];
+		curr_thread.current_low_prio_task = prev_low_prio_task;
+		if (low_priority) {
 			low_priority_threads_used--;
+			low_priority_tasks_running--;
+			// A low priority task was freed, so see if we can move a pending one to the high priority queue.
+			if (_try_promote_low_priority_task()) {
+				post = true;
+			}
+			if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
+				_prevent_low_prio_saturation_deadlock();
+			}
 		}
 		task_mutex.unlock();
 		if (post) {
@@ -198,6 +230,35 @@ void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
 	}
 }
 
+bool WorkerThreadPool::_try_promote_low_priority_task() {
+	if (low_priority_task_queue.first()) {
+		Task *low_prio_task = low_priority_task_queue.first()->self();
+		low_priority_task_queue.remove(low_priority_task_queue.first());
+		task_queue.add_last(&low_prio_task->task_elem);
+		low_priority_threads_used++;
+		return true;
+	} else {
+		return false;
+	}
+}
+
+void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() {
+	if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
+#ifdef DEV_ENABLED
+		print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task.");
+#endif
+		// In order not to create dependency cycles, we can only schedule the next one.
+		// We'll keep doing the same until the deadlock is broken.
+		SelfList<Task> *to_promote = low_priority_task_queue.first();
+		if (to_promote) {
+			low_priority_task_queue.remove(to_promote);
+			task_queue.add_last(to_promote);
+			low_priority_threads_used++;
+			task_available_semaphore.post();
+		}
+	}
+}
+
 WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
 	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
 }
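The two helpers above encode a saturation rule: if low_priority_tasks_awaiting_others ever equals low_priority_tasks_running, every occupied low-priority slot is blocked waiting on another low-priority task, so exactly one queued task is promoted to let the dependency chain unwind. A compilable toy model of that bookkeeping follows; the names mirror the patch but this is not engine code.

```cpp
// Toy model of the saturation check behind _prevent_low_prio_saturation_deadlock.
#include <cstdio>
#include <deque>

struct ToyPool {
	std::deque<int> low_priority_task_queue; // Queued low-prio tasks (IDs).
	std::deque<int> task_queue;              // Queue the workers drain.
	unsigned low_priority_tasks_running = 0;
	unsigned low_priority_tasks_awaiting_others = 0;

	bool try_promote_low_priority_task() {
		if (low_priority_task_queue.empty()) {
			return false;
		}
		task_queue.push_back(low_priority_task_queue.front());
		low_priority_task_queue.pop_front();
		return true;
	}

	// Called when a running low-prio task begins waiting on another one.
	void on_low_prio_begins_waiting() {
		low_priority_tasks_awaiting_others++;
		if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
			// Saturation: no occupied low-prio slot can make progress.
			// Promote only one task; promoting more could create new cycles.
			// If that one blocks too, the same check fires again later.
			try_promote_low_priority_task();
		}
	}
};

int main() {
	ToyPool pool;
	pool.low_priority_task_queue = {101};
	pool.low_priority_tasks_running = 2;
	pool.on_low_prio_begins_waiting(); // 1 of 2 waiting: no promotion yet.
	pool.on_low_prio_begins_waiting(); // 2 of 2 waiting: promotes task 101.
	printf("promoted %zu task(s)\n", pool.task_queue.size());
	return 0;
}
```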
@@ -238,66 +299,113 @@ bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
 	return completed;
 }
 
-void WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
+Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
 	task_mutex.lock();
 	Task **taskp = tasks.getptr(p_task_id);
 	if (!taskp) {
 		task_mutex.unlock();
-		ERR_FAIL_MSG("Invalid Task ID"); // Invalid task
+		ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task
 	}
 	Task *task = *taskp;
 
-	if (task->waiting) {
-		String description = task->description;
-		task_mutex.unlock();
-		if (description.is_empty()) {
-			ERR_FAIL_MSG("Another thread is waiting on this task: " + itos(p_task_id)); // Invalid task
-		} else {
-			ERR_FAIL_MSG("Another thread is waiting on this task: " + description + " (" + itos(p_task_id) + ")"); // Invalid task
+	if (!task->completed) {
+		if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
+			int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
+			if (caller_pool_th_index == task->pool_thread_index) {
+				// Deadlock prevention.
+				// Waiting for a task run on this same thread? That means the task to be awaited started waiting as well
+				// and another task was run to make use of the thread in the meantime, with enough bad luck as to
+				// the need to wait for the original task arose in turn.
+				// In other words, the task we want to wait for is buried in the stack.
+				// Let's report the issue to the caller so it can handle it as it sees fit.
+				task_mutex.unlock();
+				return ERR_BUSY;
+			}
 		}
-	}
 
-	task->waiting = true;
+		task->waiting++;
 
-	task_mutex.unlock();
+		bool is_low_prio_waiting_for_another = false;
+		if (!use_native_low_priority_threads) {
+			// Deadlock prevention:
+			// If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots,
+			// we have a non-progressable situation. We can apply a workaround, consisting in promoting an awaited queued
+			// low-prio task to the schedule queue so it can run and break the "impasse".
+			// NOTE: A similar reasoning could be made about high priority tasks, but there are usually much more
+			// than low-prio. Therefore, a deadlock there would only happen when dealing with a very complex task graph
+			// or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be
+			// an issue in the real world, a further fix can be applied against that.
+			if (task->low_priority) {
+				bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
+				if (awaiter_is_a_low_prio_task) {
+					is_low_prio_waiting_for_another = true;
+					low_priority_tasks_awaiting_others++;
+					if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
+						_prevent_low_prio_saturation_deadlock();
+					}
+				}
+			}
+		}
 
-	if (use_native_low_priority_threads && task->low_priority) {
-		task->low_priority_thread->wait_to_finish();
+		task_mutex.unlock();
 
-		task_mutex.lock();
-		native_thread_allocator.free(task->low_priority_thread);
-	} else {
-		int *index = thread_ids.getptr(Thread::get_caller_id());
-
-		if (index) {
-			// We are an actual process thread, we must not be blocked so continue processing stuff if available.
-			bool must_exit = false;
-			while (true) {
-				if (task->done_semaphore.try_wait()) {
-					// If done, exit
-					break;
-				}
-				if (!must_exit && task_available_semaphore.try_wait()) {
-					if (exit_threads) {
-						must_exit = true;
-					} else {
-						// Solve tasks while they are around.
-						_process_task_queue();
-						continue;
-					}
-				}
-				OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
-			}
-		} else {
+		if (use_native_low_priority_threads && task->low_priority) {
 			task->done_semaphore.wait();
+		} else {
+			bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
+			if (current_is_pool_thread) {
+				// We are an actual process thread, we must not be blocked so continue processing stuff if available.
+				bool must_exit = false;
+				while (true) {
+					if (task->done_semaphore.try_wait()) {
+						// If done, exit
+						break;
+					}
+					if (!must_exit) {
+						if (task_available_semaphore.try_wait()) {
+							if (exit_threads) {
+								must_exit = true;
+							} else {
+								// Solve tasks while they are around.
+								_process_task_queue();
+								continue;
+							}
+						} else if (!use_native_low_priority_threads && task->low_priority) {
+							// A low priority task started waiting, so see if we can move a pending one to the high priority queue.
+							task_mutex.lock();
+							bool post = _try_promote_low_priority_task();
+							task_mutex.unlock();
+							if (post) {
+								task_available_semaphore.post();
+							}
+						}
+					}
+					OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
+				}
+			} else {
+				task->done_semaphore.wait();
+			}
 		}
+
 		task_mutex.lock();
+		if (is_low_prio_waiting_for_another) {
+			low_priority_tasks_awaiting_others--;
+		}
+
+		task->waiting--;
+	}
+
+	if (task->waiting == 0) {
+		if (use_native_low_priority_threads && task->low_priority) {
+			task->low_priority_thread->wait_to_finish();
+			native_thread_allocator.free(task->low_priority_thread);
+		}
+		tasks.erase(p_task_id);
+		task_allocator.free(task);
 	}
 
-	tasks.erase(p_task_id);
-	task_allocator.free(task);
 	task_mutex.unlock();
+	return OK;
 }
@@ -429,7 +537,7 @@ void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_pr
 	if (p_use_native_threads_low_priority) {
 		max_low_priority_threads = 0;
 	} else {
-		max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count);
+		max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
 	}
 	use_native_low_priority_threads = p_use_native_threads_low_priority;
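The core of the reworked wait_for_task_completion() is the buried-task check: a caller that is itself a pool thread, and happens to be the very thread currently running the awaited task, can never see that task finish by blocking, because the task sits lower in its own call stack. A standalone toy model of just that check (assumed names, plain C++, not the engine's implementation):

```cpp
// Toy model of the buried-task detection in wait_for_task_completion.
#include <cstdio>
#include <thread>
#include <unordered_map>

enum Error { OK, ERR_BUSY };

struct ToyTask {
	int pool_thread_index = -1; // Pool thread running it, or -1 if not running yet.
};

struct ToyPool {
	std::unordered_map<std::thread::id, int> thread_ids; // Pool threads only.

	Error wait_for_task_completion(const ToyTask &task) {
		auto it = thread_ids.find(std::this_thread::get_id());
		int caller_pool_th_index = (it != thread_ids.end()) ? it->second : -1;
		if (caller_pool_th_index != -1 && caller_pool_th_index == task.pool_thread_index) {
			// The awaited task is buried in this thread's stack: report it
			// so the caller can handle it (e.g. re-issue the work inline,
			// as ResourceLoader now does).
			return ERR_BUSY;
		}
		// ... otherwise block until completion (omitted in this sketch).
		return OK;
	}
};

int main() {
	ToyPool pool;
	pool.thread_ids[std::this_thread::get_id()] = 0; // Pretend we're pool thread 0.
	ToyTask task;
	task.pool_thread_index = 0; // The awaited task runs on our own thread.
	Error err = pool.wait_for_task_completion(task);
	printf("wait result: %s\n", err == ERR_BUSY ? "ERR_BUSY" : "OK");
	return 0;
}
```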

core/object/worker_thread_pool.h

@@ -81,10 +81,11 @@ private:
 		bool completed = false;
 		Group *group = nullptr;
 		SelfList<Task> task_elem;
-		bool waiting = false; // Waiting for completion
+		uint32_t waiting = 0;
 		bool low_priority = false;
 		BaseTemplateUserdata *template_userdata = nullptr;
 		Thread *low_priority_thread = nullptr;
+		int pool_thread_index = -1;
 
 		void free_template_userdata();
 		Task() :
@@ -104,6 +105,7 @@ private:
 	struct ThreadData {
 		uint32_t index;
 		Thread thread;
+		Task *current_low_prio_task = nullptr;
 	};
 
 	TightLocalVector<ThreadData> threads;
@@ -116,6 +118,8 @@ private:
 	bool use_native_low_priority_threads = false;
 	uint32_t max_low_priority_threads = 0;
 	uint32_t low_priority_threads_used = 0;
+	uint32_t low_priority_tasks_running = 0;
+	uint32_t low_priority_tasks_awaiting_others = 0;
 
 	uint64_t last_task = 1;
@@ -127,6 +131,9 @@ private:
 	void _post_task(Task *p_task, bool p_high_priority);
 
+	bool _try_promote_low_priority_task();
+	void _prevent_low_prio_saturation_deadlock();
+
 	static WorkerThreadPool *singleton;
 	TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
@@ -169,7 +176,7 @@ public:
 	TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String());
 
 	bool is_task_completed(TaskID p_task_id) const;
-	void wait_for_task_completion(TaskID p_task_id);
+	Error wait_for_task_completion(TaskID p_task_id);
 
 	template <class C, class M, class U>
 	GroupID add_template_group_task(C *p_instance, M p_method, U p_userdata, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String()) {

doc/classes/WorkerThreadPool.xml

@@ -100,10 +100,13 @@
 			</description>
 		</method>
 		<method name="wait_for_task_completion">
-			<return type="void" />
+			<return type="int" enum="Error" />
 			<param index="0" name="task_id" type="int" />
 			<description>
 				Pauses the thread that calls this method until the task with the given ID is completed.
+				Returns [constant @GlobalScope.OK] if the task could be successfully awaited.
+				Returns [constant @GlobalScope.ERR_INVALID_PARAMETER] if a task with the passed ID does not exist (maybe because it was already awaited and disposed of).
+				Returns [constant @GlobalScope.ERR_BUSY] if the call is made from another running task and, due to task scheduling, the task to await is at a lower level in the call stack and therefore can't progress. This is an advanced situation that should only matter when some tasks depend on others.
 			</description>
 		</method>
 	</methods>
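For callers, the practical upshot of the documented change is that the return value now needs handling. A brief usage sketch, assuming the Godot C++ headers are available; MyObject and my_work are hypothetical and not part of this PR:

```cpp
// Hypothetical engine/module code illustrating the new return value.
void MyObject::run_and_wait() {
	WorkerThreadPool::TaskID tid =
			WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &MyObject::my_work));
	Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(tid);
	if (err == ERR_BUSY) {
		// We're inside another pool task and `tid` is buried below us in this
		// thread's stack; blocking would deadlock. Do the work inline instead,
		// mirroring what ResourceLoader does with CACHE_MODE_IGNORE.
		my_work();
	} else {
		ERR_FAIL_COND(err != OK);
	}
}
```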