Skip to content

Commit b386d37

Browse files
committed
Merge #18710: Add local thread pool to CCheckQueue
bb6fcc7 refactor: Drop boost::thread stuff in CCheckQueue (Hennadii Stepanov) 6784ac4 bench: Use CCheckQueue local thread pool (Hennadii Stepanov) dba3069 test: Use CCheckQueue local thread pool (Hennadii Stepanov) 0151177 Add local thread pool to CCheckQueue (Hennadii Stepanov) 0ef9386 refactor: Use member initializers in CCheckQueue (Hennadii Stepanov) Pull request description: This PR: - gets rid of `boost::thread_group` in the `CCheckQueue` class - allows thread safety annotation usage in the `CCheckQueue` class - is alternative to #14464 (bitcoin/bitcoin#18710 (comment), bitcoin/bitcoin#18710 (comment)) Also, with this PR (I hope) it could be easier to resurrect a bunch of brilliant ideas from #9938. Related: #17307 ACKs for top commit: laanwj: Code review ACK bb6fcc7 LarryRuane: ACK bb6fcc7 jonatack: Code review ACK bb6fcc7 and verified rebase to master builds cleanly with unit/functional tests green Tree-SHA512: fddeb720d5a391b48bb4c6fa58ed34ccc3f57862fdb8e641745c021841c8340e35c5126338271446cbd98f40bd5484f27926aa6c3e76fa478ba1efafe72e73c1
2 parents d0852f3 + bb6fcc7 commit b386d37

File tree

9 files changed

+97
-96
lines changed

9 files changed

+97
-96
lines changed

src/bench/checkqueue.cpp

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
#include <random.h>
1111
#include <util/system.h>
1212

13-
#include <boost/thread/thread.hpp>
14-
1513
#include <vector>
1614

1715
static const size_t BATCHES = 101;
@@ -44,12 +42,9 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench)
4442
void swap(PrevectorJob& x){p.swap(x.p);};
4543
};
4644
CCheckQueue<PrevectorJob> queue {QUEUE_BATCH_SIZE};
47-
boost::thread_group tg;
4845
// The main thread should be counted to prevent thread oversubscription, and
4946
// to decrease the variance of benchmark results.
50-
for (auto x = 0; x < GetNumCores() - 1; ++x) {
51-
tg.create_thread([&]{queue.Thread();});
52-
}
47+
queue.StartWorkerThreads(GetNumCores() - 1);
5348

5449
// create all the data once, then submit copies in the benchmark.
5550
FastRandomContext insecure_rand(true);
@@ -70,8 +65,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench)
7065
// it is done explicitly here for clarity
7166
control.Wait();
7267
});
73-
tg.interrupt_all();
74-
tg.join_all();
68+
queue.StopWorkerThreads();
7569
ECC_Stop();
7670
}
7771
BENCHMARK(CCheckQueueSpeedPrevectorJob);

src/checkqueue.h

Lines changed: 63 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@
66
#define BITCOIN_CHECKQUEUE_H
77

88
#include <sync.h>
9+
#include <tinyformat.h>
10+
#include <util/threadnames.h>
911

1012
#include <algorithm>
1113
#include <vector>
1214

13-
#include <boost/thread/condition_variable.hpp>
14-
#include <boost/thread/mutex.hpp>
15-
1615
template <typename T>
1716
class CCheckQueueControl;
1817

@@ -31,61 +30,64 @@ class CCheckQueue
3130
{
3231
private:
3332
//! Mutex to protect the inner state
34-
boost::mutex mutex;
33+
Mutex m_mutex;
3534

3635
//! Worker threads block on this when out of work
37-
boost::condition_variable condWorker;
36+
std::condition_variable m_worker_cv;
3837

3938
//! Master thread blocks on this when out of work
40-
boost::condition_variable condMaster;
39+
std::condition_variable m_master_cv;
4140

4241
//! The queue of elements to be processed.
4342
//! As the order of booleans doesn't matter, it is used as a LIFO (stack)
44-
std::vector<T> queue;
43+
std::vector<T> queue GUARDED_BY(m_mutex);
4544

4645
//! The number of workers (including the master) that are idle.
47-
int nIdle;
46+
int nIdle GUARDED_BY(m_mutex){0};
4847

4948
//! The total number of workers (including the master).
50-
int nTotal;
49+
int nTotal GUARDED_BY(m_mutex){0};
5150

5251
//! The temporary evaluation result.
53-
bool fAllOk;
52+
bool fAllOk GUARDED_BY(m_mutex){true};
5453

5554
/**
5655
* Number of verifications that haven't completed yet.
5756
* This includes elements that are no longer queued, but still in the
5857
* worker's own batches.
5958
*/
60-
unsigned int nTodo;
59+
unsigned int nTodo GUARDED_BY(m_mutex){0};
6160

6261
//! The maximum number of elements to be processed in one batch
63-
unsigned int nBatchSize;
62+
const unsigned int nBatchSize;
63+
64+
std::vector<std::thread> m_worker_threads;
65+
bool m_request_stop GUARDED_BY(m_mutex){false};
6466

6567
/** Internal function that does bulk of the verification work. */
66-
bool Loop(bool fMaster = false)
68+
bool Loop(bool fMaster)
6769
{
68-
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
70+
std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
6971
std::vector<T> vChecks;
7072
vChecks.reserve(nBatchSize);
7173
unsigned int nNow = 0;
7274
bool fOk = true;
7375
do {
7476
{
75-
boost::unique_lock<boost::mutex> lock(mutex);
77+
WAIT_LOCK(m_mutex, lock);
7678
// first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
7779
if (nNow) {
7880
fAllOk &= fOk;
7981
nTodo -= nNow;
8082
if (nTodo == 0 && !fMaster)
8183
// We processed the last element; inform the master it can exit and return the result
82-
condMaster.notify_one();
84+
m_master_cv.notify_one();
8385
} else {
8486
// first iteration
8587
nTotal++;
8688
}
8789
// logically, the do loop starts here
88-
while (queue.empty()) {
90+
while (queue.empty() && !m_request_stop) {
8991
if (fMaster && nTodo == 0) {
9092
nTotal--;
9193
bool fRet = fAllOk;
@@ -98,6 +100,10 @@ class CCheckQueue
98100
cond.wait(lock); // wait
99101
nIdle--;
100102
}
103+
if (m_request_stop) {
104+
return false;
105+
}
106+
101107
// Decide how many work units to process now.
102108
// * Do not try to do everything at once, but aim for increasingly smaller batches so
103109
// all workers finish approximately simultaneously.
@@ -106,7 +112,7 @@ class CCheckQueue
106112
nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
107113
vChecks.resize(nNow);
108114
for (unsigned int i = 0; i < nNow; i++) {
109-
// We want the lock on the mutex to be as short as possible, so swap jobs from the global
115+
// We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
110116
// queue to the local batch vector instead of copying.
111117
vChecks[i].swap(queue.back());
112118
queue.pop_back();
@@ -124,40 +130,68 @@ class CCheckQueue
124130

125131
public:
126132
//! Mutex to ensure only one concurrent CCheckQueueControl
127-
boost::mutex ControlMutex;
133+
Mutex m_control_mutex;
128134

129135
//! Create a new check queue
130-
explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}
136+
explicit CCheckQueue(unsigned int nBatchSizeIn)
137+
: nBatchSize(nBatchSizeIn)
138+
{
139+
}
131140

132-
//! Worker thread
133-
void Thread()
141+
//! Create a pool of new worker threads.
142+
void StartWorkerThreads(const int threads_num)
134143
{
135-
Loop();
144+
{
145+
LOCK(m_mutex);
146+
nIdle = 0;
147+
nTotal = 0;
148+
fAllOk = true;
149+
}
150+
assert(m_worker_threads.empty());
151+
for (int n = 0; n < threads_num; ++n) {
152+
m_worker_threads.emplace_back([this, n]() {
153+
util::ThreadRename(strprintf("scriptch.%i", n));
154+
Loop(false /* worker thread */);
155+
});
156+
}
136157
}
137158

138159
//! Wait until execution finishes, and return whether all evaluations were successful.
139160
bool Wait()
140161
{
141-
return Loop(true);
162+
return Loop(true /* master thread */);
142163
}
143164

144165
//! Add a batch of checks to the queue
145166
void Add(std::vector<T>& vChecks)
146167
{
147-
boost::unique_lock<boost::mutex> lock(mutex);
168+
LOCK(m_mutex);
148169
for (T& check : vChecks) {
149170
queue.push_back(T());
150171
check.swap(queue.back());
151172
}
152173
nTodo += vChecks.size();
153174
if (vChecks.size() == 1)
154-
condWorker.notify_one();
175+
m_worker_cv.notify_one();
155176
else if (vChecks.size() > 1)
156-
condWorker.notify_all();
177+
m_worker_cv.notify_all();
178+
}
179+
180+
//! Stop all of the worker threads.
181+
void StopWorkerThreads()
182+
{
183+
WITH_LOCK(m_mutex, m_request_stop = true);
184+
m_worker_cv.notify_all();
185+
for (std::thread& t : m_worker_threads) {
186+
t.join();
187+
}
188+
m_worker_threads.clear();
189+
WITH_LOCK(m_mutex, m_request_stop = false);
157190
}
158191

159192
~CCheckQueue()
160193
{
194+
assert(m_worker_threads.empty());
161195
}
162196

163197
};
@@ -181,7 +215,7 @@ class CCheckQueueControl
181215
{
182216
// passed queue is supposed to be unused, or nullptr
183217
if (pqueue != nullptr) {
184-
ENTER_CRITICAL_SECTION(pqueue->ControlMutex);
218+
ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
185219
}
186220
}
187221

@@ -205,7 +239,7 @@ class CCheckQueueControl
205239
if (!fDone)
206240
Wait();
207241
if (pqueue != nullptr) {
208-
LEAVE_CRITICAL_SECTION(pqueue->ControlMutex);
242+
LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
209243
}
210244
}
211245
};

src/init.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ void Shutdown(NodeContext& node)
223223
if (g_load_block.joinable()) g_load_block.join();
224224
threadGroup.interrupt_all();
225225
threadGroup.join_all();
226+
StopScriptCheckWorkerThreads();
226227

227228
// After the threads that potentially access these pointers have been stopped,
228229
// destruct and reset all to nullptr.
@@ -1334,9 +1335,7 @@ bool AppInitMain(const util::Ref& context, NodeContext& node, interfaces::BlockA
13341335
LogPrintf("Script verification uses %d additional threads\n", script_threads);
13351336
if (script_threads >= 1) {
13361337
g_parallel_script_checks = true;
1337-
for (int i = 0; i < script_threads; ++i) {
1338-
threadGroup.create_thread([i]() { return ThreadScriptCheck(i); });
1339-
}
1338+
StartScriptCheckWorkerThreads(script_threads);
13401339
}
13411340

13421341
assert(!node.scheduler);

0 commit comments

Comments
 (0)