Skip to content

Commit 5b5cdf2

Browse files
committed
Fixup recent changes to threading concerns
ResourceLoader:
- Fix invalid tokens being returned.
- Remove the no-longer-written `ThreadLoadTask::dependent_path` and the code reading from it.
- Clear a deadlock hazard by keeping the mutex unlocked during userland polling.

WorkerThreadPool:
- Include the thread call queue override in the thread state reset set, which allows simplifying the code that handled that (imperfectly) in the ResourceLoader.
- Handle the mutex type correctly on entering an allowance zone.

CommandQueueMT:
- Handle the additional possibility of command buffer reallocation that mutex unlock allowance introduces.
1 parent 10b543f commit 5b5cdf2

File tree

4 files changed

+48
-44
lines changed

4 files changed

+48
-44
lines changed

core/io/resource_loader.cpp

Lines changed: 39 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -304,31 +304,23 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
304304
thread_load_mutex.unlock();
305305

306306
// Thread-safe either if it's the current thread or a brand new one.
307-
thread_local bool mq_override_present = false;
308307
CallQueue *own_mq_override = nullptr;
309308
if (load_nesting == 0) {
310-
mq_override_present = false;
311309
load_paths_stack = memnew(Vector<String>);
312310

313-
if (!load_task.dependent_path.is_empty()) {
314-
load_paths_stack->push_back(load_task.dependent_path);
315-
}
316311
if (!Thread::is_main_thread()) {
317312
// Let the caller thread use its own, for added flexibility. Provide one otherwise.
318313
if (MessageQueue::get_singleton() == MessageQueue::get_main_singleton()) {
319314
own_mq_override = memnew(CallQueue);
320315
MessageQueue::set_thread_singleton_override(own_mq_override);
321316
}
322-
mq_override_present = true;
323317
set_current_thread_safe_for_nodes(true);
324318
}
325-
} else {
326-
DEV_ASSERT(load_task.dependent_path.is_empty());
327319
}
328320
// --
329321

330322
Ref<Resource> res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress);
331-
if (mq_override_present) {
323+
if (MessageQueue::get_singleton() != MessageQueue::get_main_singleton()) {
332324
MessageQueue::get_singleton()->flush();
333325
}
334326

@@ -473,12 +465,13 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
473465

474466
if (!ignoring_cache && thread_load_tasks.has(local_path)) {
475467
load_token = Ref<LoadToken>(thread_load_tasks[local_path].load_token);
476-
if (!load_token.is_valid()) {
468+
if (load_token.is_valid()) {
469+
return load_token;
470+
} else {
477471
// The token is dying (reached 0 on another thread).
478472
// Ensure it's killed now so the path can be safely reused right away.
479473
thread_load_tasks[local_path].load_token->clear();
480474
}
481-
return load_token;
482475
}
483476

484477
load_token.instantiate();
@@ -560,39 +553,46 @@ float ResourceLoader::_dependency_get_progress(const String &p_path) {
560553
}
561554

562555
ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const String &p_path, float *r_progress) {
563-
MutexLock thread_load_lock(thread_load_mutex);
556+
bool ensure_progress = false;
557+
ThreadLoadStatus status = THREAD_LOAD_IN_PROGRESS;
558+
{
559+
MutexLock thread_load_lock(thread_load_mutex);
564560

565-
if (!user_load_tokens.has(p_path)) {
566-
print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected.");
567-
return THREAD_LOAD_INVALID_RESOURCE;
568-
}
561+
if (!user_load_tokens.has(p_path)) {
562+
print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected.");
563+
return THREAD_LOAD_INVALID_RESOURCE;
564+
}
569565

570-
String local_path = _validate_local_path(p_path);
571-
if (!thread_load_tasks.has(local_path)) {
566+
String local_path = _validate_local_path(p_path);
567+
if (!thread_load_tasks.has(local_path)) {
572568
#ifdef DEV_ENABLED
573-
CRASH_NOW();
569+
CRASH_NOW();
574570
#endif
575-
// On non-dev, be defensive and at least avoid crashing (at this point at least).
576-
return THREAD_LOAD_INVALID_RESOURCE;
577-
}
571+
// On non-dev, be defensive and at least avoid crashing (at this point at least).
572+
return THREAD_LOAD_INVALID_RESOURCE;
573+
}
578574

579-
ThreadLoadTask &load_task = thread_load_tasks[local_path];
580-
ThreadLoadStatus status;
581-
status = load_task.status;
582-
if (r_progress) {
583-
*r_progress = _dependency_get_progress(local_path);
584-
}
575+
ThreadLoadTask &load_task = thread_load_tasks[local_path];
576+
status = load_task.status;
577+
if (r_progress) {
578+
*r_progress = _dependency_get_progress(local_path);
579+
}
585580

586-
// Support userland polling in a loop on the main thread.
587-
if (Thread::is_main_thread() && status == THREAD_LOAD_IN_PROGRESS) {
588-
uint64_t frame = Engine::get_singleton()->get_process_frames();
589-
if (frame == load_task.last_progress_check_main_thread_frame) {
590-
_ensure_load_progress();
591-
} else {
592-
load_task.last_progress_check_main_thread_frame = frame;
581+
// Support userland polling in a loop on the main thread.
582+
if (Thread::is_main_thread() && status == THREAD_LOAD_IN_PROGRESS) {
583+
uint64_t frame = Engine::get_singleton()->get_process_frames();
584+
if (frame == load_task.last_progress_check_main_thread_frame) {
585+
ensure_progress = true;
586+
} else {
587+
load_task.last_progress_check_main_thread_frame = frame;
588+
}
593589
}
594590
}
595591

592+
if (ensure_progress) {
593+
_ensure_load_progress();
594+
}
595+
596596
return status;
597597
}
598598

@@ -626,13 +626,13 @@ Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_e
626626
if (Thread::is_main_thread() && !load_token->local_path.is_empty()) {
627627
const ThreadLoadTask &load_task = thread_load_tasks[load_token->local_path];
628628
while (load_task.status == THREAD_LOAD_IN_PROGRESS) {
629-
if (!_ensure_load_progress()) {
630-
// This local poll loop is not needed.
631-
break;
632-
}
633629
thread_load_lock.~MutexLock();
630+
bool exit = !_ensure_load_progress();
634631
OS::get_singleton()->delay_usec(1000);
635632
new (&thread_load_lock) MutexLock(thread_load_mutex);
633+
if (exit) {
634+
break;
635+
}
636636
}
637637
}
638638

core/io/resource_loader.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -170,7 +170,6 @@ class ResourceLoader {
170170
LoadToken *load_token = nullptr;
171171
String local_path;
172172
String remapped_path;
173-
String dependent_path;
174173
String type_hint;
175174
float progress = 0.0f;
176175
float max_reported_progress = 0.0f;

core/object/worker_thread_pool.cpp

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -59,8 +59,9 @@ void WorkerThreadPool::_process_task(Task *p_task) {
5959
CallQueue *call_queue_backup = MessageQueue::get_singleton() != MessageQueue::get_main_singleton() ? MessageQueue::get_singleton() : nullptr;
6060

6161
{
62-
// Tasks must start with this unset. They are free to set-and-forget otherwise.
62+
// Tasks must start with these at default values. They are free to set-and-forget otherwise.
6363
set_current_thread_safe_for_nodes(false);
64+
MessageQueue::set_thread_singleton_override(nullptr);
6465
// Since the WorkerThreadPool is started before the script server,
6566
// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
6667
// Therefore, we do it late at the first opportunity, so in case the task
@@ -671,7 +672,7 @@ uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mut
671672

672673
uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) {
673674
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
674-
if (unlikely(unlockable_mutexes[i] == (uintptr_t)p_mutex)) {
675+
if (unlikely((unlockable_mutexes[i] & ~1) == (uintptr_t)p_mutex)) {
675676
// Already registered in the current thread.
676677
return UINT32_MAX;
677678
}

core/templates/command_queue_mt.h

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -370,15 +370,19 @@ class CommandQueueMT {
370370
flush_read_ptr += 8;
371371
CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
372372
cmd->call();
373+
374+
// Handle potential realloc due to the command and unlock allowance.
375+
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
376+
373377
if (unlikely(cmd->sync)) {
374378
sync_head++;
375379
unlock(); // Give an opportunity to awaiters right away.
376380
sync_cond_var.notify_all();
377381
lock();
382+
// Handle potential realloc happened during unlock.
383+
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
378384
}
379385

380-
// If the command involved reallocating the buffer, the address may have changed.
381-
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
382386
cmd->~CommandBase();
383387

384388
flush_read_ptr += size;

0 commit comments

Comments (0)