From 62d9ce6445283d2bc1daa973350f91df56a826bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Fri, 12 Jul 2024 16:03:01 +0200 Subject: [PATCH 1/4] Re-add resource thread-safety measures These deferring measures were added to aid threaded resource loading in being safe. They were removed as seemingly unneeded, but it seems they are needed so resources involved in threaded loading interact with others only after "sync points". --- core/io/resource.cpp | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/core/io/resource.cpp b/core/io/resource.cpp index c045c0fc742..432adb88da9 100644 --- a/core/io/resource.cpp +++ b/core/io/resource.cpp @@ -40,7 +40,12 @@ #include void Resource::emit_changed() { - emit_signal(CoreStringName(changed)); + if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { + // Let the connection happen on the call queue, later, since signals are not thread-safe. + call_deferred("emit_signal", CoreStringName(changed)); + } else { + emit_signal(CoreStringName(changed)); + } } void Resource::_resource_path_changed() { @@ -161,12 +166,22 @@ bool Resource::editor_can_reload_from_file() { } void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) { + if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { + // Let the check and connection happen on the call queue, later, since signals are not thread-safe. + callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags); + return; + } if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) { connect(CoreStringName(changed), p_callable, p_flags); } } void Resource::disconnect_changed(const Callable &p_callable) { + if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { + // Let the check and disconnection happen on the call queue, later, since signals are not thread-safe. + callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable); + return; + } if (is_connected(CoreStringName(changed), p_callable)) { disconnect(CoreStringName(changed), p_callable); } From 10b543f8a770970cac36a404f192a7f2c246894f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Mon, 15 Jul 2024 11:22:39 +0200 Subject: [PATCH 2/4] WorkerThreadPool: Fix wrong sync logic breaking task map integrity --- core/object/worker_thread_pool.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index caf4ed3835a..0356da06d99 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -397,16 +397,17 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { task->waiting_user++; } - task_mutex.unlock(); - if (caller_pool_thread) { + task_mutex.unlock(); _wait_collaboratively(caller_pool_thread, task); + task_mutex.lock(); task->waiting_pool--; if (task->waiting_pool == 0 && task->waiting_user == 0) { tasks.erase(p_task_id); task_allocator.free(task); } } else { + task_mutex.unlock(); task->done_semaphore.wait(); task_mutex.lock(); task->waiting_user--; @@ -414,9 +415,9 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { tasks.erase(p_task_id); task_allocator.free(task); } - task_mutex.unlock(); } + task_mutex.unlock(); return OK; } From 5b5cdf2414f4b9ac22eb6e4d01209c425ced4f81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Tue, 9 Jul 2024 18:41:24 +0200 Subject: [PATCH 3/4] Fixup recent changes to threading concerns ResourceLoader: - Fix invalid tokens being returned. - Remove no longer written `ThreadLoadTask::dependent_path` and the code reading from it. - Clear deadlock hazard by keeping the mutex unlocked during userland polling. WorkerThreadPool: - Include thread call queue override in the thread state reset set, which allows to simplify the code that handled that (imperfectly) in the ResourceLoader. - Handle the mutex type correctly on entering an allowance zone. CommandQueueMT: - Handle the additional possibility of command buffer reallocation that mutex unlock allowance introduces. --- core/io/resource_loader.cpp | 88 +++++++++++++++--------------- core/io/resource_loader.h | 1 - core/object/worker_thread_pool.cpp | 5 +- core/templates/command_queue_mt.h | 8 ++- 4 files changed, 53 insertions(+), 49 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 20dd192da1a..2d6987298ec 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -304,31 +304,23 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { thread_load_mutex.unlock(); // Thread-safe either if it's the current thread or a brand new one. - thread_local bool mq_override_present = false; CallQueue *own_mq_override = nullptr; if (load_nesting == 0) { - mq_override_present = false; load_paths_stack = memnew(Vector); - if (!load_task.dependent_path.is_empty()) { - load_paths_stack->push_back(load_task.dependent_path); - } if (!Thread::is_main_thread()) { // Let the caller thread use its own, for added flexibility. Provide one otherwise. if (MessageQueue::get_singleton() == MessageQueue::get_main_singleton()) { own_mq_override = memnew(CallQueue); MessageQueue::set_thread_singleton_override(own_mq_override); } - mq_override_present = true; set_current_thread_safe_for_nodes(true); } - } else { - DEV_ASSERT(load_task.dependent_path.is_empty()); } // -- Ref res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress); - if (mq_override_present) { + if (MessageQueue::get_singleton() != MessageQueue::get_main_singleton()) { MessageQueue::get_singleton()->flush(); } @@ -473,12 +465,13 @@ Ref ResourceLoader::_load_start(const String &p_path, if (!ignoring_cache && thread_load_tasks.has(local_path)) { load_token = Ref(thread_load_tasks[local_path].load_token); - if (!load_token.is_valid()) { + if (load_token.is_valid()) { + return load_token; + } else { // The token is dying (reached 0 on another thread). // Ensure it's killed now so the path can be safely reused right away. thread_load_tasks[local_path].load_token->clear(); } - return load_token; } load_token.instantiate(); @@ -560,37 +553,44 @@ float ResourceLoader::_dependency_get_progress(const String &p_path) { } ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const String &p_path, float *r_progress) { - MutexLock thread_load_lock(thread_load_mutex); + bool ensure_progress = false; + ThreadLoadStatus status = THREAD_LOAD_IN_PROGRESS; + { + MutexLock thread_load_lock(thread_load_mutex); - if (!user_load_tokens.has(p_path)) { - print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected."); - return THREAD_LOAD_INVALID_RESOURCE; - } - - String local_path = _validate_local_path(p_path); - if (!thread_load_tasks.has(local_path)) { -#ifdef DEV_ENABLED - CRASH_NOW(); -#endif - // On non-dev, be defensive and at least avoid crashing (at this point at least). - return THREAD_LOAD_INVALID_RESOURCE; - } - - ThreadLoadTask &load_task = thread_load_tasks[local_path]; - ThreadLoadStatus status; - status = load_task.status; - if (r_progress) { - *r_progress = _dependency_get_progress(local_path); - } - - // Support userland polling in a loop on the main thread. - if (Thread::is_main_thread() && status == THREAD_LOAD_IN_PROGRESS) { - uint64_t frame = Engine::get_singleton()->get_process_frames(); - if (frame == load_task.last_progress_check_main_thread_frame) { - _ensure_load_progress(); - } else { - load_task.last_progress_check_main_thread_frame = frame; + if (!user_load_tokens.has(p_path)) { + print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected."); + return THREAD_LOAD_INVALID_RESOURCE; } + + String local_path = _validate_local_path(p_path); + if (!thread_load_tasks.has(local_path)) { +#ifdef DEV_ENABLED + CRASH_NOW(); +#endif + // On non-dev, be defensive and at least avoid crashing (at this point at least). + return THREAD_LOAD_INVALID_RESOURCE; + } + + ThreadLoadTask &load_task = thread_load_tasks[local_path]; + status = load_task.status; + if (r_progress) { + *r_progress = _dependency_get_progress(local_path); + } + + // Support userland polling in a loop on the main thread. + if (Thread::is_main_thread() && status == THREAD_LOAD_IN_PROGRESS) { + uint64_t frame = Engine::get_singleton()->get_process_frames(); + if (frame == load_task.last_progress_check_main_thread_frame) { + ensure_progress = true; + } else { + load_task.last_progress_check_main_thread_frame = frame; + } + } + } + + if (ensure_progress) { + _ensure_load_progress(); } return status; @@ -626,13 +626,13 @@ Ref ResourceLoader::load_threaded_get(const String &p_path, Error *r_e if (Thread::is_main_thread() && !load_token->local_path.is_empty()) { const ThreadLoadTask &load_task = thread_load_tasks[load_token->local_path]; while (load_task.status == THREAD_LOAD_IN_PROGRESS) { - if (!_ensure_load_progress()) { - // This local poll loop is not needed. - break; - } thread_load_lock.~MutexLock(); + bool exit = !_ensure_load_progress(); OS::get_singleton()->delay_usec(1000); new (&thread_load_lock) MutexLock(thread_load_mutex); + if (exit) { + break; + } } } diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 46df79ea221..5f1831f0d98 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -170,7 +170,6 @@ private: LoadToken *load_token = nullptr; String local_path; String remapped_path; - String dependent_path; String type_hint; float progress = 0.0f; float max_reported_progress = 0.0f; diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 0356da06d99..a873bc1f09c 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -59,8 +59,9 @@ void WorkerThreadPool::_process_task(Task *p_task) { CallQueue *call_queue_backup = MessageQueue::get_singleton() != MessageQueue::get_main_singleton() ? MessageQueue::get_singleton() : nullptr; { - // Tasks must start with this unset. They are free to set-and-forget otherwise. + // Tasks must start with these at default values. They are free to set-and-forget otherwise. set_current_thread_safe_for_nodes(false); + MessageQueue::set_thread_singleton_override(nullptr); // Since the WorkerThreadPool is started before the script server, // its pre-created threads can't have ScriptServer::thread_enter() called on them early. // Therefore, we do it late at the first opportunity, so in case the task @@ -671,7 +672,7 @@ uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mut uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) { for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) { - if (unlikely(unlockable_mutexes[i] == (uintptr_t)p_mutex)) { + if (unlikely((unlockable_mutexes[i] & ~1) == (uintptr_t)p_mutex)) { // Already registered in the current thread. return UINT32_MAX; } diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index 0748e9cb837..1e6c6e42a96 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -370,15 +370,19 @@ class CommandQueueMT { flush_read_ptr += 8; CommandBase *cmd = reinterpret_cast(&command_mem[flush_read_ptr]); cmd->call(); + + // Handle potential realloc due to the command and unlock allowance. + cmd = reinterpret_cast(&command_mem[flush_read_ptr]); + if (unlikely(cmd->sync)) { sync_head++; unlock(); // Give an opportunity to awaiters right away. sync_cond_var.notify_all(); lock(); + // Handle potential realloc happened during unlock. + cmd = reinterpret_cast(&command_mem[flush_read_ptr]); } - // If the command involved reallocating the buffer, the address may have changed. - cmd = reinterpret_cast(&command_mem[flush_read_ptr]); cmd->~CommandBase(); flush_read_ptr += size; From 28a7a95531d0aa1d92698a4f3a1b34b70f2b047c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Mon, 15 Jul 2024 11:43:27 +0200 Subject: [PATCH 4/4] ResourceLoader: Fix sync issues with error reporting This is about not letting the resource format loader set the error code directly on the task anymore. Instead, it's stored locally and assigned only when it is right to do so. Otherwise, other tasks may see an error code in the current one before it's state having transitioned to errored. While this, besides the technically true data race, may not be a problem in practice, it causes surprising situations during debugging as it breaks assumptions. --- core/io/resource_loader.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 2d6987298ec..d606db620c1 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -319,7 +319,8 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { } // -- - Ref res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress); + Error load_err = OK; + Ref res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress); if (MessageQueue::get_singleton() != MessageQueue::get_main_singleton()) { MessageQueue::get_singleton()->flush(); } @@ -328,7 +329,8 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { load_task.resource = res; - load_task.progress = 1.0; //it was fully loaded at this point, so force progress to 1.0 + load_task.progress = 1.0; // It was fully loaded at this point, so force progress to 1.0. + load_task.error = load_err; if (load_task.error != OK) { load_task.status = THREAD_LOAD_FAILED; } else {