Skip to content

Commit f69a39d

Browse files
author
Andrei Lobov
authored
Feature/use nonpriority pool (iresearch-toolkit#570)
* new thread pool * fix * wip * add static
1 parent c3af257 commit f69a39d

File tree

3 files changed

+307
-192
lines changed

3 files changed

+307
-192
lines changed

core/utils/async_utils.cpp

Lines changed: 66 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -55,27 +55,32 @@ void busywait_mutex::unlock() noexcept {
5555
locked_.store(false, std::memory_order_release);
5656
}
5757

58-
thread_pool::thread_pool(size_t max_threads /*= 0*/, size_t max_idle /*= 0*/,
59-
basic_string_view<native_char_t> worker_name /*= ""*/)
58+
template<bool UsePriority>
59+
thread_pool<UsePriority>::thread_pool(
60+
size_t max_threads /*= 0*/, size_t max_idle /*= 0*/,
61+
basic_string_view<native_char_t> worker_name /*= ""*/)
6062
: shared_state_(std::make_shared<shared_state>()),
6163
max_idle_(max_idle),
6264
max_threads_(max_threads),
6365
worker_name_(worker_name) {}
6466

65-
thread_pool::~thread_pool() {
67+
template<bool UsePriority>
68+
thread_pool<UsePriority>::~thread_pool() {
6669
try {
6770
stop(true);
6871
} catch (...) {
6972
}
7073
}
7174

72-
size_t thread_pool::max_idle() const {
75+
template<bool UsePriority>
76+
size_t thread_pool<UsePriority>::max_idle() const {
7377
std::lock_guard lock{shared_state_->lock};
7478

7579
return max_idle_;
7680
}
7781

78-
void thread_pool::max_idle(size_t value) {
82+
template<bool UsePriority>
83+
void thread_pool<UsePriority>::max_idle(size_t value) {
7984
auto& state = *shared_state_;
8085

8186
{
@@ -87,7 +92,8 @@ void thread_pool::max_idle(size_t value) {
8792
state.cond.notify_all(); // wake any idle threads if they need termination
8893
}
8994

90-
void thread_pool::max_idle_delta(int delta) {
95+
template<bool UsePriority>
96+
void thread_pool<UsePriority>::max_idle_delta(int delta) {
9197
auto& state = *shared_state_;
9298

9399
{
@@ -106,13 +112,15 @@ void thread_pool::max_idle_delta(int delta) {
106112
state.cond.notify_all(); // wake any idle threads if they need termination
107113
}
108114

109-
size_t thread_pool::max_threads() const {
115+
template<bool UsePriority>
116+
size_t thread_pool<UsePriority>::max_threads() const {
110117
std::lock_guard lock{shared_state_->lock};
111118

112119
return max_threads_;
113120
}
114121

115-
void thread_pool::max_threads(size_t value) {
122+
template<bool UsePriority>
123+
void thread_pool<UsePriority>::max_threads(size_t value) {
116124
auto& state = *shared_state_;
117125

118126
{
@@ -128,7 +136,8 @@ void thread_pool::max_threads(size_t value) {
128136
state.cond.notify_all(); // wake any idle threads if they need termination
129137
}
130138

131-
void thread_pool::max_threads_delta(int delta) {
139+
template<bool UsePriority>
140+
void thread_pool<UsePriority>::max_threads_delta(int delta) {
132141
auto& state = *shared_state_;
133142

134143
{
@@ -151,8 +160,9 @@ void thread_pool::max_threads_delta(int delta) {
151160
state.cond.notify_all(); // wake any idle threads if they need termination
152161
}
153162

154-
bool thread_pool::run(std::function<void()>&& fn,
155-
clock_t::duration delay /*=0*/) {
163+
template<bool UsePriority>
164+
bool thread_pool<UsePriority>::run(thread_pool<UsePriority>::func_t&& fn,
165+
clock_t::duration delay /*=0*/) {
156166
if (!fn) {
157167
return false;
158168
}
@@ -165,8 +175,11 @@ bool thread_pool::run(std::function<void()>&& fn,
165175
if (State::RUN != state.state.load()) {
166176
return false; // pool not active
167177
}
168-
169-
queue_.emplace(std::move(fn), clock_t::now() + delay);
178+
if constexpr (UsePriority) {
179+
queue_.emplace(std::move(fn), clock_t::now() + delay);
180+
} else {
181+
queue_.emplace(std::move(fn));
182+
}
170183

171184
try {
172185
maybe_spawn_worker();
@@ -184,7 +197,8 @@ bool thread_pool::run(std::function<void()>&& fn,
184197
return true;
185198
}
186199

187-
void thread_pool::stop(bool skip_pending /*= false*/) {
200+
template<bool UsePriority>
201+
void thread_pool<UsePriority>::stop(bool skip_pending /*= false*/) {
188202
shared_state_->state.store(skip_pending ? State::ABORT : State::FINISH);
189203

190204
decltype(queue_) empty;
@@ -201,7 +215,8 @@ void thread_pool::stop(bool skip_pending /*= false*/) {
201215
}
202216
}
203217

204-
void thread_pool::limits(size_t max_threads, size_t max_idle) {
218+
template<bool UsePriority>
219+
void thread_pool<UsePriority>::limits(size_t max_threads, size_t max_idle) {
205220
auto& state = *shared_state_;
206221

207222
{
@@ -218,7 +233,8 @@ void thread_pool::limits(size_t max_threads, size_t max_idle) {
218233
state.cond.notify_all(); // wake any idle threads if they need termination
219234
}
220235

221-
bool thread_pool::maybe_spawn_worker() {
236+
template<bool UsePriority>
237+
bool thread_pool<UsePriority>::maybe_spawn_worker() {
222238
IRS_ASSERT(!shared_state_->lock.try_lock()); // lock must be held
223239

224240
// create extra thread if all threads are busy and can grow pool
@@ -236,37 +252,44 @@ bool thread_pool::maybe_spawn_worker() {
236252
return false;
237253
}
238254

239-
std::pair<size_t, size_t> thread_pool::limits() const {
255+
template<bool UsePriority>
256+
std::pair<size_t, size_t> thread_pool<UsePriority>::limits() const {
240257
std::lock_guard lock{shared_state_->lock};
241258

242259
return {max_threads_, max_idle_};
243260
}
244261

245-
std::tuple<size_t, size_t, size_t> thread_pool::stats() const {
262+
template<bool UsePriority>
263+
std::tuple<size_t, size_t, size_t> thread_pool<UsePriority>::stats() const {
246264
std::lock_guard lock{shared_state_->lock};
247265

248266
return {active_, queue_.size(), threads_.load()};
249267
}
250268

251-
size_t thread_pool::tasks_active() const {
269+
template<bool UsePriority>
270+
size_t thread_pool<UsePriority>::tasks_active() const {
252271
std::lock_guard lock{shared_state_->lock};
253272

254273
return active_;
255274
}
256275

257-
size_t thread_pool::tasks_pending() const {
276+
template<bool UsePriority>
277+
size_t thread_pool<UsePriority>::tasks_pending() const {
258278
std::lock_guard lock{shared_state_->lock};
259279

260280
return queue_.size();
261281
}
262282

263-
size_t thread_pool::threads() const {
283+
template<bool UsePriority>
284+
size_t thread_pool<UsePriority>::threads() const {
264285
std::lock_guard lock{shared_state_->lock};
265286

266287
return threads_.load();
267288
}
268289

269-
void thread_pool::worker(std::shared_ptr<shared_state> shared_state) noexcept {
290+
template<bool UsePriority>
291+
void thread_pool<UsePriority>::worker(
292+
std::shared_ptr<shared_state> shared_state) noexcept {
270293
// hold a reference to 'shared_state_' ensure state is still alive
271294
if (!worker_name_.empty()) {
272295
set_thread_name(worker_name_.c_str());
@@ -289,20 +312,27 @@ void thread_pool::worker(std::shared_ptr<shared_state> shared_state) noexcept {
289312
}
290313
}
291314

292-
void thread_pool::worker_impl(std::unique_lock<std::mutex>& lock,
293-
std::shared_ptr<shared_state> shared_state) {
315+
template<bool UsePriority>
316+
void thread_pool<UsePriority>::worker_impl(
317+
std::unique_lock<std::mutex>& lock,
318+
std::shared_ptr<shared_state> shared_state) {
294319
auto& state = shared_state->state;
295320

296321
lock.lock();
297322

298323
while (State::ABORT != state.load() && threads_.load() <= max_threads_) {
299324
IRS_ASSERT(lock.owns_lock());
300325
if (!queue_.empty()) {
301-
if (const auto& top = queue_.top(); top.at <= clock_t::now()) {
302-
func_t fn;
303-
fn.swap(const_cast<func_t&>(top.fn));
326+
auto& top = next();
327+
bool proceed = true;
328+
if constexpr (UsePriority) {
329+
if (top.at > clock_t::now()) {
330+
proceed = false;
331+
}
332+
}
333+
if (proceed) {
334+
func_t fn = std::move(func(top));
304335
queue_.pop();
305-
306336
++active_;
307337
Finally decrement = [this]() noexcept { --active_; };
308338
// if have more tasks but no idle thread and can grow pool
@@ -339,8 +369,11 @@ void thread_pool::worker_impl(std::unique_lock<std::mutex>& lock,
339369
(idle <= max_idle_ || (!queue_.empty() && threads_.load() == 1))) {
340370
if (const auto run_state = state.load();
341371
!queue_.empty() && State::ABORT != run_state) {
342-
const auto at = queue_.top().at; // queue_ might be modified
343-
shared_state->cond.wait_until(lock, at);
372+
IRS_ASSERT(UsePriority);
373+
if constexpr (UsePriority) {
374+
const auto at = queue_.top().at; // queue_ might be modified
375+
shared_state->cond.wait_until(lock, at);
376+
}
344377
} else if (State::RUN == run_state) {
345378
IRS_ASSERT(queue_.empty());
346379
shared_state->cond.wait(lock);
@@ -354,5 +387,8 @@ void thread_pool::worker_impl(std::unique_lock<std::mutex>& lock,
354387
}
355388
}
356389

390+
template class thread_pool<true>;
391+
template class thread_pool<false>;
392+
357393
} // namespace async_utils
358394
} // namespace irs

core/utils/async_utils.hpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include <atomic>
2727
#include <condition_variable>
28+
#include <function2/function2.hpp>
2829
#include <functional>
2930
#include <queue>
3031
#include <thread>
@@ -51,11 +52,12 @@ class busywait_mutex final {
5152
std::atomic<bool> locked_{false};
5253
};
5354

55+
template<bool UsePriority = true>
5456
class thread_pool {
5557
public:
5658
using native_char_t = std::remove_pointer_t<thread_name_t>;
5759
using clock_t = std::chrono::steady_clock;
58-
using func_t = std::function<void()>;
60+
using func_t = fu2::unique_function<void()>;
5961

6062
explicit thread_pool(size_t max_threads = 0, size_t max_idle = 0,
6163
basic_string_view<native_char_t> worker_name =
@@ -72,7 +74,7 @@ class thread_pool {
7274
std::pair<size_t, size_t> limits() const;
7375
void limits(size_t max_threads, size_t max_idle);
7476

75-
bool run(std::function<void()>&& fn, clock_t::duration delay = {});
77+
bool run(func_t&& fn, [[maybe_unused]] clock_t::duration delay = {});
7678
void stop(bool skip_pending = false); // always a blocking call
7779
size_t tasks_active() const;
7880
size_t tasks_pending() const;
@@ -83,14 +85,31 @@ class thread_pool {
8385
private:
8486
enum class State { ABORT, FINISH, RUN };
8587

88+
auto& next() {
89+
if constexpr (UsePriority) {
90+
return queue_.top();
91+
} else {
92+
return queue_.front();
93+
}
94+
}
95+
96+
template<typename T>
97+
static func_t& func(T& t) {
98+
if constexpr (UsePriority) {
99+
return const_cast<func_t&>(t.fn);
100+
} else {
101+
return const_cast<func_t&>(t);
102+
}
103+
}
104+
86105
struct shared_state {
87106
std::mutex lock;
88107
std::condition_variable cond;
89108
std::atomic<State> state{State::RUN};
90109
};
91110

92111
struct task {
93-
explicit task(std::function<void()>&& fn, clock_t::time_point at)
112+
explicit task(func_t&& fn, clock_t::time_point at)
94113
: at(at), fn(std::move(fn)) {}
95114

96115
clock_t::time_point at;
@@ -109,7 +128,8 @@ class thread_pool {
109128
std::atomic<size_t> threads_{0};
110129
size_t max_idle_;
111130
size_t max_threads_;
112-
std::priority_queue<task> queue_;
131+
std::conditional_t<UsePriority, std::priority_queue<task>, std::queue<func_t>>
132+
queue_;
113133
basic_string<native_char_t> worker_name_;
114134
};
115135

0 commit comments

Comments
 (0)