Skip to content

Commit 8c1cdaf

Browse files
committed
more flexible abstractions
1 parent 7267bf1 commit 8c1cdaf

File tree

6 files changed

+164
-83
lines changed

6 files changed

+164
-83
lines changed

include/thread_pool/thread_pool.hpp

Lines changed: 30 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#pragma once
22

3+
#include <thread_pool/fixed_function.hpp>
4+
#include <thread_pool/mpsc_bounded_queue.hpp>
5+
#include <thread_pool/thread_pool_options.hpp>
36
#include <thread_pool/worker.hpp>
47

58
#include <atomic>
@@ -10,20 +13,10 @@
1013
namespace tp
1114
{
1215

13-
/**
14-
* @brief The ThreadPoolOptions struct provides construction options for
15-
* ThreadPool.
16-
*/
17-
struct ThreadPoolOptions
18-
{
19-
enum
20-
{
21-
AUTODETECT = 0
22-
};
23-
24-
size_t threads_count = AUTODETECT;
25-
size_t worker_queue_size = 1024;
26-
};
16+
template <typename Task, template<typename> class Queue>
17+
class ThreadPoolImpl;
18+
using ThreadPool = ThreadPoolImpl<FixedFunction<void(), 128>,
19+
MPMCBoundedQueue>;
2720

2821
/**
2922
* @brief The ThreadPool class implements thread pool pattern.
@@ -33,7 +26,7 @@ struct ThreadPoolOptions
3326
* startegies.
3427
* It implements cooperative scheduling strategy for tasks.
3528
*/
36-
template <size_t TASK_SIZE>
29+
template <typename Task, template<typename> class Queue>
3730
class ThreadPoolImpl {
3831
public:
3932
/**
@@ -79,66 +72,52 @@ class ThreadPoolImpl {
7972
void post(Handler&& handler);
8073

8174
private:
82-
Worker<TASK_SIZE>& getWorker();
75+
Worker<Task, Queue>& getWorker();
8376

84-
std::vector<std::unique_ptr<Worker<TASK_SIZE>>> m_workers;
77+
std::vector<std::unique_ptr<Worker<Task, Queue>>> m_workers;
8578
std::atomic<size_t> m_next_worker;
8679
};
8780

88-
using ThreadPool = ThreadPoolImpl<128>;
89-
9081

9182
/// Implementation
9283

93-
template <size_t TASK_SIZE>
94-
inline ThreadPoolImpl<TASK_SIZE>::ThreadPoolImpl(
84+
template <typename Task, template<typename> class Queue>
85+
inline ThreadPoolImpl<Task, Queue>::ThreadPoolImpl(
9586
const ThreadPoolOptions& options)
96-
: m_next_worker(0)
87+
: m_workers(options.threadCount())
88+
, m_next_worker(0)
9789
{
98-
size_t workers_count = options.threads_count;
99-
100-
if(ThreadPoolOptions::AUTODETECT == options.threads_count)
101-
{
102-
workers_count = std::thread::hardware_concurrency();
103-
}
104-
105-
if(0 == workers_count)
106-
{
107-
workers_count = 1;
108-
}
109-
110-
m_workers.resize(workers_count);
11190
for(auto& worker_ptr : m_workers)
11291
{
113-
worker_ptr.reset(new Worker<TASK_SIZE>(options.worker_queue_size));
92+
worker_ptr.reset(new Worker<Task, Queue>(options.queueSize()));
11493
}
11594

11695
for(size_t i = 0; i < m_workers.size(); ++i)
11796
{
118-
Worker<TASK_SIZE>* steal_donor =
97+
Worker<Task, Queue>* steal_donor =
11998
m_workers[(i + 1) % m_workers.size()].get();
12099
m_workers[i]->start(i, steal_donor);
121100
}
122101
}
123102

124-
template <size_t TASK_SIZE>
125-
inline ThreadPoolImpl<TASK_SIZE>::ThreadPoolImpl(ThreadPoolImpl<TASK_SIZE>&& rhs) noexcept
103+
template <typename Task, template<typename> class Queue>
104+
inline ThreadPoolImpl<Task, Queue>::ThreadPoolImpl(ThreadPoolImpl<Task, Queue>&& rhs) noexcept
126105
{
127106
*this = rhs;
128107
}
129108

130-
template <size_t TASK_SIZE>
131-
inline ThreadPoolImpl<TASK_SIZE>::~ThreadPoolImpl()
109+
template <typename Task, template<typename> class Queue>
110+
inline ThreadPoolImpl<Task, Queue>::~ThreadPoolImpl()
132111
{
133112
for (auto& worker_ptr : m_workers)
134113
{
135114
worker_ptr->stop();
136115
}
137116
}
138117

139-
template <size_t TASK_SIZE>
140-
inline ThreadPoolImpl<TASK_SIZE>&
141-
ThreadPoolImpl<TASK_SIZE>::operator=(ThreadPoolImpl<TASK_SIZE>&& rhs) noexcept
118+
template <typename Task, template<typename> class Queue>
119+
inline ThreadPoolImpl<Task, Queue>&
120+
ThreadPoolImpl<Task, Queue>::operator=(ThreadPoolImpl<Task, Queue>&& rhs) noexcept
142121
{
143122
if (this != &rhs)
144123
{
@@ -148,16 +127,16 @@ ThreadPoolImpl<TASK_SIZE>::operator=(ThreadPoolImpl<TASK_SIZE>&& rhs) noexcept
148127
return *this;
149128
}
150129

151-
template <size_t TASK_SIZE>
130+
template <typename Task, template<typename> class Queue>
152131
template <typename Handler>
153-
inline bool ThreadPoolImpl<TASK_SIZE>::tryPost(Handler&& handler)
132+
inline bool ThreadPoolImpl<Task, Queue>::tryPost(Handler&& handler)
154133
{
155134
return getWorker().post(std::forward<Handler>(handler));
156135
}
157136

158-
template <size_t TASK_SIZE>
137+
template <typename Task, template<typename> class Queue>
159138
template <typename Handler>
160-
inline void ThreadPoolImpl<TASK_SIZE>::post(Handler&& handler)
139+
inline void ThreadPoolImpl<Task, Queue>::post(Handler&& handler)
161140
{
162141
const auto ok = tryPost(std::forward<Handler>(handler));
163142
if (!ok)
@@ -166,10 +145,10 @@ inline void ThreadPoolImpl<TASK_SIZE>::post(Handler&& handler)
166145
}
167146
}
168147

169-
template <size_t TASK_SIZE>
170-
inline Worker<TASK_SIZE>& ThreadPoolImpl<TASK_SIZE>::getWorker()
148+
template <typename Task, template<typename> class Queue>
149+
inline Worker<Task, Queue>& ThreadPoolImpl<Task, Queue>::getWorker()
171150
{
172-
auto id = Worker<TASK_SIZE>::getWorkerIdForCurrentThread();
151+
auto id = Worker<Task, Queue>::getWorkerIdForCurrentThread();
173152

174153
if (id > m_workers.size())
175154
{
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#pragma once
2+
3+
#include <atomic>
4+
#include <algorithm>
5+
#include <thread>
6+
7+
namespace tp
8+
{
9+
10+
/**
11+
* @brief The ThreadPoolOptions class provides creation options for
12+
* ThreadPool.
13+
*/
14+
class ThreadPoolOptions
15+
{
16+
public:
17+
/**
18+
* @brief ThreadPoolOptions Construct default options for thread pool.
19+
*/
20+
ThreadPoolOptions();
21+
22+
/**
23+
* @brief setThreadCount Set thread count.
24+
* @param count Number of threads to be created.
25+
*/
26+
void setThreadCount(size_t count);
27+
28+
/**
29+
* @brief setQueueSize Set single worker queue size.
30+
* @param count Maximum length of queue of single worker.
31+
*/
32+
void setQueueSize(size_t size);
33+
34+
/**
35+
* @brief threadCount Return thread count.
36+
*/
37+
size_t threadCount() const;
38+
39+
/**
40+
* @brief queueSize Return single worker queue size.
41+
*/
42+
size_t queueSize() const;
43+
44+
private:
45+
size_t m_thread_count;
46+
size_t m_queue_size;
47+
};
48+
49+
/// Implementation
50+
51+
ThreadPoolOptions::ThreadPoolOptions()
52+
: m_thread_count(std::max<size_t>(1u, std::thread::hardware_concurrency()))
53+
, m_queue_size(1024u)
54+
{
55+
}
56+
57+
void ThreadPoolOptions::setThreadCount(size_t count)
58+
{
59+
m_thread_count = std::max<size_t>(1u, count);
60+
}
61+
62+
void ThreadPoolOptions::setQueueSize(size_t size)
63+
{
64+
m_queue_size = std::max<size_t>(1u, size);
65+
}
66+
67+
size_t ThreadPoolOptions::threadCount() const
68+
{
69+
return m_thread_count;
70+
}
71+
72+
size_t ThreadPoolOptions::queueSize() const
73+
{
74+
return m_queue_size;
75+
}
76+
77+
}

include/thread_pool/worker.hpp

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
#pragma once
22

3-
#include <thread_pool/fixed_function.hpp>
4-
#include <thread_pool/mpsc_bounded_queue.hpp>
5-
63
#include <atomic>
74
#include <thread>
85

@@ -11,17 +8,14 @@ namespace tp
118

129
/**
1310
* @brief The Worker class owns task queue and executing thread.
14-
* In executing thread it tries to pop task from queue. If queue is empty
15-
* then it tries to steal task from the sibling worker. If stealing was
16-
* unsuccessful
17-
* then spins with one millisecond delay.
11+
* In thread it tries to pop task from queue. If queue is empty then it tries
12+
* to steal task from the sibling worker. If steal was unsuccessful then spins
13+
* with one millisecond delay.
1814
*/
19-
template <size_t TASK_SIZE>
15+
template <typename Task, template<typename> class Queue>
2016
class Worker
2117
{
2218
public:
23-
using Task = FixedFunction<void(), TASK_SIZE>;
24-
2519
/**
2620
* @brief Worker Constructor.
2721
* @param queue_size Length of undelaying task queue.
@@ -81,7 +75,7 @@ class Worker
8175
*/
8276
void threadFunc(size_t id, Worker* steal_donor);
8377

84-
MPMCBoundedQueue<Task> m_queue;
78+
Queue<Task> m_queue;
8579
std::atomic<bool> m_running_flag;
8680
std::thread m_thread;
8781
};
@@ -98,21 +92,21 @@ namespace detail
9892
}
9993
}
10094

101-
template <size_t TASK_SIZE>
102-
inline Worker<TASK_SIZE>::Worker(size_t queue_size)
95+
template <typename Task, template<typename> class Queue>
96+
inline Worker<Task, Queue>::Worker(size_t queue_size)
10397
: m_queue(queue_size)
10498
, m_running_flag(true)
10599
{
106100
}
107101

108-
template <size_t TASK_SIZE>
109-
inline Worker<TASK_SIZE>::Worker(Worker&& rhs) noexcept
102+
template <typename Task, template<typename> class Queue>
103+
inline Worker<Task, Queue>::Worker(Worker&& rhs) noexcept
110104
{
111105
*this = rhs;
112106
}
113107

114-
template <size_t TASK_SIZE>
115-
inline Worker<TASK_SIZE>& Worker<TASK_SIZE>::operator=(Worker&& rhs) noexcept
108+
template <typename Task, template<typename> class Queue>
109+
inline Worker<Task, Queue>& Worker<Task, Queue>::operator=(Worker&& rhs) noexcept
116110
{
117111
if (this != &rhs)
118112
{
@@ -123,40 +117,40 @@ inline Worker<TASK_SIZE>& Worker<TASK_SIZE>::operator=(Worker&& rhs) noexcept
123117
return *this;
124118
}
125119

126-
template <size_t TASK_SIZE>
127-
inline void Worker<TASK_SIZE>::stop()
120+
template <typename Task, template<typename> class Queue>
121+
inline void Worker<Task, Queue>::stop()
128122
{
129123
m_running_flag.store(false, std::memory_order_relaxed);
130124
m_thread.join();
131125
}
132126

133-
template <size_t TASK_SIZE>
134-
inline void Worker<TASK_SIZE>::start(size_t id, Worker* steal_donor)
127+
template <typename Task, template<typename> class Queue>
128+
inline void Worker<Task, Queue>::start(size_t id, Worker* steal_donor)
135129
{
136-
m_thread = std::thread(&Worker<TASK_SIZE>::threadFunc, this, id, steal_donor);
130+
m_thread = std::thread(&Worker<Task, Queue>::threadFunc, this, id, steal_donor);
137131
}
138132

139-
template <size_t TASK_SIZE>
140-
inline size_t Worker<TASK_SIZE>::getWorkerIdForCurrentThread()
133+
template <typename Task, template<typename> class Queue>
134+
inline size_t Worker<Task, Queue>::getWorkerIdForCurrentThread()
141135
{
142136
return *detail::thread_id();
143137
}
144138

145-
template <size_t TASK_SIZE>
139+
template <typename Task, template<typename> class Queue>
146140
template <typename Handler>
147-
inline bool Worker<TASK_SIZE>::post(Handler&& handler)
141+
inline bool Worker<Task, Queue>::post(Handler&& handler)
148142
{
149143
return m_queue.push(std::forward<Handler>(handler));
150144
}
151145

152-
template <size_t TASK_SIZE>
153-
inline bool Worker<TASK_SIZE>::steal(Task& task)
146+
template <typename Task, template<typename> class Queue>
147+
inline bool Worker<Task, Queue>::steal(Task& task)
154148
{
155149
return m_queue.pop(task);
156150
}
157151

158-
template <size_t TASK_SIZE>
159-
inline void Worker<TASK_SIZE>::threadFunc(size_t id, Worker* steal_donor)
152+
template <typename Task, template<typename> class Queue>
153+
inline void Worker<Task, Queue>::threadFunc(size_t id, Worker* steal_donor)
160154
{
161155
*detail::thread_id() = id;
162156

@@ -172,7 +166,7 @@ inline void Worker<TASK_SIZE>::threadFunc(size_t id, Worker* steal_donor)
172166
}
173167
catch(...)
174168
{
175-
// supress all exceptions
169+
// suppress all exceptions
176170
}
177171
}
178172
else

tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ endfunction()
1010

1111
build_test(fixed_function fixed_function.t.cpp)
1212
build_test(thread_pool thread_pool.t.cpp)
13+
build_test(thread_pool_options thread_pool_options.t.cpp)

tests/thread_pool.t.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ size_t getWorkerIdForCurrentThread()
1515

1616
size_t getWorkerIdForCurrentThread2()
1717
{
18-
return tp::Worker<128>::getWorkerIdForCurrentThread();
18+
return tp::Worker<std::function<void()>, tp::MPMCBoundedQueue>::getWorkerIdForCurrentThread();
1919
}
2020
}
2121

0 commit comments

Comments
 (0)