Skip to content

Commit b668f8d

Browse files
Merge #7004: refactor: enhance CSigSharesManager with multi-threaded worker pool and dispatcher
f033c54 fix: call join first (pasta) cc70203 fix: 12 char thread name (PastaPastaPasta) 1c52425 refactor: streamline CSigSharesManager's message handling and improve thread safety (pasta) e150738 refactor: enhance CSigSharesManager with multi-threaded worker pool and dispatcher (pasta) Pull request description: ## Issue being fixed or feature implemented I don't have good data on whether this results in an improvement. However, if sigShare BLS signing ever takes more than a full thread, we will be bottlenecked. This PR addresses two concerns: 1. Separate out cleanup and network sending logic into its own thread. This should prevent the network sending or cleanup logic from slowing down the creation of sigShares. 2. Use a thread pool and dispatch jobs to that pool. This should enable scaling sigShare creation past 1 full thread, and could currently use up to half of the available cores (capped at four workers) for sigShare signing. ## How Has This Been Tested? This hasn't been tested yet; please review, but I think I'll want to deploy this on testnet before merging. ## Breaking Changes ## Checklist: _Go over all the following points, and put an `x` in all the boxes that apply._ - [ ] I have performed a self-review of my own code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have added or updated relevant unit/integration/functional/e2e tests - [ ] I have made corresponding changes to the documentation - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_ Top commit has no ACKs. Tree-SHA512: 231f4910bc328a4b70ca3df6d758705699f29465f592d08a31c5ae6991db4928da0285412e1578c6ba55488d619374749f5ea857f471784f9f1937be8489fcd3
2 parents f57147c + f033c54 commit b668f8d

File tree

3 files changed

+129
-51
lines changed

3 files changed

+129
-51
lines changed

src/llmq/signing_shares.cpp

Lines changed: 108 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -196,26 +196,45 @@ CSigSharesManager::CSigSharesManager(CConnman& connman, CChainState& chainstate,
196196

197197
CSigSharesManager::~CSigSharesManager() = default;
198198

199-
void CSigSharesManager::StartWorkerThread()
199+
void CSigSharesManager::Start()
200200
{
201-
// can't start new thread if we have one running already
202-
if (workThread.joinable()) {
201+
// can't start if threads are already running
202+
if (housekeepingThread.joinable() || dispatcherThread.joinable()) {
203203
assert(false);
204204
}
205205

206-
workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); });
206+
// Initialize worker pool
207+
int workerCount = std::clamp(static_cast<int>(std::thread::hardware_concurrency() / 2), 1, 4);
208+
workerPool.resize(workerCount);
209+
RenameThreadPool(workerPool, "sigsh-work");
210+
211+
// Start housekeeping thread
212+
housekeepingThread = std::thread(&util::TraceThread, "sigsh-maint",
213+
[this] { HousekeepingThreadMain(); });
214+
215+
// Start dispatcher thread
216+
dispatcherThread = std::thread(&util::TraceThread, "sigsh-dispat",
217+
[this] { WorkDispatcherThreadMain(); });
207218
}
208219

209-
void CSigSharesManager::StopWorkerThread()
220+
void CSigSharesManager::Stop()
210221
{
211222
// make sure to call InterruptWorkerThread() first
212223
if (!workInterrupt) {
213224
assert(false);
214225
}
215226

216-
if (workThread.joinable()) {
217-
workThread.join();
227+
// Join threads FIRST to stop any pending push() calls
228+
if (housekeepingThread.joinable()) {
229+
housekeepingThread.join();
218230
}
231+
if (dispatcherThread.joinable()) {
232+
dispatcherThread.join();
233+
}
234+
235+
// Then stop worker pool (now safe, no more push() calls)
236+
workerPool.clear_queue();
237+
workerPool.stop(true);
219238
}
220239

221240
void CSigSharesManager::RegisterAsRecoveredSigsListener()
@@ -1611,60 +1630,106 @@ void CSigSharesManager::BanNode(NodeId nodeId)
16111630
nodeState.banned = true;
16121631
}
16131632

1614-
void CSigSharesManager::WorkThreadMain()
1633+
void CSigSharesManager::HousekeepingThreadMain()
16151634
{
1616-
int64_t lastSendTime = 0;
1617-
16181635
while (!workInterrupt) {
16191636
RemoveBannedNodeStates();
1637+
SendMessages();
1638+
Cleanup();
16201639

1621-
bool fMoreWork = ProcessPendingSigShares();
1622-
SignPendingSigShares();
1640+
workInterrupt.sleep_for(std::chrono::milliseconds(100));
1641+
}
1642+
}
16231643

1624-
if (TicksSinceEpoch<std::chrono::milliseconds>(SystemClock::now()) - lastSendTime > 100) {
1625-
SendMessages();
1626-
lastSendTime = TicksSinceEpoch<std::chrono::milliseconds>(SystemClock::now());
1627-
}
1644+
void CSigSharesManager::WorkDispatcherThreadMain()
1645+
{
1646+
while (!workInterrupt) {
1647+
// Dispatch all pending signs (individual tasks)
1648+
DispatchPendingSigns();
16281649

1629-
Cleanup();
1650+
// If there's processing work, spawn a helper worker
1651+
DispatchPendingProcessing();
16301652

1631-
// TODO Wakeup when pending signing is needed?
1632-
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
1633-
return;
1634-
}
1653+
// Always sleep briefly between checks
1654+
workInterrupt.sleep_for(std::chrono::milliseconds(10));
16351655
}
16361656
}
16371657

1638-
void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash)
1658+
void CSigSharesManager::DispatchPendingSigns()
16391659
{
1640-
LOCK(cs_pendingSigns);
1641-
pendingSigns.emplace_back(std::move(quorum), id, msgHash);
1660+
// Swap out entire vector to avoid lock thrashing
1661+
std::vector<PendingSignatureData> signs;
1662+
{
1663+
LOCK(cs_pendingSigns);
1664+
signs.swap(pendingSigns);
1665+
}
1666+
1667+
// Dispatch all signs to worker pool
1668+
for (auto& work : signs) {
1669+
if (workInterrupt) break;
1670+
1671+
workerPool.push([this, work = std::move(work)](int) {
1672+
SignAndProcessSingleShare(std::move(work));
1673+
});
1674+
}
16421675
}
16431676

1644-
void CSigSharesManager::SignPendingSigShares()
1677+
void CSigSharesManager::DispatchPendingProcessing()
16451678
{
1646-
std::vector<PendingSignatureData> v;
1647-
WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns));
1648-
1649-
for (const auto& [pQuorum, id, msgHash] : v) {
1650-
auto opt_sigShare = CreateSigShare(*pQuorum, id, msgHash);
1651-
1652-
if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) {
1653-
auto& sigShare = *opt_sigShare;
1654-
ProcessSigShare(sigShare, pQuorum);
1655-
1656-
if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) {
1657-
LOCK(cs);
1658-
auto& session = signedSessions[sigShare.GetSignHash()];
1659-
session.sigShare = std::move(sigShare);
1660-
session.quorum = pQuorum;
1661-
session.nextAttemptTime = 0;
1662-
session.attempt = 0;
1663-
}
1679+
// Check if there's work, spawn a helper if so
1680+
bool hasWork = false;
1681+
{
1682+
LOCK(cs);
1683+
hasWork = std::any_of(nodeStates.begin(), nodeStates.end(),
1684+
[](const auto& entry) {
1685+
return !entry.second.pendingIncomingSigShares.Empty();
1686+
});
1687+
}
1688+
1689+
if (hasWork) {
1690+
// Work exists - spawn a worker to help!
1691+
workerPool.push([this](int) {
1692+
ProcessPendingSigSharesLoop();
1693+
});
1694+
}
1695+
}
1696+
1697+
void CSigSharesManager::ProcessPendingSigSharesLoop()
1698+
{
1699+
while (!workInterrupt) {
1700+
bool moreWork = ProcessPendingSigShares();
1701+
1702+
if (!moreWork) {
1703+
return; // No work found, exit immediately
1704+
}
1705+
}
1706+
}
1707+
1708+
void CSigSharesManager::SignAndProcessSingleShare(PendingSignatureData work)
1709+
{
1710+
auto opt_sigShare = CreateSigShare(*work.quorum, work.id, work.msgHash);
1711+
1712+
if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) {
1713+
auto& sigShare = *opt_sigShare;
1714+
ProcessSigShare(sigShare, work.quorum);
1715+
1716+
if (IsAllMembersConnectedEnabled(work.quorum->params.type, m_sporkman)) {
1717+
LOCK(cs);
1718+
auto& session = signedSessions[sigShare.GetSignHash()];
1719+
session.sigShare = std::move(sigShare);
1720+
session.quorum = work.quorum;
1721+
session.nextAttemptTime = 0;
1722+
session.attempt = 0;
16641723
}
16651724
}
16661725
}
16671726

1727+
void CSigSharesManager::AsyncSign(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash)
1728+
{
1729+
LOCK(cs_pendingSigns);
1730+
pendingSigns.emplace_back(std::move(quorum), id, msgHash);
1731+
}
1732+
16681733
std::optional<CSigShare> CSigSharesManager::CreateSigShareForSingleMember(const CQuorum& quorum, const uint256& id, const uint256& msgHash) const
16691734
{
16701735
cxxtimer::Timer t(true);

src/llmq/signing_shares.h

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#define BITCOIN_LLMQ_SIGNING_SHARES_H
77

88
#include <bls/bls.h>
9+
#include <ctpl_stl.h>
910
#include <evo/types.h>
1011
#include <llmq/signhash.h>
1112
#include <llmq/signing.h>
@@ -361,7 +362,7 @@ class CSignedSession
361362
int attempt{0};
362363
};
363364

364-
class CSigSharesManager : public CRecoveredSigsListener
365+
class CSigSharesManager : public llmq::CRecoveredSigsListener
365366
{
366367
private:
367368
static constexpr int64_t SESSION_NEW_SHARES_TIMEOUT{60};
@@ -380,7 +381,9 @@ class CSigSharesManager : public CRecoveredSigsListener
380381

381382
Mutex cs;
382383

383-
std::thread workThread;
384+
mutable ctpl::thread_pool workerPool;
385+
std::thread housekeepingThread;
386+
std::thread dispatcherThread;
384387
CThreadInterrupt workInterrupt;
385388

386389
SigShareMap<CSigShare> sigShares GUARDED_BY(cs);
@@ -426,8 +429,8 @@ class CSigSharesManager : public CRecoveredSigsListener
426429
const CQuorumManager& _qman, const CSporkManager& sporkman);
427430
~CSigSharesManager() override;
428431

429-
void StartWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs);
430-
void StopWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs);
432+
void Start() EXCLUSIVE_LOCKS_REQUIRED(!cs);
433+
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!cs);
431434
void RegisterAsRecoveredSigsListener() EXCLUSIVE_LOCKS_REQUIRED(!cs);
432435
void UnregisterAsRecoveredSigsListener() EXCLUSIVE_LOCKS_REQUIRED(!cs);
433436
void InterruptWorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!cs);
@@ -500,8 +503,18 @@ class CSigSharesManager : public CRecoveredSigsListener
500503
void CollectSigSharesToSendConcentrated(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs);
501504
void CollectSigSharesToAnnounce(std::unordered_map<NodeId, Uint256HashMap<CSigSharesInv>>& sigSharesToAnnounce)
502505
EXCLUSIVE_LOCKS_REQUIRED(cs);
503-
void SignPendingSigShares() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs);
504-
void WorkThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs);
506+
507+
// Thread main functions
508+
void HousekeepingThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs);
509+
void WorkDispatcherThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs);
510+
511+
// Dispatcher functions
512+
void DispatchPendingSigns() EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns);
513+
void DispatchPendingProcessing() EXCLUSIVE_LOCKS_REQUIRED(!cs);
514+
515+
// Worker pool task functions
516+
void ProcessPendingSigSharesLoop() EXCLUSIVE_LOCKS_REQUIRED(!cs);
517+
void SignAndProcessSingleShare(PendingSignatureData work) EXCLUSIVE_LOCKS_REQUIRED(!cs_pendingSigns, !cs);
505518
};
506519
} // namespace llmq
507520

src/masternode/active/context.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,12 @@ void ActiveContext::Start(CConnman& connman, PeerManager& peerman)
5555
{
5656
m_llmq_ctx.qdkgsman->StartThreads(connman, peerman);
5757
shareman->RegisterAsRecoveredSigsListener();
58-
shareman->StartWorkerThread();
58+
shareman->Start();
5959
}
6060

6161
void ActiveContext::Stop()
6262
{
63-
shareman->StopWorkerThread();
63+
shareman->Stop();
6464
shareman->UnregisterAsRecoveredSigsListener();
6565
m_llmq_ctx.qdkgsman->StopThreads();
6666
}

0 commit comments

Comments
 (0)