Merge pull request #49463 from reduz/refactor-command-queue

Refactor CommandQueueMT
This commit is contained in:
Rémi Verschelde 2021-06-09 21:06:50 +02:00 committed by GitHub
commit 4f54470c2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 39 additions and 211 deletions

View File

@@ -70,35 +70,7 @@ CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() {
return &sync_sems[idx]; return &sync_sems[idx];
} }
bool CommandQueueMT::dealloc_one() {
tryagain:
if (dealloc_ptr == (write_ptr_and_epoch >> 1)) {
// The queue is empty
return false;
}
uint32_t size = *(uint32_t *)&command_mem[dealloc_ptr];
if (size == 0) {
// End of command buffer wrap down
dealloc_ptr = 0;
goto tryagain;
}
if (size & 1) {
// Still used, nothing can be deallocated
return false;
}
dealloc_ptr += (size >> 1) + 8;
return true;
}
CommandQueueMT::CommandQueueMT(bool p_sync) { CommandQueueMT::CommandQueueMT(bool p_sync) {
command_mem_size = GLOBAL_DEF_RST("memory/limits/command_queue/multithreading_queue_size_kb", DEFAULT_COMMAND_MEM_SIZE_KB);
ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/command_queue/multithreading_queue_size_kb", PropertyInfo(Variant::INT, "memory/limits/command_queue/multithreading_queue_size_kb", PROPERTY_HINT_RANGE, "1,4096,1,or_greater"));
command_mem_size *= 1024;
command_mem = (uint8_t *)memalloc(command_mem_size);
if (p_sync) { if (p_sync) {
sync = memnew(Semaphore); sync = memnew(Semaphore);
} }
@@ -108,5 +80,4 @@ CommandQueueMT::~CommandQueueMT() {
if (sync) { if (sync) {
memdelete(sync); memdelete(sync);
} }
memfree(command_mem);
} }

View File

@@ -34,6 +34,8 @@
#include "core/os/memory.h" #include "core/os/memory.h"
#include "core/os/mutex.h" #include "core/os/mutex.h"
#include "core/os/semaphore.h" #include "core/os/semaphore.h"
#include "core/string/print_string.h"
#include "core/templates/local_vector.h"
#include "core/templates/simple_type.h" #include "core/templates/simple_type.h"
#include "core/typedefs.h" #include "core/typedefs.h"
@@ -334,11 +336,7 @@ class CommandQueueMT {
SYNC_SEMAPHORES = 8 SYNC_SEMAPHORES = 8
}; };
uint8_t *command_mem = nullptr; LocalVector<uint8_t> command_mem;
uint32_t read_ptr_and_epoch = 0;
uint32_t write_ptr_and_epoch = 0;
uint32_t dealloc_ptr = 0;
uint32_t command_mem_size = 0;
SyncSemaphore sync_sems[SYNC_SEMAPHORES]; SyncSemaphore sync_sems[SYNC_SEMAPHORES];
Mutex mutex; Mutex mutex;
Semaphore *sync = nullptr; Semaphore *sync = nullptr;
@@ -346,138 +344,47 @@ class CommandQueueMT {
template <class T> template <class T>
T *allocate() { T *allocate() {
// alloc size is size+T+safeguard // alloc size is size+T+safeguard
uint32_t alloc_size = ((sizeof(T) + 8 - 1) & ~(8 - 1)) + 8; uint32_t alloc_size = ((sizeof(T) + 8 - 1) & ~(8 - 1));
uint64_t size = command_mem.size();
// Assert that the buffer is big enough to hold at least two messages. command_mem.resize(size + alloc_size + 8);
ERR_FAIL_COND_V(alloc_size * 2 + sizeof(uint32_t) > command_mem_size, nullptr); *(uint64_t *)&command_mem[size] = alloc_size;
T *cmd = memnew_placement(&command_mem[size + 8], T);
tryagain:
uint32_t write_ptr = write_ptr_and_epoch >> 1;
if (write_ptr < dealloc_ptr) {
// behind dealloc_ptr, check that there is room
if ((dealloc_ptr - write_ptr) <= alloc_size) {
// There is no more room, try to deallocate something
if (dealloc_one()) {
goto tryagain;
}
return nullptr;
}
} else {
// ahead of dealloc_ptr, check that there is room
if ((command_mem_size - write_ptr) < alloc_size + sizeof(uint32_t)) {
// no room at the end, wrap down;
if (dealloc_ptr == 0) { // don't want write_ptr to become dealloc_ptr
// There is no more room, try to deallocate something
if (dealloc_one()) {
goto tryagain;
}
return nullptr;
}
// if this happens, it's a bug
ERR_FAIL_COND_V((command_mem_size - write_ptr) < 8, nullptr);
// zero means, wrap to beginning
uint32_t *p = (uint32_t *)&command_mem[write_ptr];
*p = 1;
write_ptr_and_epoch = 0 | (1 & ~write_ptr_and_epoch); // Invert epoch.
// See if we can get the thread to run and clear up some more space while we wait.
// This is required if alloc_size * 2 + 4 > COMMAND_MEM_SIZE
if (sync) {
sync->post();
}
goto tryagain;
}
}
// Allocate the size and the 'in use' bit.
// First bit used to mark if command is still in use (1)
// or if it has been destroyed and can be deallocated (0).
uint32_t size = (sizeof(T) + 8 - 1) & ~(8 - 1);
uint32_t *p = (uint32_t *)&command_mem[write_ptr];
*p = (size << 1) | 1;
write_ptr += 8;
// allocate the command
T *cmd = memnew_placement(&command_mem[write_ptr], T);
write_ptr += size;
write_ptr_and_epoch = (write_ptr << 1) | (write_ptr_and_epoch & 1);
return cmd; return cmd;
} }
template <class T> template <class T>
T *allocate_and_lock() { T *allocate_and_lock() {
lock(); lock();
T *ret; T *ret = allocate<T>();
while ((ret = allocate<T>()) == nullptr) {
unlock();
// sleep a little until fetch happened and some room is made
wait_for_flush();
lock();
}
return ret; return ret;
} }
bool flush_one(bool p_lock = true) { void _flush() {
if (p_lock) {
lock(); lock();
}
tryagain:
// tried to read an empty queue uint64_t read_ptr = 0;
if (read_ptr_and_epoch == write_ptr_and_epoch) { uint64_t limit = command_mem.size();
if (p_lock) {
unlock();
}
return false;
}
uint32_t read_ptr = read_ptr_and_epoch >> 1;
uint32_t size_ptr = read_ptr;
uint32_t size = *(uint32_t *)&command_mem[read_ptr] >> 1;
if (size == 0) {
*(uint32_t *)&command_mem[read_ptr] = 0; // clear in-use bit.
//end of ringbuffer, wrap
read_ptr_and_epoch = 0 | (1 & ~read_ptr_and_epoch); // Invert epoch.
goto tryagain;
}
while (read_ptr < limit) {
uint64_t size = *(uint64_t *)&command_mem[read_ptr];
read_ptr += 8; read_ptr += 8;
CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]); CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[read_ptr]);
cmd->call(); //execute the function
cmd->post(); //release in case it needs sync/ret
cmd->~CommandBase(); //should be done, so erase the command
read_ptr += size; read_ptr += size;
}
read_ptr_and_epoch = (read_ptr << 1) | (read_ptr_and_epoch & 1); command_mem.clear();
if (p_lock) {
unlock(); unlock();
} }
cmd->call();
if (p_lock) {
lock();
}
cmd->post();
cmd->~CommandBase();
*(uint32_t *)&command_mem[size_ptr] &= ~1;
if (p_lock) {
unlock();
}
return true;
}
void lock(); void lock();
void unlock(); void unlock();
void wait_for_flush(); void wait_for_flush();
SyncSemaphore *_alloc_sync_sem(); SyncSemaphore *_alloc_sync_sem();
bool dealloc_one();
public: public:
/* NORMAL PUSH COMMANDS */ /* NORMAL PUSH COMMANDS */
@@ -492,23 +399,19 @@ public:
DECL_PUSH_AND_SYNC(0) DECL_PUSH_AND_SYNC(0)
SPACE_SEP_LIST(DECL_PUSH_AND_SYNC, 15) SPACE_SEP_LIST(DECL_PUSH_AND_SYNC, 15)
void wait_and_flush_one() {
ERR_FAIL_COND(!sync);
sync->wait();
flush_one();
}
_FORCE_INLINE_ void flush_if_pending() { _FORCE_INLINE_ void flush_if_pending() {
if (unlikely(read_ptr_and_epoch != write_ptr_and_epoch)) { if (unlikely(command_mem.size() > 0)) {
flush_all(); _flush();
} }
} }
void flush_all() { void flush_all() {
//ERR_FAIL_COND(sync); _flush();
lock();
while (flush_one(false)) {
} }
unlock();
void wait_and_flush() {
ERR_FAIL_COND(!sync);
sync->wait();
_flush();
} }
CommandQueueMT(bool p_sync); CommandQueueMT(bool p_sync);

View File

@@ -1139,8 +1139,6 @@
<member name="layer_names/3d_render/layer_9" type="String" setter="" getter="" default="&quot;&quot;"> <member name="layer_names/3d_render/layer_9" type="String" setter="" getter="" default="&quot;&quot;">
Optional name for the 3D render layer 9. If left empty, the layer will display as "Layer 9". Optional name for the 3D render layer 9. If left empty, the layer will display as "Layer 9".
</member> </member>
<member name="memory/limits/command_queue/multithreading_queue_size_kb" type="int" setter="" getter="" default="256">
</member>
<member name="memory/limits/message_queue/max_size_kb" type="int" setter="" getter="" default="4096"> <member name="memory/limits/message_queue/max_size_kb" type="int" setter="" getter="" default="4096">
Godot uses a message queue to defer some function calls. If you run out of space on it (you will see an error), you can increase the size here. Godot uses a message queue to defer some function calls. If you run out of space on it (you will see an error), you can increase the size here.
</member> </member>

View File

@@ -56,7 +56,7 @@ void PhysicsServer2DWrapMT::thread_loop() {
step_thread_up.set(); step_thread_up.set();
while (!exit.is_set()) { while (!exit.is_set()) {
// flush commands one by one, until exit is requested // flush commands one by one, until exit is requested
command_queue.wait_and_flush_one(); command_queue.wait_and_flush();
} }
command_queue.flush_all(); // flush all command_queue.flush_all(); // flush all

View File

@@ -56,7 +56,7 @@ void PhysicsServer3DWrapMT::thread_loop() {
step_thread_up = true; step_thread_up = true;
while (!exit) { while (!exit) {
// flush commands one by one, until exit is requested // flush commands one by one, until exit is requested
command_queue.wait_and_flush_one(); command_queue.wait_and_flush();
} }
command_queue.flush_all(); // flush all command_queue.flush_all(); // flush all

View File

@@ -358,7 +358,7 @@ void RenderingServerDefault::_thread_loop() {
draw_thread_up.set(); draw_thread_up.set();
while (!exit.is_set()) { while (!exit.is_set()) {
// flush commands one by one, until exit is requested // flush commands one by one, until exit is requested
command_queue.wait_and_flush_one(); command_queue.wait_and_flush();
} }
command_queue.flush_all(); // flush all command_queue.flush_all(); // flush all

View File

@@ -156,7 +156,7 @@ public:
command_queue.flush_all(); command_queue.flush_all();
} }
for (int i = 0; i < message_count_to_read; i++) { for (int i = 0; i < message_count_to_read; i++) {
command_queue.wait_and_flush_one(); command_queue.wait_and_flush();
} }
message_count_to_read = 0; message_count_to_read = 0;
@@ -276,50 +276,6 @@ TEST_CASE("[CommandQueue] Test Queue Basics") {
ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING)); ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
} }
TEST_CASE("[CommandQueue] Test Waiting at Queue Full") {
const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);
SharedThreadState sts;
sts.init_threads();
int msgs_to_add = 24; // a queue of size 1kB fundamentally cannot fit 24 matrices.
for (int i = 0; i < msgs_to_add; i++) {
sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);
}
sts.writer_threadwork.main_start_work();
// If we call main_wait_for_done, we will deadlock. So instead...
sts.message_count_to_read = 1;
sts.reader_threadwork.main_start_work();
sts.reader_threadwork.main_wait_for_done();
CHECK_MESSAGE(sts.func1_count == 1,
"Reader should have read one message");
CHECK_MESSAGE(sts.during_writing,
"Writer thread should still be blocked on writing.");
sts.message_count_to_read = msgs_to_add - 3;
sts.reader_threadwork.main_start_work();
sts.reader_threadwork.main_wait_for_done();
CHECK_MESSAGE(sts.func1_count >= msgs_to_add - 3,
"Reader should have read most messages");
sts.writer_threadwork.main_wait_for_done();
CHECK_MESSAGE(sts.during_writing == false,
"Writer thread should no longer be blocked on writing.");
sts.message_count_to_read = 2;
sts.reader_threadwork.main_start_work();
sts.reader_threadwork.main_wait_for_done();
sts.message_count_to_read = -1;
sts.reader_threadwork.main_start_work();
sts.reader_threadwork.main_wait_for_done();
CHECK_MESSAGE(sts.func1_count == msgs_to_add,
"Reader should have read all messages");
sts.destroy_threads();
CHECK_MESSAGE(sts.func1_count == msgs_to_add,
"Reader should have read no additional messages after join");
ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,
ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));
}
TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") { TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") {
const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb"; const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";
ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1); ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);