Skip to content

Commit 2640960

Browse files
committed
WorkerThreadPool: Improve logic
- The main thread function and the collaborative wait functions have a much more similar structure than earlier, which yields (pun intended) better maintainability. - Also, there are not assertions anymore about the reason for ending a wait being valid, because spurious awakes can happen and so the assert would fail without that indicating an issue.
1 parent e2fd88e commit 2640960

File tree

1 file changed

+36
-25
lines changed

1 file changed

+36
-25
lines changed

core/object/worker_thread_pool.cpp

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -180,21 +180,23 @@ void WorkerThreadPool::_process_task(Task *p_task) {
180180

181181
void WorkerThreadPool::_thread_function(void *p_user) {
182182
ThreadData *thread_data = (ThreadData *)p_user;
183+
183184
while (true) {
184185
Task *task_to_process = nullptr;
185186
{
186187
MutexLock lock(singleton->task_mutex);
187-
if (singleton->exit_threads) {
188-
return;
188+
189+
if (unlikely(singleton->exit_threads)) {
190+
break;
189191
}
192+
190193
thread_data->signaled = false;
191194

192195
if (singleton->task_queue.first()) {
193196
task_to_process = singleton->task_queue.first()->self();
194197
singleton->task_queue.remove(singleton->task_queue.first());
195198
} else {
196199
thread_data->cond_var.wait(lock);
197-
DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
198200
}
199201
}
200202

@@ -442,22 +444,33 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() {
442444
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
443445
// Keep processing tasks until the condition to stop waiting is met.
444446

445-
#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)
446-
447447
while (true) {
448448
Task *task_to_process = nullptr;
449449
bool relock_unlockables = false;
450450
{
451451
MutexLock lock(task_mutex);
452+
452453
bool was_signaled = p_caller_pool_thread->signaled;
453454
p_caller_pool_thread->signaled = false;
454455

455-
if (IS_WAIT_OVER) {
456-
if (unlikely(p_task == ThreadData::YIELDING)) {
456+
if (unlikely(exit_threads)) {
457+
break;
458+
}
459+
460+
bool wait_is_over = false;
461+
if (unlikely(p_task == ThreadData::YIELDING)) {
462+
if (p_caller_pool_thread->yield_is_over) {
457463
p_caller_pool_thread->yield_is_over = false;
464+
wait_is_over = true;
458465
}
466+
} else {
467+
if (p_task->completed) {
468+
wait_is_over = true;
469+
}
470+
}
459471

460-
if (!exit_threads && was_signaled) {
472+
if (wait_is_over) {
473+
if (was_signaled) {
461474
// This thread was awaken for some additional reason, but it's about to exit.
462475
// Let's find out what may be pending and forward the requests.
463476
uint32_t to_process = task_queue.first() ? 1 : 0;
@@ -472,28 +485,26 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
472485
break;
473486
}
474487

475-
if (!exit_threads) {
476-
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
477-
if (_try_promote_low_priority_task()) {
478-
_notify_threads(p_caller_pool_thread, 1, 0);
479-
}
488+
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
489+
if (_try_promote_low_priority_task()) {
490+
_notify_threads(p_caller_pool_thread, 1, 0);
480491
}
492+
}
481493

482-
if (singleton->task_queue.first()) {
483-
task_to_process = task_queue.first()->self();
484-
task_queue.remove(task_queue.first());
485-
}
494+
if (singleton->task_queue.first()) {
495+
task_to_process = task_queue.first()->self();
496+
task_queue.remove(task_queue.first());
497+
}
486498

487-
if (!task_to_process) {
488-
p_caller_pool_thread->awaited_task = p_task;
499+
if (!task_to_process) {
500+
p_caller_pool_thread->awaited_task = p_task;
489501

490-
_unlock_unlockable_mutexes();
491-
relock_unlockables = true;
492-
p_caller_pool_thread->cond_var.wait(lock);
502+
_unlock_unlockable_mutexes();
503+
relock_unlockables = true;
493504

494-
DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
495-
p_caller_pool_thread->awaited_task = nullptr;
496-
}
505+
p_caller_pool_thread->cond_var.wait(lock);
506+
507+
p_caller_pool_thread->awaited_task = nullptr;
497508
}
498509
}
499510

0 commit comments

Comments
 (0)