Skip to content

Commit 782b9e3

Browse files
committed
Ensure that threads only process one pump task.
This is necessary because a thread that takes on multiple pump tasks will always deadlock, since pump tasks never return. This means that when using separate threads for certain systems (such as physics or rendering), we must ensure there are enough threads to have at least one per system (to guarantee forward progress).
1 parent 7826b6b commit 782b9e3

File tree

6 files changed

+71
-17
lines changed

6 files changed

+71
-17
lines changed

core/object/worker_thread_pool.cpp

Lines changed: 57 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,7 @@ void WorkerThreadPool::_process_task(Task *p_task) {
7676
p_task->pool_thread_index = pool_thread_index;
7777
prev_task = curr_thread.current_task;
7878
curr_thread.current_task = p_task;
79+
curr_thread.has_pump_task = p_task->is_pump_task;
7980
if (p_task->pending_notify_yield_over) {
8081
curr_thread.yield_is_over = true;
8182
}
@@ -218,11 +219,13 @@ void WorkerThreadPool::_thread_function(void *p_user) {
218219
}
219220
}
220221

221-
void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
222+
void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock, bool p_pump_task) {
222223
// Fall back to processing on the calling thread if there are no worker threads.
223224
// Separated into its own variable to make it easier to extend this logic
224225
// in custom builds.
225-
bool process_on_calling_thread = threads.is_empty();
226+
227+
// Avoid calling pump tasks or low priority tasks from the calling thread.
228+
bool process_on_calling_thread = threads.is_empty() && p_high_priority && !p_pump_task;
226229
if (process_on_calling_thread) {
227230
p_lock.temp_unlock();
228231
for (uint32_t i = 0; i < p_count; i++) {
@@ -339,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
339342
return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
340343
}
341344

342-
WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
345+
WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description, bool p_pump_task) {
343346
MutexLock<BinaryMutex> lock(task_mutex);
344347

345348
// Get a free task
@@ -351,15 +354,50 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
351354
task->native_func_userdata = p_userdata;
352355
task->description = p_description;
353356
task->template_userdata = p_template_userdata;
357+
task->is_pump_task = p_pump_task;
354358
tasks.insert(id, task);
355359

356-
_post_tasks(&task, 1, p_high_priority, lock);
360+
#ifdef THREADS_ENABLED
361+
if (p_pump_task) {
362+
pump_task_count++;
363+
int thread_count = get_thread_count();
364+
if (pump_task_count >= thread_count) {
365+
print_verbose(vformat("A greater number of dedicated threads were requested (%d) than threads available (%d). Please increase the number of available worker task threads. Recovering this session by spawning more worker task threads.", pump_task_count + 1, thread_count)); // +1 because we want to keep a Thread without any pump tasks free.
366+
367+
Thread::Settings settings;
368+
#ifdef __APPLE__
369+
// The default stack size for new threads on Apple platforms is 512KiB.
370+
// This is insufficient when using a library like SPIRV-Cross,
371+
// which can generate deep stacks and result in a stack overflow.
372+
#ifdef DEV_ENABLED
373+
// Debug builds need an even larger stack size.
374+
settings.stack_size = 2 * 1024 * 1024; // 2 MiB
375+
#else
376+
settings.stack_size = 1 * 1024 * 1024; // 1 MiB
377+
#endif
378+
#endif
379+
// Re-sizing implies relocation, which is not supported for this array.
380+
CRASH_COND_MSG(thread_count + 1 > (int)threads.get_capacity(), "Reserve trick for worker thread pool failed. Crashing.");
381+
threads.resize_initialized(thread_count + 1);
382+
threads[thread_count].index = thread_count;
383+
threads[thread_count].pool = this;
384+
threads[thread_count].thread.start(&WorkerThreadPool::_thread_function, &threads[thread_count], settings);
385+
thread_ids.insert(threads[thread_count].thread.get_id(), thread_count);
386+
}
387+
}
388+
#endif
389+
390+
_post_tasks(&task, 1, p_high_priority, lock, p_pump_task);
357391

358392
return id;
359393
}
360394

361-
WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description) {
362-
return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description);
395+
WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description, bool p_pump_task) {
396+
return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, p_pump_task);
397+
}
398+
399+
WorkerThreadPool::TaskID WorkerThreadPool::add_task_bind(const Callable &p_action, bool p_high_priority, const String &p_description) {
400+
return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, false);
363401
}
364402

365403
bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
@@ -510,7 +548,12 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
510548

511549
if (p_caller_pool_thread->pool->task_queue.first()) {
512550
task_to_process = task_queue.first()->self();
513-
task_queue.remove(task_queue.first());
551+
if ((p_task == ThreadData::YIELDING || p_caller_pool_thread->has_pump_task == true) && task_to_process->is_pump_task) {
552+
task_to_process = nullptr;
553+
_notify_threads(p_caller_pool_thread, 1, 0);
554+
} else {
555+
task_queue.remove(task_queue.first());
556+
}
514557
}
515558

516559
if (!task_to_process) {
@@ -661,7 +704,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
661704

662705
groups[id] = group;
663706

664-
_post_tasks(tasks_posted, p_tasks, p_high_priority, lock);
707+
_post_tasks(tasks_posted, p_tasks, p_high_priority, lock, false);
665708

666709
return id;
667710
}
@@ -788,6 +831,11 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
788831

789832
print_verbose(vformat("WorkerThreadPool: %d threads, %d max low-priority.", p_thread_count, max_low_priority_threads));
790833

834+
#ifdef THREADS_ENABLED
835+
// Reserve 5 threads in case we need separate threads for 1) 2D physics 2) 3D physics 3) rendering 4) GPU texture compression, 5) all other tasks.
836+
// We cannot safely increase the Vector size at runtime, so reserve enough up front, but only launch those needed.
837+
threads.reserve(5);
838+
#endif
791839
threads.resize(p_thread_count);
792840

793841
Thread::Settings settings;
@@ -862,7 +910,7 @@ void WorkerThreadPool::finish() {
862910
}
863911

864912
void WorkerThreadPool::_bind_methods() {
865-
ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task, DEFVAL(false), DEFVAL(String()));
913+
ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task_bind, DEFVAL(false), DEFVAL(String()));
866914
ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed);
867915
ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion);
868916
ClassDB::bind_method(D_METHOD("get_caller_task_id"), &WorkerThreadPool::get_caller_task_id);

core/object/worker_thread_pool.h

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,7 @@ class WorkerThreadPool : public Object {
8080
Semaphore done_semaphore; // For user threads awaiting.
8181
bool completed : 1;
8282
bool pending_notify_yield_over : 1;
83+
bool is_pump_task : 1;
8384
Group *group = nullptr;
8485
SelfList<Task> task_elem;
8586
uint32_t waiting_pool = 0;
@@ -92,6 +93,7 @@ class WorkerThreadPool : public Object {
9293
Task() :
9394
completed(false),
9495
pending_notify_yield_over(false),
96+
is_pump_task(false),
9597
task_elem(this) {}
9698
};
9799

@@ -115,6 +117,7 @@ class WorkerThreadPool : public Object {
115117
bool yield_is_over : 1;
116118
bool pre_exited_languages : 1;
117119
bool exited_languages : 1;
120+
bool has_pump_task : 1; // Threads can only have one pump task.
118121
Task *current_task = nullptr;
119122
Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
120123
ConditionVariable cond_var;
@@ -124,7 +127,8 @@ class WorkerThreadPool : public Object {
124127
signaled(false),
125128
yield_is_over(false),
126129
pre_exited_languages(false),
127-
exited_languages(false) {}
130+
exited_languages(false),
131+
has_pump_task(false) {}
128132
};
129133

130134
TightLocalVector<ThreadData> threads;
@@ -165,14 +169,15 @@ class WorkerThreadPool : public Object {
165169
uint32_t notify_index = 0; // For rotating across threads, no help distributing load.
166170

167171
uint64_t last_task = 1;
172+
int pump_task_count = 0;
168173

169174
static HashMap<StringName, WorkerThreadPool *> named_pools;
170175

171176
static void _thread_function(void *p_user);
172177

173178
void _process_task(Task *task);
174179

175-
void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock);
180+
void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock, bool p_pump_task);
176181
void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);
177182

178183
bool _try_promote_low_priority_task();
@@ -188,7 +193,7 @@ class WorkerThreadPool : public Object {
188193
static thread_local UnlockableLocks unlockable_locks[MAX_UNLOCKABLE_LOCKS];
189194
#endif
190195

191-
TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
196+
TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description, bool p_pump_task = false);
192197
GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description);
193198

194199
template <typename C, typename M, typename U>
@@ -237,7 +242,8 @@ class WorkerThreadPool : public Object {
237242
return _add_task(Callable(), nullptr, nullptr, ud, p_high_priority, p_description);
238243
}
239244
TaskID add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority = false, const String &p_description = String());
240-
TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String());
245+
TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String(), bool p_pump_task = false);
246+
TaskID add_task_bind(const Callable &p_action, bool p_high_priority = false, const String &p_description = String());
241247

242248
bool is_task_completed(TaskID p_task_id) const;
243249
Error wait_for_task_completion(TaskID p_task_id);

modules/betsy/image_compress_betsy.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -223,7 +223,7 @@ void BetsyCompressor::_init() {
223223
}
224224

225225
void BetsyCompressor::init() {
226-
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &BetsyCompressor::_thread_loop), true);
226+
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &BetsyCompressor::_thread_loop), true, "Betsy pump task", true);
227227
command_queue.set_pump_task_id(tid);
228228
command_queue.push(this, &BetsyCompressor::_assign_mt_ids, tid);
229229
command_queue.push_and_sync(this, &BetsyCompressor::_init);

servers/physics_server_2d_wrap_mt.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ void PhysicsServer2DWrapMT::end_sync() {
7575

7676
void PhysicsServer2DWrapMT::init() {
7777
if (create_thread) {
78-
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer2DWrapMT::_thread_loop), true);
78+
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer2DWrapMT::_thread_loop), true, "Physics server 2D pump task", true);
7979
command_queue.set_pump_task_id(tid);
8080
command_queue.push(this, &PhysicsServer2DWrapMT::_assign_mt_ids, tid);
8181
command_queue.push_and_sync(physics_server_2d, &PhysicsServer2D::init);

servers/physics_server_3d_wrap_mt.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ void PhysicsServer3DWrapMT::end_sync() {
7575

7676
void PhysicsServer3DWrapMT::init() {
7777
if (create_thread) {
78-
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer3DWrapMT::_thread_loop), true);
78+
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &PhysicsServer3DWrapMT::_thread_loop), true, "Physics server 3D pump task", true);
7979
command_queue.set_pump_task_id(tid);
8080
command_queue.push(this, &PhysicsServer3DWrapMT::_assign_mt_ids, tid);
8181
command_queue.push_and_sync(physics_server_3d, &PhysicsServer3D::init);

servers/rendering/rendering_server_default.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -253,7 +253,7 @@ void RenderingServerDefault::init() {
253253
if (create_thread) {
254254
print_verbose("RenderingServerWrapMT: Starting render thread");
255255
DisplayServer::get_singleton()->release_rendering_thread();
256-
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &RenderingServerDefault::_thread_loop), true);
256+
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &RenderingServerDefault::_thread_loop), true, "Rendering Server pump task", true);
257257
command_queue.set_pump_task_id(tid);
258258
command_queue.push(this, &RenderingServerDefault::_assign_mt_ids, tid);
259259
command_queue.push_and_sync(this, &RenderingServerDefault::_init);

0 commit comments

Comments (0)