@@ -76,6 +76,7 @@ void WorkerThreadPool::_process_task(Task *p_task) {
7676 p_task->pool_thread_index = pool_thread_index;
7777 prev_task = curr_thread.current_task ;
7878 curr_thread.current_task = p_task;
79+ curr_thread.has_pump_task = p_task->is_pump_task ;
7980 if (p_task->pending_notify_yield_over ) {
8081 curr_thread.yield_is_over = true ;
8182 }
@@ -218,11 +219,13 @@ void WorkerThreadPool::_thread_function(void *p_user) {
218219 }
219220}
220221
221- void WorkerThreadPool::_post_tasks (Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
222+ void WorkerThreadPool::_post_tasks (Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock, bool p_pump_task ) {
222223 // Fall back to processing on the calling thread if there are no worker threads.
223224 // Separated into its own variable to make it easier to extend this logic
224225 // in custom builds.
225- bool process_on_calling_thread = threads.is_empty ();
226+
227+ // Avoid calling pump tasks or low priority tasks from the calling thread.
228+ bool process_on_calling_thread = threads.is_empty () && p_high_priority && !p_pump_task;
226229 if (process_on_calling_thread) {
227230 p_lock.temp_unlock ();
228231 for (uint32_t i = 0 ; i < p_count; i++) {
@@ -339,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
339342 return _add_task (Callable (), p_func, p_userdata, nullptr , p_high_priority, p_description);
340343}
341344
342- WorkerThreadPool::TaskID WorkerThreadPool::_add_task (const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
345+ WorkerThreadPool::TaskID WorkerThreadPool::_add_task (const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description, bool p_pump_task ) {
343346 MutexLock<BinaryMutex> lock (task_mutex);
344347
345348 // Get a free task
@@ -351,15 +354,50 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
351354 task->native_func_userdata = p_userdata;
352355 task->description = p_description;
353356 task->template_userdata = p_template_userdata;
357+ task->is_pump_task = p_pump_task;
354358 tasks.insert (id, task);
355359
356- _post_tasks (&task, 1 , p_high_priority, lock);
360+ #ifdef THREADS_ENABLED
361+ if (p_pump_task) {
362+ pump_task_count++;
363+ int thread_count = get_thread_count ();
364+ if (pump_task_count >= thread_count) {
365+ print_verbose (vformat (" A greater number of dedicated threads were requested (%d) than threads available (%d). Please increase the number of available worker task threads. Recovering this session by spawning more worker task threads." , pump_task_count + 1 , thread_count)); // +1 because we want to keep a Thread without any pump tasks free.
366+
367+ Thread::Settings settings;
368+ #ifdef __APPLE__
369+ // The default stack size for new threads on Apple platforms is 512KiB.
370+ // This is insufficient when using a library like SPIRV-Cross,
371+ // which can generate deep stacks and result in a stack overflow.
372+ #ifdef DEV_ENABLED
373+ // Debug builds need an even larger stack size.
374+ settings.stack_size = 2 * 1024 * 1024 ; // 2 MiB
375+ #else
376+ settings.stack_size = 1 * 1024 * 1024 ; // 1 MiB
377+ #endif
378+ #endif
379+ // Re-sizing implies relocation, which is not supported for this array.
380+ CRASH_COND_MSG (thread_count + 1 > (int )threads.get_capacity (), " Reserve trick for worker thread pool failed. Crashing." );
381+ threads.resize_initialized (thread_count + 1 );
382+ threads[thread_count].index = thread_count;
383+ threads[thread_count].pool = this ;
384+ threads[thread_count].thread .start (&WorkerThreadPool::_thread_function, &threads[thread_count], settings);
385+ thread_ids.insert (threads[thread_count].thread .get_id (), thread_count);
386+ }
387+ }
388+ #endif
389+
390+ _post_tasks (&task, 1 , p_high_priority, lock, p_pump_task);
357391
358392 return id;
359393}
360394
361- WorkerThreadPool::TaskID WorkerThreadPool::add_task (const Callable &p_action, bool p_high_priority, const String &p_description) {
362- return _add_task (p_action, nullptr , nullptr , nullptr , p_high_priority, p_description);
// Queues a single task that runs the Callable `p_action` by forwarding to _add_task().
// The three nullptr arguments fill _add_task()'s native function pointer, userdata and
// template userdata parameters, which are left unset for this entry point.
// `p_high_priority` and `p_description` are passed through unchanged.
// `p_pump_task` is passed through unchanged; _add_task() records it on the task as `is_pump_task`.
// Returns the TaskID produced by _add_task().
395+ WorkerThreadPool::TaskID WorkerThreadPool::add_task (const Callable &p_action, bool p_high_priority, const String &p_description, bool p_pump_task) {
396+ return _add_task (p_action, nullptr , nullptr , nullptr , p_high_priority, p_description, p_pump_task);
397+ }
398+
// Variant of add_task() that takes no pump-task argument: it forwards to _add_task()
// with the pump-task flag fixed to `false`, so tasks created through it are never pump tasks.
// _bind_methods() registers this function with ClassDB under the method name "add_task",
// with only the "action", "high_priority" and "description" arguments.
// Returns the TaskID produced by _add_task().
399+ WorkerThreadPool::TaskID WorkerThreadPool::add_task_bind (const Callable &p_action, bool p_high_priority, const String &p_description) {
400+ return _add_task (p_action, nullptr , nullptr , nullptr , p_high_priority, p_description, false );
363401}
364402
365403bool WorkerThreadPool::is_task_completed (TaskID p_task_id) const {
@@ -510,7 +548,12 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
510548
511549 if (p_caller_pool_thread->pool ->task_queue .first ()) {
512550 task_to_process = task_queue.first ()->self ();
513- task_queue.remove (task_queue.first ());
551+ if ((p_task == ThreadData::YIELDING || p_caller_pool_thread->has_pump_task == true ) && task_to_process->is_pump_task ) {
552+ task_to_process = nullptr ;
553+ _notify_threads (p_caller_pool_thread, 1 , 0 );
554+ } else {
555+ task_queue.remove (task_queue.first ());
556+ }
514557 }
515558
516559 if (!task_to_process) {
@@ -661,7 +704,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
661704
662705 groups[id] = group;
663706
664- _post_tasks (tasks_posted, p_tasks, p_high_priority, lock);
707+ _post_tasks (tasks_posted, p_tasks, p_high_priority, lock, false );
665708
666709 return id;
667710}
@@ -788,6 +831,11 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
788831
789832 print_verbose (vformat (" WorkerThreadPool: %d threads, %d max low-priority." , p_thread_count, max_low_priority_threads));
790833
834+ #ifdef THREADS_ENABLED
835+ // Reserve 5 threads in case we need separate threads for 1) 2D physics 2) 3D physics 3) rendering 4) GPU texture compression, 5) all other tasks.
836+ // We cannot safely increase the Vector size at runtime, so reserve enough up front, but only launch those needed.
837+ threads.reserve (5 );
838+ #endif
791839 threads.resize (p_thread_count);
792840
793841 Thread::Settings settings;
@@ -862,7 +910,7 @@ void WorkerThreadPool::finish() {
862910}
863911
864912void WorkerThreadPool::_bind_methods () {
865- ClassDB::bind_method (D_METHOD (" add_task" , " action" , " high_priority" , " description" ), &WorkerThreadPool::add_task , DEFVAL (false ), DEFVAL (String ()));
913+ ClassDB::bind_method (D_METHOD (" add_task" , " action" , " high_priority" , " description" ), &WorkerThreadPool::add_task_bind , DEFVAL (false ), DEFVAL (String ()));
866914 ClassDB::bind_method (D_METHOD (" is_task_completed" , " task_id" ), &WorkerThreadPool::is_task_completed);
867915 ClassDB::bind_method (D_METHOD (" wait_for_task_completion" , " task_id" ), &WorkerThreadPool::wait_for_task_completion);
868916 ClassDB::bind_method (D_METHOD (" get_caller_task_id" ), &WorkerThreadPool::get_caller_task_id);
0 commit comments