Skip to content

Commit 05d9854

Browse files
committed
Merge pull request godotengine#96593 from RandomShaper/res_changed_multiverse
ResourceLoader: Add thread-aware resource changed mechanism
2 parents c2d81b0 + 74b9c38 commit 05d9854

File tree

3 files changed

+139
-49
lines changed

3 files changed

+139
-49
lines changed

core/io/resource.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@
4040
#include <stdio.h>
4141

4242
void Resource::emit_changed() {
43-
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
44-
// Let the connection happen on the call queue, later, since signals are not thread-safe.
45-
call_deferred("emit_signal", CoreStringName(changed));
46-
} else {
47-
emit_signal(CoreStringName(changed));
43+
if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
44+
ResourceLoader::resource_changed_emit(this);
45+
return;
4846
}
47+
48+
emit_signal(CoreStringName(changed));
4949
}
5050

5151
void Resource::_resource_path_changed() {
@@ -166,22 +166,22 @@ bool Resource::editor_can_reload_from_file() {
166166
}
167167

168168
void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) {
169-
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
170-
// Let the check and connection happen on the call queue, later, since signals are not thread-safe.
171-
callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags);
169+
if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
170+
ResourceLoader::resource_changed_connect(this, p_callable, p_flags);
172171
return;
173172
}
173+
174174
if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) {
175175
connect(CoreStringName(changed), p_callable, p_flags);
176176
}
177177
}
178178

179179
void Resource::disconnect_changed(const Callable &p_callable) {
180-
if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) {
181-
// Let the check and disconnection happen on the call queue, later, since signals are not thread-safe.
182-
callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable);
180+
if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) {
181+
ResourceLoader::resource_changed_disconnect(this, p_callable);
183182
return;
184183
}
184+
185185
if (is_connected(CoreStringName(changed), p_callable)) {
186186
disconnect(CoreStringName(changed), p_callable);
187187
}

core/io/resource_loader.cpp

Lines changed: 113 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "resource_loader.h"
3232

3333
#include "core/config/project_settings.h"
34+
#include "core/core_bind.h"
3435
#include "core/io/file_access.h"
3536
#include "core/io/resource_importer.h"
3637
#include "core/object/script_language.h"
@@ -234,17 +235,22 @@ void ResourceLoader::LoadToken::clear() {
234235
// User-facing tokens shouldn't be deleted until completely claimed.
235236
DEV_ASSERT(user_rc == 0 && user_path.is_empty());
236237

237-
if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered.
238-
DEV_ASSERT(thread_load_tasks.has(local_path));
239-
ThreadLoadTask &load_task = thread_load_tasks[local_path];
240-
if (load_task.task_id && !load_task.awaited) {
241-
task_to_await = load_task.task_id;
238+
if (!local_path.is_empty()) {
239+
if (task_if_unregistered) {
240+
memdelete(task_if_unregistered);
241+
task_if_unregistered = nullptr;
242+
} else {
243+
DEV_ASSERT(thread_load_tasks.has(local_path));
244+
ThreadLoadTask &load_task = thread_load_tasks[local_path];
245+
if (load_task.task_id && !load_task.awaited) {
246+
task_to_await = load_task.task_id;
247+
}
248+
// Removing a task which is still in progress would be catastrophic.
249+
// Tokens must be alive until the task thread function is done.
250+
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
251+
thread_load_tasks.erase(local_path);
242252
}
243-
// Removing a task which is still in progress would be catastrophic.
244-
// Tokens must be alive until the task thread function is done.
245-
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
246-
thread_load_tasks.erase(local_path);
247-
local_path.clear();
253+
local_path.clear(); // Mark as already cleared.
248254
}
249255
}
250256

@@ -324,6 +330,9 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
324330
}
325331
}
326332

333+
ThreadLoadTask *curr_load_task_backup = curr_load_task;
334+
curr_load_task = &load_task;
335+
327336
// Thread-safe either if it's the current thread or a brand new one.
328337
CallQueue *own_mq_override = nullptr;
329338
if (load_nesting == 0) {
@@ -451,6 +460,8 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
451460
}
452461
DEV_ASSERT(load_paths_stack.is_empty());
453462
}
463+
464+
curr_load_task = curr_load_task_backup;
454465
}
455466

456467
static String _validate_local_path(const String &p_path) {
@@ -521,9 +532,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
521532

522533
Ref<LoadToken> load_token;
523534
bool must_not_register = false;
524-
ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load.
525535
ThreadLoadTask *load_task_ptr = nullptr;
526-
bool run_on_current_thread = false;
527536
{
528537
MutexLock thread_load_lock(thread_load_mutex);
529538

@@ -578,22 +587,19 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
578587
}
579588
}
580589

581-
// If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish within scope.
590+
// If we want to ignore cache, but there's another task loading it, we can't add this one to the map.
582591
must_not_register = ignoring_cache && thread_load_tasks.has(local_path);
583592
if (must_not_register) {
584-
load_token->local_path.clear();
585-
unregistered_load_task = load_task;
586-
load_task_ptr = &unregistered_load_task;
593+
load_token->task_if_unregistered = memnew(ThreadLoadTask(load_task));
594+
load_task_ptr = load_token->task_if_unregistered;
587595
} else {
588596
DEV_ASSERT(!thread_load_tasks.has(local_path));
589597
HashMap<String, ResourceLoader::ThreadLoadTask>::Iterator E = thread_load_tasks.insert(local_path, load_task);
590598
load_task_ptr = &E->value;
591599
}
592600
}
593601

594-
run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT;
595-
596-
if (run_on_current_thread) {
602+
if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) {
597603
// The current thread may happen to be a thread from the pool.
598604
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id();
599605
if (tid != WorkerThreadPool::INVALID_TASK_ID) {
@@ -606,11 +612,8 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
606612
}
607613
} // MutexLock(thread_load_mutex).
608614

609-
if (run_on_current_thread) {
615+
if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) {
610616
_run_load_task(load_task_ptr);
611-
if (must_not_register) {
612-
load_token->res_if_unregistered = load_task_ptr->resource;
613-
}
614617
}
615618

616619
return load_token;
@@ -738,7 +741,10 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
738741
*r_error = OK;
739742
}
740743

741-
if (!p_load_token.local_path.is_empty()) {
744+
ThreadLoadTask *load_task_ptr = nullptr;
745+
if (p_load_token.task_if_unregistered) {
746+
load_task_ptr = p_load_token.task_if_unregistered;
747+
} else {
742748
if (!thread_load_tasks.has(p_load_token.local_path)) {
743749
if (r_error) {
744750
*r_error = ERR_BUG;
@@ -809,22 +815,47 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
809815
load_task.error = FAILED;
810816
}
811817

812-
Ref<Resource> resource = load_task.resource;
813-
if (r_error) {
814-
*r_error = load_task.error;
815-
}
816-
return resource;
817-
} else {
818-
// Special case of an unregistered task.
819-
// The resource should have been loaded by now.
820-
Ref<Resource> resource = p_load_token.res_if_unregistered;
821-
if (!resource.is_valid()) {
822-
if (r_error) {
823-
*r_error = FAILED;
818+
load_task_ptr = &load_task;
819+
}
820+
821+
Ref<Resource> resource = load_task_ptr->resource;
822+
if (r_error) {
823+
*r_error = load_task_ptr->error;
824+
}
825+
826+
if (resource.is_valid()) {
827+
if (curr_load_task) {
828+
// A task awaiting another => Let the awaiter accumulate the resource changed connections.
829+
DEV_ASSERT(curr_load_task != load_task_ptr);
830+
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
831+
curr_load_task->resource_changed_connections.push_back(rcc);
832+
}
833+
} else {
834+
// A leaf task being awaited => Propagate the resource changed connections.
835+
if (Thread::is_main_thread()) {
836+
// On the main thread it's safe to migrate the connections to the standard signal mechanism.
837+
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
838+
if (rcc.callable.is_valid()) {
839+
rcc.source->connect_changed(rcc.callable, rcc.flags);
840+
}
841+
}
842+
} else {
843+
// On non-main threads, we have to queue and call it done when processed.
844+
if (!load_task_ptr->resource_changed_connections.is_empty()) {
845+
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
846+
if (rcc.callable.is_valid()) {
847+
MessageQueue::get_main_singleton()->push_callable(callable_mp(rcc.source, &Resource::connect_changed).bind(rcc.callable, rcc.flags));
848+
}
849+
}
850+
core_bind::Semaphore done;
851+
MessageQueue::get_main_singleton()->push_callable(callable_mp(&done, &core_bind::Semaphore::post));
852+
done.wait();
853+
}
824854
}
825855
}
826-
return resource;
827856
}
857+
858+
return resource;
828859
}
829860

830861
bool ResourceLoader::_ensure_load_progress() {
@@ -838,6 +869,50 @@ bool ResourceLoader::_ensure_load_progress() {
838869
return true;
839870
}
840871

872+
void ResourceLoader::resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags) {
873+
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "\t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));
874+
875+
MutexLock lock(thread_load_mutex);
876+
877+
for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
878+
if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
879+
return;
880+
}
881+
}
882+
883+
ThreadLoadTask::ResourceChangedConnection rcc;
884+
rcc.source = p_source;
885+
rcc.callable = p_callable;
886+
rcc.flags = p_flags;
887+
curr_load_task->resource_changed_connections.push_back(rcc);
888+
}
889+
890+
void ResourceLoader::resource_changed_disconnect(Resource *p_source, const Callable &p_callable) {
891+
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));
892+
893+
MutexLock lock(thread_load_mutex);
894+
895+
for (uint32_t i = 0; i < curr_load_task->resource_changed_connections.size(); ++i) {
896+
const ThreadLoadTask::ResourceChangedConnection &rcc = curr_load_task->resource_changed_connections[i];
897+
if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
898+
curr_load_task->resource_changed_connections.remove_at_unordered(i);
899+
return;
900+
}
901+
}
902+
}
903+
904+
void ResourceLoader::resource_changed_emit(Resource *p_source) {
905+
print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR, Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class()));
906+
907+
MutexLock lock(thread_load_mutex);
908+
909+
for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
910+
if (unlikely(rcc.source == p_source)) {
911+
rcc.callable.call();
912+
}
913+
}
914+
}
915+
841916
Ref<Resource> ResourceLoader::ensure_resource_ref_override_for_outer_load(const String &p_path, const String &p_res_type) {
842917
ERR_FAIL_COND_V(load_nesting == 0, Ref<Resource>()); // It makes no sense to use this from nesting level 0.
843918
const String &local_path = _validate_local_path(p_path);
@@ -1368,6 +1443,7 @@ bool ResourceLoader::timestamp_on_load = false;
13681443
thread_local int ResourceLoader::load_nesting = 0;
13691444
thread_local Vector<String> ResourceLoader::load_paths_stack;
13701445
thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides;
1446+
thread_local ResourceLoader::ThreadLoadTask *ResourceLoader::curr_load_task = nullptr;
13711447

13721448
SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> &_get_res_loader_mutex() {
13731449
return ResourceLoader::thread_load_mutex;

core/io/resource_loader.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ class ResourceLoader {
106106
MAX_LOADERS = 64
107107
};
108108

109+
struct ThreadLoadTask;
110+
109111
public:
110112
enum ThreadLoadStatus {
111113
THREAD_LOAD_INVALID_RESOURCE,
@@ -124,7 +126,7 @@ class ResourceLoader {
124126
String local_path;
125127
String user_path;
126128
uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero.
127-
Ref<Resource> res_if_unregistered;
129+
ThreadLoadTask *task_if_unregistered = nullptr;
128130

129131
void clear();
130132

@@ -187,13 +189,21 @@ class ResourceLoader {
187189
Ref<Resource> resource;
188190
bool use_sub_threads = false;
189191
HashSet<String> sub_tasks;
192+
193+
struct ResourceChangedConnection {
194+
Resource *source = nullptr;
195+
Callable callable;
196+
uint32_t flags = 0;
197+
};
198+
LocalVector<ResourceChangedConnection> resource_changed_connections;
190199
};
191200

192201
static void _run_load_task(void *p_userdata);
193202

194203
static thread_local int load_nesting;
195204
static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level.
196205
static thread_local Vector<String> load_paths_stack;
206+
static thread_local ThreadLoadTask *curr_load_task;
197207

198208
static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
199209
friend SafeBinaryMutex<BINARY_MUTEX_TAG> &_get_res_loader_mutex();
@@ -214,6 +224,10 @@ class ResourceLoader {
214224

215225
static bool is_within_load() { return load_nesting > 0; };
216226

227+
static void resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags);
228+
static void resource_changed_disconnect(Resource *p_source, const Callable &p_callable);
229+
static void resource_changed_emit(Resource *p_source);
230+
217231
static Ref<Resource> load(const String &p_path, const String &p_type_hint = "", ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, Error *r_error = nullptr);
218232
static bool exists(const String &p_path, const String &p_type_hint = "");
219233

0 commit comments

Comments
 (0)