Skip to content

Commit b37f93e

Browse files
authored
Improve thread pool (iresearch-toolkit#571)
* WIP * WIP * WIP * WIP * Update core/utils/async_utils.cpp
1 parent ba9eb35 commit b37f93e

File tree

9 files changed

+187
-546
lines changed

9 files changed

+187
-546
lines changed

core/utils/async_utils.cpp

Lines changed: 71 additions & 318 deletions
Large diffs are not rendered by default.

core/utils/async_utils.hpp

Lines changed: 46 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@
2323

2424
#pragma once
2525

26-
#include <atomic>
2726
#include <condition_variable>
2827
#include <function2/function2.hpp>
29-
#include <functional>
28+
#include <mutex>
3029
#include <queue>
3130
#include <thread>
3231

@@ -35,8 +34,7 @@
3534
#include "string.hpp"
3635
#include "thread_utils.hpp"
3736

38-
namespace irs {
39-
namespace async_utils {
37+
namespace irs::async_utils {
4038

4139
//////////////////////////////////////////////////////////////////////////////
4240
/// @brief spinlock implementation for Win32 since std::mutex cannot be used
@@ -52,86 +50,59 @@ class busywait_mutex final {
5250
std::atomic<bool> locked_{false};
5351
};
5452

55-
template<bool UsePriority = true>
56-
class thread_pool {
53+
template<bool UseDelay = true>
54+
class ThreadPool {
5755
public:
58-
using native_char_t = std::remove_pointer_t<thread_name_t>;
59-
using clock_t = std::chrono::steady_clock;
60-
using func_t = fu2::unique_function<void()>;
61-
62-
explicit thread_pool(size_t max_threads = 0, size_t max_idle = 0,
63-
basic_string_view<native_char_t> worker_name =
64-
kEmptyStringView<native_char_t>);
65-
~thread_pool();
66-
size_t max_idle() const;
67-
void max_idle(size_t value);
68-
void max_idle_delta(int delta); // change value by delta
69-
size_t max_threads() const;
70-
void max_threads(size_t value);
71-
void max_threads_delta(int delta); // change value by delta
72-
73-
// 1st - max_threads(), 2nd - max_idle()
74-
std::pair<size_t, size_t> limits() const;
75-
void limits(size_t max_threads, size_t max_idle);
76-
77-
bool run(func_t&& fn, [[maybe_unused]] clock_t::duration delay = {});
78-
void stop(bool skip_pending = false); // always a blocking call
79-
size_t tasks_active() const;
80-
size_t tasks_pending() const;
81-
size_t threads() const;
56+
using Char = std::remove_pointer_t<thread_name_t>;
57+
using Clock = std::chrono::steady_clock;
58+
using Func = fu2::unique_function<void()>;
59+
60+
explicit ThreadPool(size_t threads, basic_string_view<Char> name = {});
61+
~ThreadPool() { stop(true); }
62+
63+
bool run(Func&& fn, Clock::duration delay = {});
64+
void stop(bool skip_pending = false) noexcept; // always a blocking call
65+
size_t tasks_active() const {
66+
std::lock_guard lock{m_};
67+
return state_ / 2;
68+
}
69+
size_t tasks_pending() const {
70+
std::lock_guard lock{m_};
71+
return tasks_.size();
72+
}
73+
size_t threads() const {
74+
std::lock_guard lock{m_};
75+
return threads_.size();
76+
}
8277
// 1st - tasks active(), 2nd - tasks pending(), 3rd - threads()
83-
std::tuple<size_t, size_t, size_t> stats() const;
78+
std::tuple<size_t, size_t, size_t> stats() const {
79+
std::lock_guard lock{m_};
80+
return {state_ / 2, tasks_.size(), threads_.size()};
81+
}
8482

8583
private:
86-
enum class State { ABORT, FINISH, RUN };
87-
88-
auto& next() {
89-
if constexpr (UsePriority) {
90-
return queue_.top();
91-
} else {
92-
return queue_.front();
93-
}
94-
}
84+
struct Task {
85+
explicit Task(Func&& fn, Clock::time_point at)
86+
: at{at}, fn{std::move(fn)} {}
9587

96-
template<typename T>
97-
static func_t& func(T& t) {
98-
if constexpr (UsePriority) {
99-
return const_cast<func_t&>(t.fn);
100-
} else {
101-
return const_cast<func_t&>(t);
102-
}
103-
}
88+
Clock::time_point at;
89+
Func fn;
10490

105-
struct shared_state {
106-
std::mutex lock;
107-
std::condition_variable cond;
108-
std::atomic<State> state{State::RUN};
91+
bool operator<(const Task& rhs) const noexcept { return rhs.at < at; }
10992
};
11093

111-
struct task {
112-
explicit task(func_t&& fn, clock_t::time_point at)
113-
: at(at), fn(std::move(fn)) {}
94+
void Work();
11495

115-
clock_t::time_point at;
116-
func_t fn;
117-
118-
bool operator<(const task& rhs) const noexcept { return rhs.at < at; }
119-
};
96+
bool WasStop() const { return state_ % 2 != 0; }
12097

121-
void worker(std::shared_ptr<shared_state> shared_state) noexcept;
122-
void worker_impl(std::unique_lock<std::mutex>& lock,
123-
std::shared_ptr<shared_state> shared_state);
124-
bool maybe_spawn_worker();
125-
126-
std::shared_ptr<shared_state> shared_state_;
127-
size_t active_{0};
128-
std::atomic<size_t> threads_{0};
129-
size_t max_idle_;
130-
size_t max_threads_;
131-
std::conditional_t<UsePriority, std::priority_queue<task>, std::queue<func_t>>
132-
queue_;
133-
basic_string<native_char_t> worker_name_;
98+
basic_string<Char> name_;
99+
std::vector<std::thread> threads_;
100+
mutable std::mutex m_;
101+
std::condition_variable cv_;
102+
std::conditional_t<UseDelay, std::priority_queue<Task>, std::queue<Func>>
103+
tasks_;
104+
// stop flag and active tasks counter
105+
uint64_t state_ = 0;
134106
};
135107

136-
} // namespace async_utils
137-
} // namespace irs
108+
} // namespace irs::async_utils

tests/index/index_profile_tests.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class index_profile_test_case : public tests::index_test_base {
126126
std::atomic<size_t> writer_import_count(0);
127127
auto thread_count = (std::max)((size_t)1, num_insert_threads);
128128
auto total_threads = thread_count + num_import_threads + num_update_threads;
129-
irs::async_utils::thread_pool thread_pool(total_threads, total_threads);
129+
irs::async_utils::ThreadPool<> thread_pool(total_threads);
130130
std::mutex mutex;
131131

132132
if (!writer) {
@@ -473,7 +473,7 @@ class index_profile_test_case : public tests::index_test_base {
473473
size_t cleanup_interval) {
474474
auto* directory = &dir();
475475
std::atomic<bool> working(true);
476-
irs::async_utils::thread_pool thread_pool(1, 1);
476+
irs::async_utils::ThreadPool<> thread_pool(1);
477477

478478
thread_pool.run([cleanup_interval, directory, &working]() -> void {
479479
while (working.load()) {
@@ -502,7 +502,7 @@ class index_profile_test_case : public tests::index_test_base {
502502
options.segment_count_max = 8; // match original implementation or may run
503503
// out of file handles (e.g. MacOS/Travis)
504504

505-
irs::async_utils::thread_pool thread_pool(commit_threads, commit_threads);
505+
irs::async_utils::ThreadPool<> thread_pool(commit_threads);
506506
auto writer = open_writer(irs::OM_CREATE, options);
507507

508508
for (size_t i = 0; i < commit_threads; ++i) {
@@ -536,7 +536,7 @@ class index_profile_test_case : public tests::index_test_base {
536536
irs::index_utils::MakePolicy(irs::index_utils::ConsolidateCount());
537537
irs::IndexWriterOptions options;
538538
std::atomic<bool> working(true);
539-
irs::async_utils::thread_pool thread_pool(2, 2);
539+
irs::async_utils::ThreadPool<> thread_pool(2);
540540

541541
options.segment_count_max = 8; // match original implementation or may run
542542
// out of file handles (e.g. MacOS/Travis)

tests/index/index_tests.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ class index_test_case : public tests::index_test_base {
540540

541541
// validate terms async
542542
{
543-
irs::async_utils::thread_pool pool(thread_count, thread_count);
543+
irs::async_utils::ThreadPool<> pool(thread_count);
544544

545545
{
546546
std::lock_guard<std::mutex> lock(mutex);
@@ -583,7 +583,7 @@ class index_test_case : public tests::index_test_base {
583583
ASSERT_EQ(expected_term_itrs[i]->value(), actual_term_itr->value());
584584
}
585585

586-
irs::async_utils::thread_pool pool(thread_count, thread_count);
586+
irs::async_utils::ThreadPool<> pool(thread_count);
587587

588588
{
589589
std::lock_guard<std::mutex> lock(mutex);

tests/store/directory_test_case.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ TEST_P(directory_test_case, read_multiple_streams) {
376376
auto in = dir_->open("test_async", irs::IOAdvice::NORMAL);
377377
std::mutex in_mtx;
378378
std::mutex mutex;
379-
irs::async_utils::thread_pool pool(16, 16);
379+
irs::async_utils::ThreadPool<> pool(16);
380380

381381
ASSERT_FALSE(!in);
382382
in = in->reopen();
@@ -385,7 +385,7 @@ TEST_P(directory_test_case, read_multiple_streams) {
385385
{
386386
std::lock_guard<std::mutex> lock(mutex);
387387

388-
for (auto i = pool.max_threads(); i; --i) {
388+
for (auto i = pool.threads(); i; --i) {
389389
pool.run([&in, &in_mtx, &mutex]() -> void {
390390
index_input::ptr input;
391391

0 commit comments

Comments
 (0)