Skip to content

Commit 5d371e3

Browse files
committed
WorkerThreadPool: Add safety point between languages finished and pool termination
1 parent 2a483fa commit 5d371e3

File tree

3 files changed

+98
-20
lines changed

3 files changed

+98
-20
lines changed

core/object/worker_thread_pool.cpp

Lines changed: 77 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ void WorkerThreadPool::_thread_function(void *p_user) {
186186
{
187187
MutexLock lock(singleton->task_mutex);
188188

189-
bool exit = singleton->_handle_runlevel();
189+
bool exit = singleton->_handle_runlevel(thread_data, lock);
190190
if (unlikely(exit)) {
191191
break;
192192
}
@@ -207,19 +207,24 @@ void WorkerThreadPool::_thread_function(void *p_user) {
207207
}
208208
}
209209

210-
void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
210+
void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
211211
// Fall back to processing on the calling thread if there are no worker threads.
212212
// Separated into its own variable to make it easier to extend this logic
213213
// in custom builds.
214214
bool process_on_calling_thread = threads.size() == 0;
215215
if (process_on_calling_thread) {
216-
task_mutex.unlock();
216+
p_lock.temp_unlock();
217217
for (uint32_t i = 0; i < p_count; i++) {
218218
_process_task(p_tasks[i]);
219219
}
220+
p_lock.temp_relock();
220221
return;
221222
}
222223

224+
while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
225+
control_cond_var.wait(p_lock);
226+
}
227+
223228
uint32_t to_process = 0;
224229
uint32_t to_promote = 0;
225230

@@ -241,8 +246,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count,
241246
}
242247

243248
_notify_threads(caller_pool_thread, to_process, to_promote);
244-
245-
task_mutex.unlock();
246249
}
247250

248251
void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
@@ -326,7 +329,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
326329
}
327330

328331
WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
329-
task_mutex.lock();
332+
MutexLock<BinaryMutex> lock(task_mutex);
333+
330334
// Get a free task
331335
Task *task = task_allocator.alloc();
332336
TaskID id = last_task++;
@@ -338,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
338342
task->template_userdata = p_template_userdata;
339343
tasks.insert(id, task);
340344

341-
_post_tasks_and_unlock(&task, 1, p_high_priority);
345+
_post_tasks(&task, 1, p_high_priority, lock);
342346

343347
return id;
344348
}
@@ -454,7 +458,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
454458
bool was_signaled = p_caller_pool_thread->signaled;
455459
p_caller_pool_thread->signaled = false;
456460

457-
bool exit = _handle_runlevel();
461+
bool exit = _handle_runlevel(p_caller_pool_thread, lock);
458462
if (unlikely(exit)) {
459463
break;
460464
}
@@ -523,27 +527,62 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
523527
void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {
524528
DEV_ASSERT(p_runlevel > runlevel);
525529
runlevel = p_runlevel;
530+
memset(&runlevel_data, 0, sizeof(runlevel_data));
526531
for (uint32_t i = 0; i < threads.size(); i++) {
527532
threads[i].cond_var.notify_one();
528533
threads[i].signaled = true;
529534
}
535+
control_cond_var.notify_all();
530536
}
531537

532538
// Returns whether threads have to exit. This may perform the check about handling needed.
533-
bool WorkerThreadPool::_handle_runlevel() {
534-
return runlevel == RUNLEVEL_EXIT;
539+
bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {
540+
bool exit = false;
541+
switch (runlevel) {
542+
case RUNLEVEL_NORMAL: {
543+
} break;
544+
case RUNLEVEL_PRE_EXIT_LANGUAGES: {
545+
if (!p_thread_data->pre_exited_languages) {
546+
if (!task_queue.first() && !low_priority_task_queue.first()) {
547+
p_thread_data->pre_exited_languages = true;
548+
runlevel_data.pre_exit_languages.num_idle_threads++;
549+
control_cond_var.notify_all();
550+
}
551+
}
552+
} break;
553+
case RUNLEVEL_EXIT_LANGUAGES: {
554+
if (!p_thread_data->exited_languages) {
555+
p_lock.temp_unlock();
556+
ScriptServer::thread_exit();
557+
p_lock.temp_relock();
558+
p_thread_data->exited_languages = true;
559+
runlevel_data.exit_languages.num_exited_threads++;
560+
control_cond_var.notify_all();
561+
}
562+
} break;
563+
case RUNLEVEL_EXIT: {
564+
exit = true;
565+
} break;
566+
}
567+
return exit;
535568
}
536569

537570
void WorkerThreadPool::yield() {
538571
int th_index = get_thread_index();
539572
ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
540573
_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);
541574

542-
// If this long-lived task started before the scripting server was initialized,
543-
// now is a good time to have scripting languages ready for the current thread.
544-
// Otherwise, such a piece of setup won't happen unless another task has been
545-
// run during the collaborative wait.
546-
ScriptServer::thread_enter();
575+
task_mutex.lock();
576+
if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
577+
// If this long-lived task started before the scripting server was initialized,
578+
// now is a good time to have scripting languages ready for the current thread.
579+
// Otherwise, such a piece of setup won't happen unless another task has been
580+
// run during the collaborative wait.
581+
task_mutex.unlock();
582+
ScriptServer::thread_enter();
583+
} else {
584+
task_mutex.unlock();
585+
}
547586
}
548587

549588
void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
@@ -573,7 +612,8 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
573612
p_tasks = MAX(1u, threads.size());
574613
}
575614

576-
task_mutex.lock();
615+
MutexLock<BinaryMutex> lock(task_mutex);
616+
577617
Group *group = group_allocator.alloc();
578618
GroupID id = last_task++;
579619
group->max = p_elements;
@@ -608,7 +648,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
608648

609649
groups[id] = group;
610650

611-
_post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
651+
_post_tasks(tasks_posted, p_tasks, p_high_priority, lock);
612652

613653
return id;
614654
}
@@ -731,6 +771,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
731771
}
732772
}
733773

774+
void WorkerThreadPool::exit_languages_threads() {
775+
if (threads.size() == 0) {
776+
return;
777+
}
778+
779+
MutexLock lock(task_mutex);
780+
781+
// Wait until all threads are idle.
782+
_switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
783+
while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
784+
control_cond_var.wait(lock);
785+
}
786+
787+
// Wait until all threads have detached from scripting languages.
788+
_switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
789+
while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
790+
control_cond_var.wait(lock);
791+
}
792+
}
793+
734794
void WorkerThreadPool::finish() {
735795
if (threads.size() == 0) {
736796
return;

core/object/worker_thread_pool.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,20 +114,35 @@ class WorkerThreadPool : public Object {
114114
Thread thread;
115115
bool signaled : 1;
116116
bool yield_is_over : 1;
117+
bool pre_exited_languages : 1;
118+
bool exited_languages : 1;
117119
Task *current_task = nullptr;
118120
Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
119121
ConditionVariable cond_var;
120122

121123
ThreadData() :
122124
signaled(false),
123-
yield_is_over(false) {}
125+
yield_is_over(false),
126+
pre_exited_languages(false),
127+
exited_languages(false) {}
124128
};
125129

126130
TightLocalVector<ThreadData> threads;
127131
enum Runlevel {
128132
RUNLEVEL_NORMAL,
133+
RUNLEVEL_PRE_EXIT_LANGUAGES, // Block adding new tasks
134+
RUNLEVEL_EXIT_LANGUAGES, // All threads detach from scripting threads.
129135
RUNLEVEL_EXIT,
130136
} runlevel = RUNLEVEL_NORMAL;
137+
union { // Cleared on every runlevel change.
138+
struct {
139+
uint32_t num_idle_threads;
140+
} pre_exit_languages;
141+
struct {
142+
uint32_t num_exited_threads;
143+
} exit_languages;
144+
} runlevel_data;
145+
ConditionVariable control_cond_var;
131146

132147
HashMap<Thread::ID, int> thread_ids;
133148
HashMap<
@@ -155,7 +170,7 @@ class WorkerThreadPool : public Object {
155170

156171
void _process_task(Task *task);
157172

158-
void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority);
173+
void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock);
159174
void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);
160175

161176
bool _try_promote_low_priority_task();
@@ -197,7 +212,7 @@ class WorkerThreadPool : public Object {
197212
void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);
198213

199214
void _switch_runlevel(Runlevel p_runlevel);
200-
bool _handle_runlevel();
215+
bool _handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock);
201216

202217
#ifdef THREADS_ENABLED
203218
static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock);
@@ -262,6 +277,7 @@ class WorkerThreadPool : public Object {
262277
#endif
263278

264279
void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
280+
void exit_languages_threads();
265281
void finish();
266282
WorkerThreadPool();
267283
~WorkerThreadPool();

main/main.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4501,6 +4501,8 @@ void Main::cleanup(bool p_force) {
45014501
ResourceLoader::clear_translation_remaps();
45024502
ResourceLoader::clear_path_remaps();
45034503

4504+
WorkerThreadPool::get_singleton()->exit_languages_threads();
4505+
45044506
ScriptServer::finish_languages();
45054507

45064508
// Sync pending commands that may have been queued from a different thread during ScriptServer finalization

0 commit comments

Comments
 (0)