diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index a00593c6793..932493e340b 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -285,10 +285,15 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin void ResourceLoader::_thread_load_function(void *p_userdata) { ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata; // Thread-safe either if it's the current thread or a brand new one. + CallQueue *mq_override = nullptr; if (load_task.first_in_stack) { if (!load_task.dependent_path.is_empty()) { load_paths_stack.push_back(load_task.dependent_path); } + if (!Thread::is_main_thread()) { + mq_override = memnew(CallQueue); + MessageQueue::set_thread_singleton_override(mq_override); + } } else { DEV_ASSERT(load_task.dependent_path.is_empty()); } @@ -346,6 +351,11 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { print_lt("END: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count)); thread_load_mutex.unlock(); + + if (load_task.first_in_stack && mq_override) { + memdelete(mq_override); + MessageQueue::set_thread_singleton_override(nullptr); + } } static String _validate_local_path(const String &p_path) { diff --git a/core/object/message_queue.cpp b/core/object/message_queue.cpp index 05f4e2a8a62..55ea5f5ecda 100644 --- a/core/object/message_queue.cpp +++ b/core/object/message_queue.cpp @@ -35,14 +35,23 @@ #include "core/object/class_db.h" #include "core/object/script_language.h" -void CallQueue::_add_page() { - if (pages_used == page_messages.size()) { - pages.push_back(allocator->alloc()); - page_messages.push_back(0); +#define LOCK_MUTEX \ + if (this != MessageQueue::thread_singleton) { \ + mutex.lock(); \ } - page_messages[pages_used] = 0; + +#define UNLOCK_MUTEX \ + if (this != MessageQueue::thread_singleton) { \ + mutex.unlock(); \ + } + +void CallQueue::_add_page() { + if (pages_used == page_bytes.size()) { + pages.push_back(allocator->alloc()); + page_bytes.push_back(0); + } + page_bytes[pages_used] = 0; pages_used++; - page_offset = 0; } Error CallQueue::push_callp(ObjectID p_id, const StringName &p_method, const Variant **p_args, int p_argcount, bool p_show_error) { @@ -66,15 +75,15 @@ Error CallQueue::push_callablep(const Callable &p_callable, const Variant **p_ar ERR_FAIL_COND_V_MSG(room_needed > uint32_t(PAGE_SIZE_BYTES), ERR_INVALID_PARAMETER, "Message is too large to fit on a page (" + itos(PAGE_SIZE_BYTES) + " bytes), consider passing less arguments."); - mutex.lock(); + LOCK_MUTEX; _ensure_first_page(); - if ((page_offset + room_needed) > uint32_t(PAGE_SIZE_BYTES)) { + if ((page_bytes[pages_used - 1] + room_needed) > uint32_t(PAGE_SIZE_BYTES)) { if (pages_used == max_pages) { ERR_PRINT("Failed method: " + p_callable + ". Message queue out of memory. " + error_text); statistics(); - mutex.unlock(); + UNLOCK_MUTEX; return ERR_OUT_OF_MEMORY; } _add_page(); @@ -82,7 +91,7 @@ Error CallQueue::push_callablep(const Callable &p_callable, const Variant **p_ar Page *page = pages[pages_used - 1]; - uint8_t *buffer_end = &page->data[page_offset]; + uint8_t *buffer_end = &page->data[page_bytes[pages_used - 1]]; Message *msg = memnew_placement(buffer_end, Message); msg->args = p_argcount; @@ -104,21 +113,20 @@ Error CallQueue::push_callablep(const Callable &p_callable, const Variant **p_ar *v = *p_args[i]; } - page_messages[pages_used - 1]++; - page_offset += room_needed; + page_bytes[pages_used - 1] += room_needed; - mutex.unlock(); + UNLOCK_MUTEX; return OK; } Error CallQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant &p_value) { - mutex.lock(); + LOCK_MUTEX; uint32_t room_needed = sizeof(Message) + sizeof(Variant); _ensure_first_page(); - if ((page_offset + room_needed) > uint32_t(PAGE_SIZE_BYTES)) { + if ((page_bytes[pages_used - 1] + room_needed) > uint32_t(PAGE_SIZE_BYTES)) { if (pages_used == max_pages) { String type; if (ObjectDB::get_instance(p_id)) { @@ -127,14 +135,14 @@ Error CallQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant ERR_PRINT("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text); statistics(); - mutex.unlock(); + UNLOCK_MUTEX; return ERR_OUT_OF_MEMORY; } _add_page(); } Page *page = pages[pages_used - 1]; - uint8_t *buffer_end = &page->data[page_offset]; + uint8_t *buffer_end = &page->data[page_bytes[pages_used - 1]]; Message *msg = memnew_placement(buffer_end, Message); msg->args = 1; @@ -146,32 +154,31 @@ Error CallQueue::push_set(ObjectID p_id, const StringName &p_prop, const Variant Variant *v = memnew_placement(buffer_end, Variant); *v = p_value; - page_messages[pages_used - 1]++; - page_offset += room_needed; - mutex.unlock(); + page_bytes[pages_used - 1] += room_needed; + UNLOCK_MUTEX; return OK; } Error CallQueue::push_notification(ObjectID p_id, int p_notification) { ERR_FAIL_COND_V(p_notification < 0, ERR_INVALID_PARAMETER); - mutex.lock(); + LOCK_MUTEX; uint32_t room_needed = sizeof(Message); _ensure_first_page(); - if ((page_offset + room_needed) > uint32_t(PAGE_SIZE_BYTES)) { + if ((page_bytes[pages_used - 1] + room_needed) > uint32_t(PAGE_SIZE_BYTES)) { if (pages_used == max_pages) { ERR_PRINT("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id) + ". Message queue out of memory. " + error_text); statistics(); - mutex.unlock(); + UNLOCK_MUTEX; return ERR_OUT_OF_MEMORY; } _add_page(); } Page *page = pages[pages_used - 1]; - uint8_t *buffer_end = &page->data[page_offset]; + uint8_t *buffer_end = &page->data[page_bytes[pages_used - 1]]; Message *msg = memnew_placement(buffer_end, Message); @@ -180,9 +187,8 @@ Error CallQueue::push_notification(ObjectID p_id, int p_notification) { //msg->target; msg->notification = p_notification; - page_messages[pages_used - 1]++; - page_offset += room_needed; - mutex.unlock(); + page_bytes[pages_used - 1] += room_needed; + UNLOCK_MUTEX; return OK; } @@ -205,26 +211,77 @@ void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args } Error CallQueue::flush() { - mutex.lock(); + LOCK_MUTEX; + + // Non-main threads are not meant to be flushed, but appended to the main one. + if (this != MessageQueue::main_singleton) { + if (pages.size() == 0) { + return OK; + } + + CallQueue *mq = MessageQueue::main_singleton; + DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters. + + mq->mutex.lock(); + + // Here we're transferring the data from this queue to the main one. + // However, it's very unlikely big amounts of messages will be queued here, + // so PagedArray/Pool would be overkill. Also, in most cases the data will fit + // an already existing page of the main queue. + + // Let's see if our first (likely only) page fits the current target queue page. + uint32_t src_page = 0; + { + if (mq->pages_used) { + uint32_t dst_page = mq->pages_used - 1; + uint32_t dst_offset = mq->page_bytes[dst_page]; + if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) { + memcpy(mq->pages[dst_page] + dst_offset, pages[0], page_bytes[0]); + src_page++; + } + } + } + + // Any other possibly existing source page needs to be added. + + if (mq->pages_used + (pages_used - src_page) > mq->max_pages) { + ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text); + mq->statistics(); + mq->mutex.unlock(); + return ERR_OUT_OF_MEMORY; + } + + for (; src_page < pages_used; src_page++) { + mq->_add_page(); + memcpy(mq->pages[mq->pages_used - 1], pages[src_page], page_bytes[src_page]); + mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page]; + } + + mq->mutex.unlock(); + + page_bytes[0] = 0; + pages_used = 1; + + return OK; + } if (pages.size() == 0) { // Never allocated - mutex.unlock(); + UNLOCK_MUTEX; return OK; // Do nothing. } if (flushing) { - mutex.unlock(); + UNLOCK_MUTEX; return ERR_BUSY; } flushing = true; uint32_t i = 0; - uint32_t j = 0; uint32_t offset = 0; - while (i < pages_used && j < page_messages[i]) { + while (i < pages_used && offset < page_bytes[i]) { Page *page = pages[i]; //lock on each iteration, so a call can re-add itself to the message queue @@ -241,7 +298,7 @@ Error CallQueue::flush() { Object *target = message->callable.get_object(); - mutex.unlock(); + UNLOCK_MUTEX; switch (message->type & FLAG_MASK) { case TYPE_CALL: { @@ -272,35 +329,32 @@ Error CallQueue::flush() { message->~Message(); - mutex.lock(); - j++; - if (j == page_messages[i]) { - j = 0; + LOCK_MUTEX; + if (offset == page_bytes[i]) { i++; offset = 0; } } - page_messages[0] = 0; - page_offset = 0; + page_bytes[0] = 0; pages_used = 1; flushing = false; - mutex.unlock(); + UNLOCK_MUTEX; return OK; } void CallQueue::clear() { - mutex.lock(); + LOCK_MUTEX; if (pages.size() == 0) { - mutex.unlock(); + UNLOCK_MUTEX; return; // Nothing to clear. } for (uint32_t i = 0; i < pages_used; i++) { uint32_t offset = 0; - for (uint32_t j = 0; j < page_messages[i]; j++) { + while (offset < page_bytes[i]) { Page *page = pages[i]; //lock on each iteration, so a call can re-add itself to the message queue @@ -312,7 +366,6 @@ void CallQueue::clear() { advance += sizeof(Variant) * message->args; } - //pre-advance so this function is reentrant offset += advance; if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { @@ -327,14 +380,13 @@ void CallQueue::clear() { } pages_used = 1; - page_offset = 0; - page_messages[0] = 0; + page_bytes[0] = 0; - mutex.unlock(); + UNLOCK_MUTEX; } void CallQueue::statistics() { - mutex.lock(); + LOCK_MUTEX; HashMap set_count; HashMap notify_count; HashMap call_count; @@ -342,7 +394,7 @@ void CallQueue::statistics() { for (uint32_t i = 0; i < pages_used; i++) { uint32_t offset = 0; - for (uint32_t j = 0; j < page_messages[i]; j++) { + while (offset < page_bytes[i]) { Page *page = pages[i]; //lock on each iteration, so a call can re-add itself to the message queue @@ -397,7 +449,6 @@ void CallQueue::statistics() { null_count++; } - //pre-advance so this function is reentrant offset += advance; if ((message->type & FLAG_MASK) != TYPE_NOTIFICATION) { @@ -426,7 +477,7 @@ void CallQueue::statistics() { print_line("NOTIFY " + itos(E.key) + ": " + itos(E.value)); } - mutex.unlock(); + UNLOCK_MUTEX; } bool CallQueue::is_flushing() const { @@ -437,7 +488,7 @@ bool CallQueue::has_messages() const { if (pages_used == 0) { return false; } - if (pages_used == 1 && page_messages[0] == 0) { + if (pages_used == 1 && page_bytes[0] == 0) { return false; } @@ -473,16 +524,21 @@ CallQueue::~CallQueue() { ////////////////////// -MessageQueue *MessageQueue::singleton = nullptr; +CallQueue *MessageQueue::main_singleton = nullptr; +thread_local CallQueue *MessageQueue::thread_singleton = nullptr; + +void MessageQueue::set_thread_singleton_override(CallQueue *p_thread_singleton) { + thread_singleton = p_thread_singleton; +} MessageQueue::MessageQueue() : CallQueue(nullptr, int(GLOBAL_DEF_RST(PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_mb", PROPERTY_HINT_RANGE, "1,512,1,or_greater"), 32)) * 1024 * 1024 / PAGE_SIZE_BYTES, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings.") { - ERR_FAIL_COND_MSG(singleton != nullptr, "A MessageQueue singleton already exists."); - singleton = this; + ERR_FAIL_COND_MSG(main_singleton != nullptr, "A MessageQueue singleton already exists."); + main_singleton = this; } MessageQueue::~MessageQueue() { - singleton = nullptr; + main_singleton = nullptr; } diff --git a/core/object/message_queue.h b/core/object/message_queue.h index fe261f840ed..c6fcccbd586 100644 --- a/core/object/message_queue.h +++ b/core/object/message_queue.h @@ -70,10 +70,9 @@ private: bool allocator_is_custom = false; LocalVector pages; - LocalVector page_messages; + LocalVector page_bytes; uint32_t max_pages = 0; uint32_t pages_used = 0; - uint32_t page_offset = 0; bool flushing = false; struct Message { @@ -88,7 +87,7 @@ private: _FORCE_INLINE_ void _ensure_first_page() { if (unlikely(pages.is_empty())) { pages.push_back(allocator->alloc()); - page_messages.push_back(0); + page_bytes.push_back(0); pages_used = 1; } } @@ -153,10 +152,15 @@ public: }; class MessageQueue : public CallQueue { - static MessageQueue *singleton; + static CallQueue *main_singleton; + static thread_local CallQueue *thread_singleton; + friend class CallQueue; public: - _FORCE_INLINE_ static MessageQueue *get_singleton() { return singleton; } + _FORCE_INLINE_ static CallQueue *get_singleton() { return thread_singleton ? thread_singleton : main_singleton; } + + static void set_thread_singleton_override(CallQueue *p_thread_singleton); + MessageQueue(); ~MessageQueue(); };