Skip to content

Commit 645be5c

Browse files
committed
Merge pull request godotengine#90760 from RandomShaper/cmd_queue_good_sync
CommandQueueMT: Optimize & fix handling of sync/ret commands
2 parents 70247ad + 15de869 commit 645be5c

File tree

2 files changed

+30
-72
lines changed

2 files changed

+30
-72
lines changed

core/templates/command_queue_mt.cpp

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,35 +41,6 @@ void CommandQueueMT::unlock() {
4141
mutex.unlock();
4242
}
4343

44-
void CommandQueueMT::wait_for_flush() {
45-
// wait one millisecond for a flush to happen
46-
OS::get_singleton()->delay_usec(1000);
47-
}
48-
49-
CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() {
50-
int idx = -1;
51-
52-
while (true) {
53-
lock();
54-
for (int i = 0; i < SYNC_SEMAPHORES; i++) {
55-
if (!sync_sems[i].in_use) {
56-
sync_sems[i].in_use = true;
57-
idx = i;
58-
break;
59-
}
60-
}
61-
unlock();
62-
63-
if (idx == -1) {
64-
wait_for_flush();
65-
} else {
66-
break;
67-
}
68-
}
69-
70-
return &sync_sems[idx];
71-
}
72-
7344
CommandQueueMT::CommandQueueMT() {
7445
}
7546

core/templates/command_queue_mt.h

Lines changed: 30 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
#define COMMAND_QUEUE_MT_H
3333

3434
#include "core/object/worker_thread_pool.h"
35+
#include "core/os/condition_variable.h"
3536
#include "core/os/memory.h"
3637
#include "core/os/mutex.h"
37-
#include "core/os/semaphore.h"
3838
#include "core/string/print_string.h"
3939
#include "core/templates/local_vector.h"
4040
#include "core/templates/simple_type.h"
@@ -251,74 +251,64 @@
251251
#define DECL_PUSH(N) \
252252
template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
253253
void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
254-
CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \
254+
MutexLock mlock(mutex); \
255+
CMD_TYPE(N) *cmd = allocate<CMD_TYPE(N)>(); \
255256
cmd->instance = p_instance; \
256257
cmd->method = p_method; \
257258
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
258259
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
259260
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
260261
} \
261-
unlock(); \
262262
}
263263

264264
#define CMD_RET_TYPE(N) CommandRet##N<T, M, COMMA_SEP_LIST(TYPE_ARG, N) COMMA(N) R>
265265

266266
#define DECL_PUSH_AND_RET(N) \
267267
template <typename T, typename M, COMMA_SEP_LIST(TYPE_PARAM, N) COMMA(N) typename R> \
268268
void push_and_ret(T *p_instance, M p_method, COMMA_SEP_LIST(PARAM, N) COMMA(N) R *r_ret) { \
269-
SyncSemaphore *ss = _alloc_sync_sem(); \
270-
CMD_RET_TYPE(N) *cmd = allocate_and_lock<CMD_RET_TYPE(N)>(); \
269+
MutexLock mlock(mutex); \
270+
CMD_RET_TYPE(N) *cmd = allocate<CMD_RET_TYPE(N)>(); \
271271
cmd->instance = p_instance; \
272272
cmd->method = p_method; \
273273
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
274274
cmd->ret = r_ret; \
275-
cmd->sync_sem = ss; \
276275
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
277276
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
278277
} \
279-
unlock(); \
280-
ss->sem.wait(); \
281-
ss->in_use = false; \
278+
sync_tail++; \
279+
_wait_for_sync(mlock); \
282280
}
283281

284282
#define CMD_SYNC_TYPE(N) CommandSync##N<T, M COMMA(N) COMMA_SEP_LIST(TYPE_ARG, N)>
285283

286284
#define DECL_PUSH_AND_SYNC(N) \
287285
template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
288286
void push_and_sync(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
289-
SyncSemaphore *ss = _alloc_sync_sem(); \
290-
CMD_SYNC_TYPE(N) *cmd = allocate_and_lock<CMD_SYNC_TYPE(N)>(); \
287+
MutexLock mlock(mutex); \
288+
CMD_SYNC_TYPE(N) *cmd = allocate<CMD_SYNC_TYPE(N)>(); \
291289
cmd->instance = p_instance; \
292290
cmd->method = p_method; \
293291
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
294-
cmd->sync_sem = ss; \
295292
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
296293
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
297294
} \
298-
unlock(); \
299-
ss->sem.wait(); \
300-
ss->in_use = false; \
295+
sync_tail++; \
296+
_wait_for_sync(mlock); \
301297
}
302298

303299
#define MAX_CMD_PARAMS 15
304300

305301
class CommandQueueMT {
306-
struct SyncSemaphore {
307-
Semaphore sem;
308-
bool in_use = false;
309-
};
310-
311302
struct CommandBase {
303+
bool sync = false;
312304
virtual void call() = 0;
313-
virtual SyncSemaphore *get_sync_semaphore() { return nullptr; }
314305
virtual ~CommandBase() = default; // Won't be called.
315306
};
316307

317308
struct SyncCommand : public CommandBase {
318-
SyncSemaphore *sync_sem = nullptr;
319-
320-
virtual SyncSemaphore *get_sync_semaphore() override {
321-
return sync_sem;
309+
virtual void call() override {}
310+
SyncCommand() {
311+
sync = true;
322312
}
323313
};
324314

@@ -340,9 +330,11 @@ class CommandQueueMT {
340330
SYNC_SEMAPHORES = 8
341331
};
342332

333+
BinaryMutex mutex;
343334
LocalVector<uint8_t> command_mem;
344-
SyncSemaphore sync_sems[SYNC_SEMAPHORES];
345-
Mutex mutex;
335+
ConditionVariable sync_cond_var;
336+
uint32_t sync_head = 0;
337+
uint32_t sync_tail = 0;
346338
WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
347339
uint64_t flush_read_ptr = 0;
348340

@@ -357,32 +349,23 @@ class CommandQueueMT {
357349
return cmd;
358350
}
359351

360-
template <typename T>
361-
T *allocate_and_lock() {
362-
lock();
363-
T *ret = allocate<T>();
364-
return ret;
365-
}
366-
367352
void _flush() {
368-
lock();
369-
370353
if (unlikely(flush_read_ptr)) {
371354
// Re-entrant call.
372-
unlock();
373355
return;
374356
}
375357

358+
lock();
359+
376360
WorkerThreadPool::thread_enter_command_queue_mt_flush(this);
377361
while (flush_read_ptr < command_mem.size()) {
378362
uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
379363
flush_read_ptr += 8;
380364
CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
381-
382-
SyncSemaphore *sync_sem = cmd->get_sync_semaphore();
383365
cmd->call();
384-
if (sync_sem) {
385-
sync_sem->sem.post(); // Release in case it needs sync/ret.
366+
if (unlikely(cmd->sync)) {
367+
sync_head++;
368+
sync_cond_var.notify_all();
386369
}
387370

388371
flush_read_ptr += size;
@@ -394,8 +377,12 @@ class CommandQueueMT {
394377
unlock();
395378
}
396379

397-
void wait_for_flush();
398-
SyncSemaphore *_alloc_sync_sem();
380+
// Blocks the calling thread until the sync/ret command it has just enqueued
// has been executed by _flush().
//
// p_lock: the caller's lock on `mutex`; it is handed to the condition variable
//         for the duration of each wait.
//
// At the call sites (push_and_sync / push_and_ret) `sync_tail` is incremented
// right before this call, under the same lock, so its current value is this
// waiter's goal for `sync_head`.
//
// _flush() takes the mutex once and may run several sync commands in that
// pass, incrementing `sync_head` and notifying for each of them. A woken
// waiter can therefore find `sync_head` already *past* its own goal. Testing
// for equality would miss that and leave the thread waiting with nobody left
// to wake it, so the test must be an ordering one: "has the head reached or
// passed my goal?".
_FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) {
	const uint32_t sync_head_goal = sync_tail;
	do {
		sync_cond_var.wait(p_lock);
		// Wraparound-safe "sync_head < sync_head_goal": the unsigned difference,
		// read as signed, is negative only while the head is still behind the
		// goal. Valid as long as fewer than 2^31 sync commands are outstanding.
	} while (static_cast<int32_t>(sync_head - sync_head_goal) < 0);
}
399386

400387
public:
401388
void lock();

0 commit comments

Comments
 (0)