diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 95690964856..a7c5c80ad29 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -231,23 +231,22 @@ void ResourceLoader::LoadToken::clear() { WorkerThreadPool::TaskID task_to_await = 0; + // User-facing tokens shouldn't be deleted until completely claimed. + DEV_ASSERT(user_rc == 0 && user_path.is_empty()); + if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. DEV_ASSERT(thread_load_tasks.has(local_path)); ThreadLoadTask &load_task = thread_load_tasks[local_path]; - if (!load_task.awaited) { + if (load_task.task_id && !load_task.awaited) { task_to_await = load_task.task_id; - load_task.awaited = true; } + // Removing a task which is still in progress would be catastrophic. + // Tokens must be alive until the task thread function is done. + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); thread_load_tasks.erase(local_path); local_path.clear(); } - if (!user_path.is_empty()) { - DEV_ASSERT(user_load_tokens.has(user_path)); - user_load_tokens.erase(user_path); - user_path.clear(); - } - thread_load_mutex.unlock(); // If task is unused, await it here, locally, now the token data is consistent. @@ -461,36 +460,44 @@ static String _validate_local_path(const String &p_path) { } Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode) { - thread_load_mutex.lock(); - if (user_load_tokens.has(p_path)) { - print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); - user_load_tokens[p_path]->reference(); // Additional request. - thread_load_mutex.unlock(); - return OK; - } - user_load_tokens[p_path] = nullptr; - thread_load_mutex.unlock(); + Ref token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode, true); + return token.is_valid() ? OK : FAILED; +} - Ref token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode); - if (token.is_valid()) { - thread_load_mutex.lock(); - token->user_path = p_path; - token->reference(); // First request. - user_load_tokens[p_path] = token.ptr(); - print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); - thread_load_mutex.unlock(); - return OK; +ResourceLoader::LoadToken *ResourceLoader::_load_threaded_request_reuse_user_token(const String &p_path) { + HashMap::Iterator E = user_load_tokens.find(p_path); + if (E) { + print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); + LoadToken *token = E->value; + token->user_rc++; + return token; } else { - return FAILED; + return nullptr; } } +void ResourceLoader::_load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path) { + p_token->user_path = p_path; + p_token->reference(); // Extra RC until all user requests have been gotten. + p_token->user_rc = 1; + user_load_tokens[p_path] = p_token; + print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); +} + Ref ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) { if (r_error) { *r_error = OK; } - Ref load_token = _load_start(p_path, p_type_hint, LOAD_THREAD_FROM_CURRENT, p_cache_mode); + LoadThreadMode thread_mode = LOAD_THREAD_FROM_CURRENT; + if (WorkerThreadPool::get_singleton()->get_caller_task_id() != WorkerThreadPool::INVALID_TASK_ID) { + // If user is initiating a single-threaded load from a WorkerThreadPool task, + // we instead spawn a new task so there's a precondition that a load in a pool task + // is always initiated by the engine. That makes certain aspects simpler, such as + // cyclic load detection and awaiting. + thread_mode = LOAD_THREAD_SPAWN_SINGLE; + } + Ref load_token = _load_start(p_path, p_type_hint, thread_mode, p_cache_mode); if (!load_token.is_valid()) { if (r_error) { *r_error = FAILED; @@ -502,7 +509,7 @@ Ref ResourceLoader::load(const String &p_path, const String &p_type_hi return res; } -Ref ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode) { +Ref ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user) { String local_path = _validate_local_path(p_path); bool ignoring_cache = p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE || p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE_DEEP; @@ -515,6 +522,13 @@ Ref ResourceLoader::_load_start(const String &p_path, { MutexLock thread_load_lock(thread_load_mutex); + if (p_for_user) { + LoadToken *existing_token = _load_threaded_request_reuse_user_token(p_path); + if (existing_token) { + return Ref(existing_token); + } + } + if (!ignoring_cache && thread_load_tasks.has(local_path)) { load_token = Ref(thread_load_tasks[local_path].load_token); if (load_token.is_valid()) { @@ -528,6 +542,9 @@ Ref ResourceLoader::_load_start(const String &p_path, load_token.instantiate(); load_token->local_path = local_path; + if (p_for_user) { + _load_threaded_request_setup_user_token(load_token.ptr(), p_path); + } //create load task { @@ -545,6 +562,7 @@ Ref ResourceLoader::_load_start(const String &p_path, load_task.resource = existing; load_task.status = THREAD_LOAD_LOADED; load_task.progress = 1.0; + DEV_ASSERT(!thread_load_tasks.has(local_path)); thread_load_tasks[local_path] = load_task; return load_token; } @@ -576,7 +594,7 @@ Ref ResourceLoader::_load_start(const String &p_path, } else { load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr); } - } + } // MutexLock(thread_load_mutex). if (run_on_current_thread) { _run_load_task(load_task_ptr); @@ -666,13 +684,7 @@ Ref ResourceLoader::load_threaded_get(const String &p_path, Error *r_e } LoadToken *load_token = user_load_tokens[p_path]; - if (!load_token) { - // This happens if requested from one thread and rapidly querying from another. - if (r_error) { - *r_error = ERR_BUSY; - } - return Ref(); - } + DEV_ASSERT(load_token->user_rc >= 1); // Support userland requesting on the main thread before the load is reported to be complete. if (Thread::is_main_thread() && !load_token->local_path.is_empty()) { @@ -689,8 +701,15 @@ Ref ResourceLoader::load_threaded_get(const String &p_path, Error *r_e } res = _load_complete_inner(*load_token, r_error, thread_load_lock); - if (load_token->unreference()) { - memdelete(load_token); + + load_token->user_rc--; + if (load_token->user_rc == 0) { + load_token->user_path.clear(); + user_load_tokens.erase(p_path); + if (load_token->unreference()) { + memdelete(load_token); + load_token = nullptr; + } } } @@ -733,55 +752,45 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro } bool loader_is_wtp = load_task.task_id != 0; - Error wtp_task_err = FAILED; if (loader_is_wtp) { // Loading thread is in the worker pool. - load_task.awaited = true; thread_load_mutex.unlock(); - PREPARE_FOR_WTP_WAIT - wtp_task_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); - RESTORE_AFTER_WTP_WAIT - } - if (load_task.status == THREAD_LOAD_IN_PROGRESS) { // If early errored, awaiting would deadlock. - if (loader_is_wtp) { - if (wtp_task_err == ERR_BUSY) { - // The WorkerThreadPool has reported that the current task wants to await on an older one. - // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of - // resource loading that means that the task to wait for can be restarted here to break the - // cycle, with as much recursion into this process as needed. - // When the stack is eventually unrolled, the original load will have been notified to go on. - _run_load_task(&load_task); - Ref resource = load_task.resource; - if (r_error) { - *r_error = load_task.error; - } - thread_load_mutex.lock(); - return resource; - } else { - DEV_ASSERT(wtp_task_err == OK); - thread_load_mutex.lock(); - } - } else if (load_task.need_wait) { - // Loading thread is main or user thread. - if (!load_task.cond_var) { - load_task.cond_var = memnew(ConditionVariable); - } - load_task.awaiters_count++; - do { - load_task.cond_var->wait(p_thread_load_lock); - DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); - } while (load_task.need_wait); - load_task.awaiters_count--; - if (load_task.awaiters_count == 0) { - memdelete(load_task.cond_var); - load_task.cond_var = nullptr; - } + PREPARE_FOR_WTP_WAIT + Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + RESTORE_AFTER_WTP_WAIT + + DEV_ASSERT(!wait_err || wait_err == ERR_BUSY); + if (wait_err == ERR_BUSY) { + // The WorkerThreadPool has reported that the current task wants to await on an older one. + // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of + // resource loading that means that the task to wait for can be restarted here to break the + // cycle, with as much recursion into this process as needed. + // When the stack is eventually unrolled, the original load will have been notified to go on. + _run_load_task(&load_task); } - } else { - if (loader_is_wtp) { - thread_load_mutex.lock(); + + thread_load_mutex.lock(); + load_task.awaited = true; + + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); + } else if (load_task.need_wait) { + // Loading thread is main or user thread. + if (!load_task.cond_var) { + load_task.cond_var = memnew(ConditionVariable); } + load_task.awaiters_count++; + do { + load_task.cond_var->wait(p_thread_load_lock); + DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); + } while (load_task.need_wait); + load_task.awaiters_count--; + if (load_task.awaiters_count == 0) { + memdelete(load_task.cond_var); + load_task.cond_var = nullptr; + } + + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); } } @@ -1221,10 +1230,13 @@ void ResourceLoader::clear_thread_load_tasks() { } while (user_load_tokens.begin()) { - // User load tokens remove themselves from the map on destruction. - memdelete(user_load_tokens.begin()->value); + LoadToken *user_token = user_load_tokens.begin()->value; + user_load_tokens.remove(user_load_tokens.begin()); + DEV_ASSERT(user_token->user_rc > 0 && !user_token->user_path.is_empty()); + user_token->user_path.clear(); + user_token->user_rc = 0; + user_token->unreference(); } - user_load_tokens.clear(); thread_load_tasks.clear(); diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index f00a5cf1172..217c14d7a38 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -123,6 +123,7 @@ public: struct LoadToken : public RefCounted { String local_path; String user_path; + uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero. Ref res_if_unregistered; void clear(); @@ -132,10 +133,13 @@ public: static const int BINARY_MUTEX_TAG = 1; - static Ref _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode); + static Ref _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user = false); static Ref _load_complete(LoadToken &p_load_token, Error *r_error); private: + static LoadToken *_load_threaded_request_reuse_user_token(const String &p_path); + static void _load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path); + static Ref _load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock> &p_thread_load_lock); static Ref loader[MAX_LOADERS];