-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathEventQueue.mpp
More file actions
108 lines (95 loc) · 2.78 KB
/
EventQueue.mpp
File metadata and controls
108 lines (95 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
export module CppUtils.Execution.EventQueue;
import std;
import CppUtils.String.Hash;
import CppUtils.Execution.EventDispatcher;
import CppUtils.Execution.ScopeGuard;
import CppUtils.Thread.ThreadLoop;
import CppUtils.Thread.UniqueLocker;
import CppUtils.Chrono.Concept;
export namespace CppUtils::Execution
{
// Asynchronous event queue: emit() packages an event and its arguments into a task,
// tasks are stored in a mutex-protected FIFO, and workerThread() pops and runs them,
// forwarding each one to the internal EventDispatcher.
//
// Synchronisation rule used throughout this class: every piece of state read by a
// condition-variable predicate (the queue contents, m_isTaskRunning, the stop request)
// is changed, or at least published, while the queue lock is held or after it has been
// taken and released. Otherwise a waiter could test its predicate, miss the
// notification, and then block forever.
class EventQueue
{
public:
    // Builds the worker from two callbacks and starts it.
    // NOTE(review): presumably ThreadLoop calls the first callback repeatedly as the loop
    // body and the second one when a stop is requested - confirm against ThreadLoop.
    inline EventQueue():
        m_worker{
            [this] { workerThread(); },
            [this] {
                // Take and release the queue lock before notifying. The worker tests
                // isStopRequested() under that lock, so after this handshake it has
                // either not tested yet (and will see the request) or is already
                // blocked in wait() (and will receive the notification).
                // Assumes ThreadLoop raises its stop flag before invoking this callback.
                {
                    [[maybe_unused]] auto accessor = m_queue.access();
                }
                m_condition.notify_all();
            }}
    {
        m_worker.start();
    }

    // Blocks until every queued task has been executed; the members (including the
    // worker) are then destroyed in reverse declaration order.
    inline ~EventQueue()
    {
        waitUntilFinished();
    }

    EventQueue(const EventQueue&) = delete;
    EventQueue& operator=(const EventQueue&) = delete;
    EventQueue(EventQueue&&) = delete;
    EventQueue& operator=(EventQueue&&) = delete;

    // Queues an event whose name is given as a template argument.
    // Converts the name to Type::Hash and delegates to the runtime-hash overload.
    template<String::Hasher eventName = Type::Hash{}, class... Args>
    inline auto emit(Args&&... args) -> void
    {
        emit(static_cast<Type::Hash>(eventName), std::forward<Args>(args)...);
    }

    // Queues an event identified by a runtime hash.
    // The arguments are captured by value in the task (moved or copied according to
    // their value category) and handed to the dispatcher as rvalues when the task runs.
    template<class... Args>
    inline auto emit(Type::Hash eventName, Args&&... args) -> void
    {
        auto task = [this, eventName, ... args = std::forward<Args>(args)]() mutable {
            m_dispatcher.emit(eventName, std::move(args)...);
        };
        enqueue(std::move(task));
    }

    // Registers `function` for `eventName` by forwarding it to the dispatcher.
    // NOTE(review): no lock is taken here; whether subscribing while events are being
    // dispatched is safe depends on EventDispatcher - confirm.
    template<String::Hasher eventName = Type::Hash{}>
    inline auto subscribe(auto&& function) -> void
    {
        m_dispatcher.subscribe<eventName>(std::forward<decltype(function)>(function));
    }

    // Waits until the queue is empty and no task is running.
    // A timeout of Duration::zero() (the default) means "wait without a time limit" and
    // always returns true. Otherwise returns the result of wait_for: true if the queue
    // became idle within `timeout`, false if it did not.
    template<Chrono::Duration Duration = std::chrono::milliseconds>
    inline auto waitUntilFinished(Duration timeout = Duration::zero()) -> bool
    {
        auto accessor = m_queue.access();
        auto predicate = [this, &accessor] { return std::empty(accessor.value()) and not m_isTaskRunning; };
        if (timeout == Duration::zero())
        {
            m_condition.wait(accessor.getLockGuard(), predicate);
            return true;
        }
        return m_condition.wait_for(accessor.getLockGuard(), timeout, predicate);
    }

private:
    // Pushes a task under the queue lock, then wakes one waiter after releasing it.
    inline auto enqueue(std::function<void()> task) -> void
    {
        {
            auto accessor = m_queue.access();
            accessor.value().push(std::move(task));
        }
        m_condition.notify_one();
    }

    // Runs at most one queued task per call.
    // Sleeps until a task is available or a stop has been requested; returns without
    // doing anything when it is woken with an empty queue.
    inline auto workerThread() -> void
    {
        auto task = std::function<void()>{};
        {
            auto accessor = m_queue.access();
            m_condition.wait(accessor.getLockGuard(), [this, &accessor] {
                return not std::empty(accessor.value()) or m_worker.isStopRequested();
            });
            if (std::empty(accessor.value()))
                return;
            // Set while the lock is still held so that a waiter never observes
            // "queue empty and nothing running" between the pop and the execution.
            m_isTaskRunning = true;
            task = std::move(accessor.value().front());
            accessor.value().pop();
        }

        // Armed unconditionally: the flag is cleared and waiters are notified whether
        // the task returns normally, throws, or is an empty std::function.
        auto _ = ScopeGuard{[this] {
            {
                // Clear the flag under the queue lock: a thread in waitUntilFinished()
                // evaluates its predicate under this lock, so it cannot miss the change
                // and then sleep through the notification below.
                [[maybe_unused]] auto accessor = m_queue.access();
                m_isTaskRunning = false;
            }
            m_condition.notify_all();
        }};
        if (task)
            task();
    }

    EventDispatcher m_dispatcher;
    Thread::UniqueLocker<std::queue<std::function<void()>>> m_queue;
    std::condition_variable m_condition;
    Thread::ThreadLoop m_worker;
    // True from the moment a task is popped until it has finished executing.
    std::atomic<bool> m_isTaskRunning = false;
};
}