16
16
#include < vector>
17
17
#include < thread>
18
18
#include " common/macros.h"
19
-
19
+ #include " common/dedicated_thread_task.h"
20
+ #include " common/dedicated_thread_owner.h
20
21
21
22
namespace peloton {
22
23
@@ -32,10 +33,24 @@ class DedicatedThreadRegistry {
32
33
public:
33
34
DedicatedThreadRegistry() = default;
34
35
35
- ~DedicatedThreadRegistry ();
36
+ ~DedicatedThreadRegistry() {
37
+ // Note that if registry is shutting down, it doesn't matter whether
38
+ // owners are notified as this class should have the same life cycle
39
+ // as the entire peloton process.
40
+
41
+ for (auto &entry : thread_owners_table_) {
42
+ for (auto &task : entry.second) {
43
+ task->Terminate();
44
+ threads_table_[task.get()].join();
45
+ }
46
+ }
47
+ }
36
48
37
49
// TODO(tianyu): Remove when we remove singletons
38
- static DedicatedThreadRegistry &GetInstance ();
50
+ static DedicatedThreadRegistry &GetInstance() {
51
+ static DedicatedThreadRegistry registry;
52
+ return registry;
53
+ }
39
54
40
55
/**
41
56
*
@@ -47,7 +62,11 @@ class DedicatedThreadRegistry {
47
62
*/
48
63
template <typename Task>
49
64
void RegisterDedicatedThread(DedicatedThreadOwner *requester,
50
- std::shared_ptr<Task> task);
65
+ std::shared_ptr<Task> task) {
66
+ thread_owners_table_[requester].push_back(task);
67
+ requester->NotifyNewThread();
68
+ threads_table_.emplace(task.get(), std::thread([=] { task->RunTask(); }));
69
+ }
51
70
52
71
// TODO(tianyu): Add code for thread removal
53
72
0 commit comments