Use WorkerThreadPool for threaded resource loading

This commit is contained in:
Pedro J. Estébanez 2023-05-10 11:23:07 +02:00
parent b6647a5808
commit 045401c64e
2 changed files with 50 additions and 90 deletions

View File

@ -206,13 +206,15 @@ void ResourceFormatLoader::_bind_methods() {
void ResourceLoader::LoadToken::clear() {
thread_load_mutex.lock();
Thread *thread_to_destroy = nullptr;
WorkerThreadPool::TaskID task_to_await = 0;
if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered.
DEV_ASSERT(thread_load_tasks.has(local_path));
ThreadLoadTask &load_task = thread_load_tasks[local_path];
thread_to_destroy = load_task.thread;
load_task.thread = nullptr;
if (!load_task.awaited) {
task_to_await = load_task.task_id;
load_task.awaited = true;
}
thread_load_tasks.erase(local_path);
local_path.clear();
}
@ -225,12 +227,9 @@ void ResourceLoader::LoadToken::clear() {
thread_load_mutex.unlock();
// If thread is unused, destroy it here, locally, now the token data is consistent.
if (thread_to_destroy) {
if (thread_to_destroy->is_started()) {
thread_to_destroy->wait_to_finish();
}
memdelete(thread_to_destroy);
// If task is unused, await it here, locally, now the token data is consistent.
if (task_to_await) {
WorkerThreadPool::get_singleton()->wait_for_task_completion(task_to_await);
}
}
@ -284,9 +283,19 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin
void ResourceLoader::_thread_load_function(void *p_userdata) {
ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
thread_load_mutex.lock();
caller_task_id = load_task.task_id;
if (cleaning_tasks) {
load_task.status = THREAD_LOAD_FAILED;
thread_load_mutex.unlock();
return;
}
thread_load_mutex.unlock();
// Thread-safe either if it's the current thread or a brand new one.
CallQueue *mq_override = nullptr;
if (load_task.first_in_stack) {
if (load_nesting == 0) {
if (!load_task.dependent_path.is_empty()) {
load_paths_stack.push_back(load_task.dependent_path);
}
@ -341,18 +350,9 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
}
}
if (load_nesting == 0) {
thread_active_count--;
if (thread_waiting_count) {
thread_active_cond_var.notify_one();
}
}
print_lt("END: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count));
thread_load_mutex.unlock();
if (load_task.first_in_stack && mq_override) {
if (load_nesting == 0 && mq_override) {
memdelete(mq_override);
MessageQueue::set_thread_singleton_override(nullptr);
}
@ -472,46 +472,15 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
load_task_ptr = must_not_register ? &unregistered_load_task : &thread_load_tasks[local_path];
}
print_lt("REQUEST: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count));
run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT;
if (!run_on_current_thread && thread_active_count >= thread_active_max && load_nesting > 0) {
// No free slots for another thread, but this one is already active, so keep working here.
run_on_current_thread = true;
}
load_task_ptr->first_in_stack = run_on_current_thread ? load_nesting == 0 : true;
if (load_task_ptr->first_in_stack) {
if (!run_on_current_thread && load_paths_stack.size()) {
// The paths stack is lost across thread boundaries, so we have to remember what was the topmost path.
load_task_ptr->dependent_path = load_paths_stack[load_paths_stack.size() - 1];
}
if (thread_active_count >= thread_active_max) {
// Either the current or a new thread needs to wait for a free slot to become active.
thread_waiting_count++;
do {
thread_active_cond_var.wait(thread_load_lock);
} while (thread_active_count >= thread_active_max);
thread_waiting_count--;
}
thread_active_count++;
}
if (cleaning_tasks) {
load_task_ptr->status = THREAD_LOAD_FAILED;
return load_token;
}
if (run_on_current_thread) {
load_task_ptr->loader_id = Thread::get_caller_id();
load_task_ptr->thread_id = Thread::get_caller_id();
if (must_not_register) {
load_token->res_if_unregistered = load_task_ptr->resource;
}
} else {
load_task_ptr->thread = memnew(Thread);
load_task_ptr->loader_id = load_task_ptr->thread->start(_thread_load_function, load_task_ptr);
load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr);
}
}
@ -632,29 +601,34 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
ThreadLoadTask &load_task = thread_load_tasks[p_load_token.local_path];
if (load_task.status == THREAD_LOAD_IN_PROGRESS) {
if (load_task.loader_id == Thread::get_caller_id()) {
DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0));
if ((load_task.task_id != 0 && load_task.task_id == caller_task_id) ||
(load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) {
// Load is in progress, but it's precisely this thread the one in charge.
// That means this is a cyclic load.
if (r_error) {
*r_error = ERR_BUSY;
}
return Ref<Resource>();
} else if (!load_task.cond_var) {
// This is the first time some thread needs to wait for this one.
load_task.cond_var = memnew(ConditionVariable);
}
// Wait for load to complete.
thread_suspended_count++;
print_lt("GET: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count));
do {
load_task.cond_var->wait(p_thread_load_lock);
DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count());
} while (load_task.cond_var);
thread_suspended_count--;
if (load_task.task_id != 0 && !load_task.awaited) {
// Loading thread is in the worker pool and still not awaited.
load_task.awaited = true;
thread_load_mutex.unlock();
WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
thread_load_mutex.lock();
} else {
// Loading thread is main or user thread, or in the worker pool, but already awaited by some other thread.
if (!load_task.cond_var) {
load_task.cond_var = memnew(ConditionVariable);
}
do {
load_task.cond_var->wait(p_thread_load_lock);
DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count());
} while (load_task.cond_var);
}
}
if (cleaning_tasks) {
@ -1042,7 +1016,6 @@ void ResourceLoader::clear_thread_load_tasks() {
if (none_running) {
break;
}
thread_active_cond_var.notify_all();
thread_load_mutex.unlock();
OS::get_singleton()->delay_usec(1000);
thread_load_mutex.lock();
@ -1158,12 +1131,7 @@ bool ResourceLoader::is_cleaning_tasks() {
return cleaning_tasks;
}
void ResourceLoader::initialize() {
thread_active_max = OS::get_singleton()->get_processor_count();
thread_active_count = 0;
thread_waiting_count = 0;
thread_suspended_count = 0;
}
void ResourceLoader::initialize() {}
void ResourceLoader::finalize() {}
@ -1178,18 +1146,13 @@ bool ResourceLoader::abort_on_missing_resource = true;
bool ResourceLoader::timestamp_on_load = false;
thread_local int ResourceLoader::load_nesting = 0;
thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0;
thread_local Vector<String> ResourceLoader::load_paths_stack;
template <>
thread_local uint32_t SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG>::count = 0;
SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> ResourceLoader::thread_load_mutex;
HashMap<String, ResourceLoader::ThreadLoadTask> ResourceLoader::thread_load_tasks;
ConditionVariable ResourceLoader::thread_active_cond_var;
int ResourceLoader::thread_active_count = 0;
int ResourceLoader::thread_waiting_count = 0;
int ResourceLoader::thread_suspended_count = 0;
int ResourceLoader::thread_active_max = 0;
bool ResourceLoader::cleaning_tasks = false;
HashMap<String, ResourceLoader::LoadToken *> ResourceLoader::user_load_tokens;

View File

@ -34,6 +34,7 @@
#include "core/io/resource.h"
#include "core/object/gdvirtual.gen.inc"
#include "core/object/script_language.h"
#include "core/object/worker_thread_pool.h"
#include "core/os/semaphore.h"
#include "core/os/thread.h"
@ -158,10 +159,10 @@ private:
static Ref<ResourceFormatLoader> _find_custom_resource_format_loader(String path);
struct ThreadLoadTask {
Thread *thread = nullptr;
Thread::ID loader_id = 0;
bool first_in_stack = false;
ConditionVariable *cond_var = nullptr;
WorkerThreadPool::TaskID task_id = 0; // Used if run on a worker thread from the pool.
Thread::ID thread_id = 0; // Used if running on an user thread (e.g., simple non-threaded load).
bool awaited = false; // If it's in the pool, this helps not awaiting from more than one dependent thread.
ConditionVariable *cond_var = nullptr; // In not in the worker pool or already awaiting, this is used as a secondary awaiting mechanism.
LoadToken *load_token = nullptr;
String local_path;
String remapped_path;
@ -180,14 +181,10 @@ private:
static void _thread_load_function(void *p_userdata);
static thread_local int load_nesting;
static thread_local WorkerThreadPool::TaskID caller_task_id;
static thread_local Vector<String> load_paths_stack;
static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
static HashMap<String, ThreadLoadTask> thread_load_tasks;
static ConditionVariable thread_active_cond_var;
static int thread_active_count;
static int thread_waiting_count;
static int thread_suspended_count;
static int thread_active_max;
static bool cleaning_tasks;
static HashMap<String, LoadToken *> user_load_tokens;