Skip to content

Commit 48403b5

Browse files
authored
Merge pull request godotengine#96959 from RandomShaper/revamp_languages_exit
WorkerThreadPool: Revamp interaction with ScriptServer
2 parents 99a7a9c + 5d371e3 commit 48403b5

File tree

4 files changed

+163
-69
lines changed

4 files changed

+163
-69
lines changed

core/object/worker_thread_pool.cpp

Lines changed: 130 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -180,21 +180,24 @@ void WorkerThreadPool::_process_task(Task *p_task) {
180180

181181
void WorkerThreadPool::_thread_function(void *p_user) {
182182
ThreadData *thread_data = (ThreadData *)p_user;
183+
183184
while (true) {
184185
Task *task_to_process = nullptr;
185186
{
186187
MutexLock lock(singleton->task_mutex);
187-
if (singleton->exit_threads) {
188-
return;
188+
189+
bool exit = singleton->_handle_runlevel(thread_data, lock);
190+
if (unlikely(exit)) {
191+
break;
189192
}
193+
190194
thread_data->signaled = false;
191195

192196
if (singleton->task_queue.first()) {
193197
task_to_process = singleton->task_queue.first()->self();
194198
singleton->task_queue.remove(singleton->task_queue.first());
195199
} else {
196200
thread_data->cond_var.wait(lock);
197-
DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
198201
}
199202
}
200203

@@ -204,19 +207,24 @@ void WorkerThreadPool::_thread_function(void *p_user) {
204207
}
205208
}
206209

207-
void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
210+
void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
208211
// Fall back to processing on the calling thread if there are no worker threads.
209212
// Separated into its own variable to make it easier to extend this logic
210213
// in custom builds.
211214
bool process_on_calling_thread = threads.size() == 0;
212215
if (process_on_calling_thread) {
213-
task_mutex.unlock();
216+
p_lock.temp_unlock();
214217
for (uint32_t i = 0; i < p_count; i++) {
215218
_process_task(p_tasks[i]);
216219
}
220+
p_lock.temp_relock();
217221
return;
218222
}
219223

224+
while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
225+
control_cond_var.wait(p_lock);
226+
}
227+
220228
uint32_t to_process = 0;
221229
uint32_t to_promote = 0;
222230

@@ -238,8 +246,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count,
238246
}
239247

240248
_notify_threads(caller_pool_thread, to_process, to_promote);
241-
242-
task_mutex.unlock();
243249
}
244250

245251
void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
@@ -323,9 +329,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
323329
}
324330

325331
WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
326-
ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a task because the WorkerThreadPool is either not initialized yet or already terminated.");
332+
MutexLock<BinaryMutex> lock(task_mutex);
327333

328-
task_mutex.lock();
329334
// Get a free task
330335
Task *task = task_allocator.alloc();
331336
TaskID id = last_task++;
@@ -337,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
337342
task->template_userdata = p_template_userdata;
338343
tasks.insert(id, task);
339344

340-
_post_tasks_and_unlock(&task, 1, p_high_priority);
345+
_post_tasks(&task, 1, p_high_priority, lock);
341346

342347
return id;
343348
}
@@ -444,22 +449,34 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() {
444449
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
445450
// Keep processing tasks until the condition to stop waiting is met.
446451

447-
#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)
448-
449452
while (true) {
450453
Task *task_to_process = nullptr;
451454
bool relock_unlockables = false;
452455
{
453456
MutexLock lock(task_mutex);
457+
454458
bool was_signaled = p_caller_pool_thread->signaled;
455459
p_caller_pool_thread->signaled = false;
456460

457-
if (IS_WAIT_OVER) {
458-
if (unlikely(p_task == ThreadData::YIELDING)) {
461+
bool exit = _handle_runlevel(p_caller_pool_thread, lock);
462+
if (unlikely(exit)) {
463+
break;
464+
}
465+
466+
bool wait_is_over = false;
467+
if (unlikely(p_task == ThreadData::YIELDING)) {
468+
if (p_caller_pool_thread->yield_is_over) {
459469
p_caller_pool_thread->yield_is_over = false;
470+
wait_is_over = true;
460471
}
472+
} else {
473+
if (p_task->completed) {
474+
wait_is_over = true;
475+
}
476+
}
461477

462-
if (!exit_threads && was_signaled) {
478+
if (wait_is_over) {
479+
if (was_signaled) {
463480
// This thread was awaken for some additional reason, but it's about to exit.
464481
// Let's find out what may be pending and forward the requests.
465482
uint32_t to_process = task_queue.first() ? 1 : 0;
@@ -474,28 +491,26 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
474491
break;
475492
}
476493

477-
if (!exit_threads) {
478-
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
479-
if (_try_promote_low_priority_task()) {
480-
_notify_threads(p_caller_pool_thread, 1, 0);
481-
}
494+
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
495+
if (_try_promote_low_priority_task()) {
496+
_notify_threads(p_caller_pool_thread, 1, 0);
482497
}
498+
}
483499

484-
if (singleton->task_queue.first()) {
485-
task_to_process = task_queue.first()->self();
486-
task_queue.remove(task_queue.first());
487-
}
500+
if (singleton->task_queue.first()) {
501+
task_to_process = task_queue.first()->self();
502+
task_queue.remove(task_queue.first());
503+
}
488504

489-
if (!task_to_process) {
490-
p_caller_pool_thread->awaited_task = p_task;
505+
if (!task_to_process) {
506+
p_caller_pool_thread->awaited_task = p_task;
491507

492-
_unlock_unlockable_mutexes();
493-
relock_unlockables = true;
494-
p_caller_pool_thread->cond_var.wait(lock);
508+
_unlock_unlockable_mutexes();
509+
relock_unlockables = true;
495510

496-
DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
497-
p_caller_pool_thread->awaited_task = nullptr;
498-
}
511+
p_caller_pool_thread->cond_var.wait(lock);
512+
513+
p_caller_pool_thread->awaited_task = nullptr;
499514
}
500515
}
501516

@@ -509,16 +524,65 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
509524
}
510525
}
511526

527+
void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {
528+
DEV_ASSERT(p_runlevel > runlevel);
529+
runlevel = p_runlevel;
530+
memset(&runlevel_data, 0, sizeof(runlevel_data));
531+
for (uint32_t i = 0; i < threads.size(); i++) {
532+
threads[i].cond_var.notify_one();
533+
threads[i].signaled = true;
534+
}
535+
control_cond_var.notify_all();
536+
}
537+
538+
// Returns whether threads have to exit. This may perform the check about handling needed.
539+
bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {
540+
bool exit = false;
541+
switch (runlevel) {
542+
case RUNLEVEL_NORMAL: {
543+
} break;
544+
case RUNLEVEL_PRE_EXIT_LANGUAGES: {
545+
if (!p_thread_data->pre_exited_languages) {
546+
if (!task_queue.first() && !low_priority_task_queue.first()) {
547+
p_thread_data->pre_exited_languages = true;
548+
runlevel_data.pre_exit_languages.num_idle_threads++;
549+
control_cond_var.notify_all();
550+
}
551+
}
552+
} break;
553+
case RUNLEVEL_EXIT_LANGUAGES: {
554+
if (!p_thread_data->exited_languages) {
555+
p_lock.temp_unlock();
556+
ScriptServer::thread_exit();
557+
p_lock.temp_relock();
558+
p_thread_data->exited_languages = true;
559+
runlevel_data.exit_languages.num_exited_threads++;
560+
control_cond_var.notify_all();
561+
}
562+
} break;
563+
case RUNLEVEL_EXIT: {
564+
exit = true;
565+
} break;
566+
}
567+
return exit;
568+
}
569+
512570
void WorkerThreadPool::yield() {
513571
int th_index = get_thread_index();
514572
ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
515573
_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);
516574

517-
// If this long-lived task started before the scripting server was initialized,
518-
// now is a good time to have scripting languages ready for the current thread.
519-
// Otherwise, such a piece of setup won't happen unless another task has been
520-
// run during the collaborative wait.
521-
ScriptServer::thread_enter();
575+
task_mutex.lock();
576+
if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
577+
// If this long-lived task started before the scripting server was initialized,
578+
// now is a good time to have scripting languages ready for the current thread.
579+
// Otherwise, such a piece of setup won't happen unless another task has been
580+
// run during the collaborative wait.
581+
task_mutex.unlock();
582+
ScriptServer::thread_enter();
583+
} else {
584+
task_mutex.unlock();
585+
}
522586
}
523587

524588
void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
@@ -543,13 +607,13 @@ void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
543607
}
544608

545609
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
546-
ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a group task because the WorkerThreadPool is either not initialized yet or already terminated.");
547610
ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
548611
if (p_tasks < 0) {
549612
p_tasks = MAX(1u, threads.size());
550613
}
551614

552-
task_mutex.lock();
615+
MutexLock<BinaryMutex> lock(task_mutex);
616+
553617
Group *group = group_allocator.alloc();
554618
GroupID id = last_task++;
555619
group->max = p_elements;
@@ -584,7 +648,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
584648

585649
groups[id] = group;
586650

587-
_post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
651+
_post_tasks(tasks_posted, p_tasks, p_high_priority, lock);
588652

589653
return id;
590654
}
@@ -687,6 +751,9 @@ void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {
687751

688752
void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
689753
ERR_FAIL_COND(threads.size() > 0);
754+
755+
runlevel = RUNLEVEL_NORMAL;
756+
690757
if (p_thread_count < 0) {
691758
p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
692759
}
@@ -704,6 +771,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
704771
}
705772
}
706773

774+
void WorkerThreadPool::exit_languages_threads() {
775+
if (threads.size() == 0) {
776+
return;
777+
}
778+
779+
MutexLock lock(task_mutex);
780+
781+
// Wait until all threads are idle.
782+
_switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
783+
while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
784+
control_cond_var.wait(lock);
785+
}
786+
787+
// Wait until all threads have detached from scripting languages.
788+
_switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
789+
while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
790+
control_cond_var.wait(lock);
791+
}
792+
}
793+
707794
void WorkerThreadPool::finish() {
708795
if (threads.size() == 0) {
709796
return;
@@ -716,15 +803,10 @@ void WorkerThreadPool::finish() {
716803
print_error("Task waiting was never re-claimed: " + E->self()->description);
717804
E = E->next();
718805
}
719-
}
720806

721-
{
722-
MutexLock lock(task_mutex);
723-
exit_threads = true;
724-
}
725-
for (ThreadData &data : threads) {
726-
data.cond_var.notify_one();
807+
_switch_runlevel(RUNLEVEL_EXIT);
727808
}
809+
728810
for (ThreadData &data : threads) {
729811
data.thread.wait_to_finish();
730812
}
@@ -755,5 +837,5 @@ WorkerThreadPool::WorkerThreadPool() {
755837
}
756838

757839
WorkerThreadPool::~WorkerThreadPool() {
758-
DEV_ASSERT(threads.size() == 0 && "finish() hasn't been called!");
840+
finish();
759841
}

core/object/worker_thread_pool.h

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,17 +114,35 @@ class WorkerThreadPool : public Object {
114114
Thread thread;
115115
bool signaled : 1;
116116
bool yield_is_over : 1;
117+
bool pre_exited_languages : 1;
118+
bool exited_languages : 1;
117119
Task *current_task = nullptr;
118120
Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
119121
ConditionVariable cond_var;
120122

121123
ThreadData() :
122124
signaled(false),
123-
yield_is_over(false) {}
125+
yield_is_over(false),
126+
pre_exited_languages(false),
127+
exited_languages(false) {}
124128
};
125129

126130
TightLocalVector<ThreadData> threads;
127-
bool exit_threads = false;
131+
enum Runlevel {
132+
RUNLEVEL_NORMAL,
133+
RUNLEVEL_PRE_EXIT_LANGUAGES, // Block adding new tasks
134+
RUNLEVEL_EXIT_LANGUAGES, // All threads detach from scripting threads.
135+
RUNLEVEL_EXIT,
136+
} runlevel = RUNLEVEL_NORMAL;
137+
union { // Cleared on every runlevel change.
138+
struct {
139+
uint32_t num_idle_threads;
140+
} pre_exit_languages;
141+
struct {
142+
uint32_t num_exited_threads;
143+
} exit_languages;
144+
} runlevel_data;
145+
ConditionVariable control_cond_var;
128146

129147
HashMap<Thread::ID, int> thread_ids;
130148
HashMap<
@@ -152,7 +170,7 @@ class WorkerThreadPool : public Object {
152170

153171
void _process_task(Task *task);
154172

155-
void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority);
173+
void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock);
156174
void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);
157175

158176
bool _try_promote_low_priority_task();
@@ -193,6 +211,9 @@ class WorkerThreadPool : public Object {
193211

194212
void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);
195213

214+
void _switch_runlevel(Runlevel p_runlevel);
215+
bool _handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock);
216+
196217
#ifdef THREADS_ENABLED
197218
static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock);
198219
#endif
@@ -256,6 +277,7 @@ class WorkerThreadPool : public Object {
256277
#endif
257278

258279
void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
280+
void exit_languages_threads();
259281
void finish();
260282
WorkerThreadPool();
261283
~WorkerThreadPool();

0 commit comments

Comments
 (0)