Skip to content

Commit df246e2

Browse files
committed
Work queue shutdown fixes
1 parent 9305ca8 commit df246e2

File tree

2 files changed

+33
-16
lines changed

2 files changed

+33
-16
lines changed

Core/AppRuntime/Source/WorkQueue.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ namespace Babylon
4040

4141
void WorkQueue::Run(Napi::Env env)
4242
{
43-
m_env = std::make_optional(env);
4443
m_dispatcher.set_affinity(std::this_thread::get_id());
4544

45+
m_env = std::make_optional(env);
46+
4647
while (!m_cancellationSource.cancelled())
4748
{
4849
m_dispatcher.blocking_tick(m_cancellationSource);
@@ -53,5 +54,10 @@ namespace Babylon
5354

5455
// There should no longer be any outstanding work once the queue is drained.
5556
assert(m_dispatcher.empty());
57+
58+
// Clear the shutdown queue to make sure the callables are destroyed on this thread.
59+
m_shutdownQueue.clear();
60+
61+
m_env.reset();
5662
}
5763
}

Core/AppRuntime/Source/WorkQueue.h

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,33 @@ namespace Babylon
1919
template<typename CallableT>
2020
void Append(CallableT callable)
2121
{
22-
if (m_cancellationSource.cancelled())
23-
{
24-
// There is likely a coding error if this exception is thrown.
25-
throw std::runtime_error{"Cannot append to the work queue while shutting down"};
26-
}
27-
2822
// Manual dispatcher queueing requires a copyable CallableT, we use a shared pointer trick to make a
2923
// copyable callable if necessary.
3024
if constexpr (std::is_copy_constructible<CallableT>::value)
3125
{
32-
m_dispatcher.queue([this, callable = std::move(callable)]() { Invoke(callable); });
26+
if (m_cancellationSource.cancelled())
27+
{
28+
m_shutdownQueue.push([callable = std::move(callable)] {});
29+
}
30+
else
31+
{
32+
m_dispatcher.queue([this, callable = std::move(callable)]() {
33+
callable(m_env.value());
34+
});
35+
}
3336
}
3437
else
3538
{
36-
m_dispatcher.queue([this, callablePtr = std::make_shared<CallableT>(std::move(callable))]() { Invoke(*callablePtr); });
39+
if (m_cancellationSource.cancelled())
40+
{
41+
m_shutdownQueue.push([callablePtr = std::make_shared<CallableT>(std::move(callable))] {});
42+
}
43+
else
44+
{
45+
m_dispatcher.queue([this, callablePtr = std::make_shared<CallableT>(std::move(callable))]() {
46+
(*callablePtr)(m_env.value());
47+
});
48+
}
3749
}
3850
}
3951

@@ -42,18 +54,17 @@ namespace Babylon
4254
void Run(Napi::Env);
4355

4456
private:
45-
template<typename CallableT>
46-
void Invoke(CallableT& callable)
47-
{
48-
callable(m_env.value());
49-
}
50-
5157
std::optional<Napi::Env> m_env{};
5258

5359
std::optional<std::scoped_lock<std::mutex>> m_suspensionLock{};
5460

5561
arcana::cancellation_source m_cancellationSource{};
56-
arcana::manual_dispatcher<128> m_dispatcher{};
62+
63+
using DispatcherT = arcana::manual_dispatcher<128>;
64+
DispatcherT m_dispatcher{};
65+
66+
// Put the callables in a separate queue during shutdown to ensure the callables are destroyed on the right thread.
67+
arcana::blocking_concurrent_queue<DispatcherT::callback_t> m_shutdownQueue{};
5768

5869
std::thread m_thread{};
5970
};

0 commit comments

Comments
 (0)