Skip to content

Commit 0151177

Browse files
committed
Add local thread pool to CCheckQueue
1 parent 0ef9386 commit 0151177

File tree

5 files changed

+64
-15
lines changed

5 files changed

+64
-15
lines changed

src/checkqueue.h

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#define BITCOIN_CHECKQUEUE_H
77

88
#include <sync.h>
9+
#include <tinyformat.h>
10+
#include <util/threadnames.h>
911

1012
#include <algorithm>
1113
#include <vector>
@@ -62,8 +64,11 @@ class CCheckQueue
6264
//! The maximum number of elements to be processed in one batch
6365
const unsigned int nBatchSize;
6466

67+
std::vector<std::thread> m_worker_threads;
68+
bool m_request_stop{false};
69+
6570
/** Internal function that does bulk of the verification work. */
66-
bool Loop(bool fMaster = false)
71+
bool Loop(bool fMaster)
6772
{
6873
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
6974
std::vector<T> vChecks;
@@ -85,7 +90,7 @@ class CCheckQueue
8590
nTotal++;
8691
}
8792
// logically, the do loop starts here
88-
while (queue.empty()) {
93+
while (queue.empty() && !m_request_stop) {
8994
if (fMaster && nTodo == 0) {
9095
nTotal--;
9196
bool fRet = fAllOk;
@@ -98,6 +103,10 @@ class CCheckQueue
98103
cond.wait(lock); // wait
99104
nIdle--;
100105
}
106+
if (m_request_stop) {
107+
return false;
108+
}
109+
101110
// Decide how many work units to process now.
102111
// * Do not try to do everything at once, but aim for increasingly smaller batches so
103112
// all workers finish approximately simultaneously.
@@ -132,16 +141,34 @@ class CCheckQueue
132141
{
133142
}
134143

144+
//! Create a pool of new worker threads.
145+
void StartWorkerThreads(const int threads_num)
146+
{
147+
{
148+
boost::unique_lock<boost::mutex> lock(mutex);
149+
nIdle = 0;
150+
nTotal = 0;
151+
fAllOk = true;
152+
}
153+
assert(m_worker_threads.empty());
154+
for (int n = 0; n < threads_num; ++n) {
155+
m_worker_threads.emplace_back([this, n]() {
156+
util::ThreadRename(strprintf("scriptch.%i", n));
157+
Loop(false /* worker thread */);
158+
});
159+
}
160+
}
161+
135162
//! Worker thread
136163
void Thread()
137164
{
138-
Loop();
165+
Loop(false /* worker thread */);
139166
}
140167

141168
//! Wait until execution finishes, and return whether all evaluations were successful.
142169
bool Wait()
143170
{
144-
return Loop(true);
171+
return Loop(true /* master thread */);
145172
}
146173

147174
//! Add a batch of checks to the queue
@@ -159,8 +186,25 @@ class CCheckQueue
159186
condWorker.notify_all();
160187
}
161188

189+
//! Stop all of the worker threads.
190+
void StopWorkerThreads()
191+
{
192+
{
193+
boost::unique_lock<boost::mutex> lock(mutex);
194+
m_request_stop = true;
195+
}
196+
condWorker.notify_all();
197+
for (std::thread& t : m_worker_threads) {
198+
t.join();
199+
}
200+
m_worker_threads.clear();
201+
boost::unique_lock<boost::mutex> lock(mutex);
202+
m_request_stop = false;
203+
}
204+
162205
~CCheckQueue()
163206
{
207+
assert(m_worker_threads.empty());
164208
}
165209

166210
};

src/init.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ void Shutdown(NodeContext& node)
224224
if (g_load_block.joinable()) g_load_block.join();
225225
threadGroup.interrupt_all();
226226
threadGroup.join_all();
227+
StopScriptCheckWorkerThreads();
227228

228229
// After the threads that potentially access these pointers have been stopped,
229230
// destruct and reset all to nullptr.
@@ -1307,9 +1308,7 @@ bool AppInitMain(const util::Ref& context, NodeContext& node, interfaces::BlockA
13071308
LogPrintf("Script verification uses %d additional threads\n", script_threads);
13081309
if (script_threads >= 1) {
13091310
g_parallel_script_checks = true;
1310-
for (int i = 0; i < script_threads; ++i) {
1311-
threadGroup.create_thread([i]() { return ThreadScriptCheck(i); });
1312-
}
1311+
StartScriptCheckWorkerThreads(script_threads);
13131312
}
13141313

13151314
assert(!node.scheduler);

src/test/util/setup_common.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,7 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vector<const
162162

163163
// Start script-checking threads. Set g_parallel_script_checks to true so they are used.
164164
constexpr int script_check_threads = 2;
165-
for (int i = 0; i < script_check_threads; ++i) {
166-
threadGroup.create_thread([i]() { return ThreadScriptCheck(i); });
167-
}
165+
StartScriptCheckWorkerThreads(script_check_threads);
168166
g_parallel_script_checks = true;
169167

170168
m_node.banman = MakeUnique<BanMan>(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME);
@@ -182,6 +180,7 @@ TestingSetup::~TestingSetup()
182180
if (m_node.scheduler) m_node.scheduler->stop();
183181
threadGroup.interrupt_all();
184182
threadGroup.join_all();
183+
StopScriptCheckWorkerThreads();
185184
GetMainSignals().FlushBackgroundCallbacks();
186185
GetMainSignals().UnregisterBackgroundSignalScheduler();
187186
m_node.connman.reset();

src/validation.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,9 +1817,14 @@ static bool WriteUndoDataForBlock(const CBlockUndo& blockundo, BlockValidationSt
18171817

18181818
static CCheckQueue<CScriptCheck> scriptcheckqueue(128);
18191819

1820-
void ThreadScriptCheck(int worker_num) {
1821-
util::ThreadRename(strprintf("scriptch.%i", worker_num));
1822-
scriptcheckqueue.Thread();
1820+
void StartScriptCheckWorkerThreads(int threads_num)
1821+
{
1822+
scriptcheckqueue.StartWorkerThreads(threads_num);
1823+
}
1824+
1825+
void StopScriptCheckWorkerThreads()
1826+
{
1827+
scriptcheckqueue.StopWorkerThreads();
18231828
}
18241829

18251830
VersionBitsCache versionbitscache GUARDED_BY(cs_main);

src/validation.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,10 @@ void LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, FlatFi
158158
bool LoadGenesisBlock(const CChainParams& chainparams);
159159
/** Unload database information */
160160
void UnloadBlockIndex(CTxMemPool* mempool, ChainstateManager& chainman);
161-
/** Run an instance of the script checking thread */
162-
void ThreadScriptCheck(int worker_num);
161+
/** Run instances of script checking worker threads */
162+
void StartScriptCheckWorkerThreads(int threads_num);
163+
/** Stop all of the script checking worker threads */
164+
void StopScriptCheckWorkerThreads();
163165
/**
164166
* Return transaction from the block at block_index.
165167
* If block_index is not provided, fall back to mempool.

0 commit comments

Comments
 (0)