Skip to content

Commit 4becf98

Browse files
authored
refactor: remove circular dependencies through net_processing (2/N) (dashpay#5792)
## Issue being fixed or feature implemented The architecture of bitcoin assumes that there's no any external class that processes network messages and knows anything about PeerManager from net_processing; no any external call for PeerManager::Misbehaving in bitcoin. All logic related to processing messages are located in net_processing. Dash has many many extra types of network messages and many of them processed by external components such as llmq/signing or coinjoin/client. Current architecture creates multiple circular dependency. ## What was done? That's part II of refactorings. This PR removes PeerManager from several constructor and let LLMQContext to forget about PeerManager. Prior work in this PR: dashpay#5782 ## What else to do? Some network messages are processed asynchronously in external components such as llmq/signing, llmq/instantsend, llmq/dkgsessionhandler. It doesn't let to refactor them easily, because they can't just simple return status of processing; status of processing would be available sometime later and there's need callback or other way to pass result code without spreading PeerManager over codebase. ## How Has This Been Tested? - Run unit/functional tests - run a linter test/lint/lint-circular-dependencies.sh ## Breaking Changes N/A ## Checklist: - [x] I have performed a self-review of my own code - [x] I have commented my code, particularly in hard-to-understand areas - [ ] I have added or updated relevant unit/integration/functional/e2e tests - [ ] I have made corresponding changes to the documentation - [x] I have assigned this pull request to a milestone
1 parent dd27e11 commit 4becf98

19 files changed

+265
-192
lines changed

src/llmq/chainlocks.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ bool CChainLocksHandler::VerifyChainLock(const CChainLockSig& clsig) const
556556
{
557557
const auto llmqType = Params().GetConsensus().llmqTypeChainLocks;
558558
const uint256 nRequestId = ::SerializeHash(std::make_pair(llmq::CLSIG_REQUESTID_PREFIX, clsig.getHeight()));
559-
return llmq::CSigningManager::VerifyRecoveredSig(llmqType, qman, clsig.getHeight(), nRequestId, clsig.getBlockHash(), clsig.getSig());
559+
return llmq::VerifyRecoveredSig(llmqType, qman, clsig.getHeight(), nRequestId, clsig.getBlockHash(), clsig.getSig());
560560
}
561561

562562
bool CChainLocksHandler::InternalHasChainLock(int nHeight, const uint256& blockHash) const

src/llmq/context.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CEvoDB& evo
2828
llmq::quorumBlockProcessor = std::make_unique<llmq::CQuorumBlockProcessor>(chainstate, connman, evo_db);
2929
return llmq::quorumBlockProcessor.get();
3030
}()},
31-
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainstate, connman, *dkg_debugman, *quorum_block_processor, sporkman, peerman, unit_tests, wipe)},
31+
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainstate, connman, *dkg_debugman, *quorum_block_processor, sporkman, unit_tests, wipe)},
3232
qman{[&]() -> llmq::CQuorumManager* const {
3333
assert(llmq::quorumManager == nullptr);
3434
llmq::quorumManager = std::make_unique<llmq::CQuorumManager>(*bls_worker, chainstate, connman, *qdkgsman, evo_db, *quorum_block_processor, ::masternodeSync);
3535
return llmq::quorumManager.get();
3636
}()},
37-
sigman{std::make_unique<llmq::CSigningManager>(connman, *llmq::quorumManager, peerman, unit_tests, wipe)},
37+
sigman{std::make_unique<llmq::CSigningManager>(connman, *llmq::quorumManager, unit_tests, wipe)},
3838
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *llmq::quorumManager, *sigman, peerman)},
3939
clhandler{[&]() -> llmq::CChainLocksHandler* const {
4040
assert(llmq::chainLocksHandler == nullptr);
@@ -43,7 +43,7 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CEvoDB& evo
4343
}()},
4444
isman{[&]() -> llmq::CInstantSendManager* const {
4545
assert(llmq::quorumInstantSendManager == nullptr);
46-
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler, chainstate, connman, *llmq::quorumManager, *sigman, *shareman, sporkman, mempool, *::masternodeSync, peerman, unit_tests, wipe);
46+
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler, chainstate, connman, *llmq::quorumManager, *sigman, *shareman, sporkman, mempool, *::masternodeSync, unit_tests, wipe);
4747
return llmq::quorumInstantSendManager.get();
4848
}()},
4949
ehfSignalsHandler{std::make_unique<llmq::CEHFSignalsHandler>(chainstate, connman, *sigman, *shareman, sporkman, *llmq::quorumManager, mempool)}
@@ -62,6 +62,7 @@ LLMQContext::~LLMQContext() {
6262
}
6363

6464
void LLMQContext::Interrupt() {
65+
sigman->InterruptWorkerThread();
6566
shareman->InterruptWorkerThread();
6667

6768
assert(isman == llmq::quorumInstantSendManager.get());
@@ -79,6 +80,7 @@ void LLMQContext::Start() {
7980
qman->Start();
8081
shareman->RegisterAsRecoveredSigsListener();
8182
shareman->StartWorkerThread();
83+
sigman->StartWorkerThread();
8284

8385
llmq::chainLocksHandler->Start();
8486
llmq::quorumInstantSendManager->Start();
@@ -95,6 +97,7 @@ void LLMQContext::Stop() {
9597

9698
shareman->StopWorkerThread();
9799
shareman->UnregisterAsRecoveredSigsListener();
100+
sigman->StopWorkerThread();
98101
qman->Stop();
99102
qdkgsman->StopThreads();
100103
bls_worker->Stop();

src/llmq/dkgsession.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ void CDKGSession::SendContributions(CDKGPendingMessages& pendingMessages)
208208
return true;
209209
});
210210

211-
pendingMessages.PushPendingMessage(-1, qc);
211+
pendingMessages.PushPendingMessage(-1, nullptr, qc);
212212
}
213213

214214
// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
@@ -526,7 +526,7 @@ void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages)
526526
return true;
527527
});
528528

529-
pendingMessages.PushPendingMessage(-1, qc);
529+
pendingMessages.PushPendingMessage(-1, nullptr, qc);
530530
}
531531

532532
// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
@@ -720,7 +720,7 @@ void CDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, const
720720
return true;
721721
});
722722

723-
pendingMessages.PushPendingMessage(-1, qj);
723+
pendingMessages.PushPendingMessage(-1, nullptr, qj);
724724
}
725725

726726
// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
@@ -1032,7 +1032,7 @@ void CDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages)
10321032
return true;
10331033
});
10341034

1035-
pendingMessages.PushPendingMessage(-1, qc);
1035+
pendingMessages.PushPendingMessage(-1, nullptr, qc);
10361036
}
10371037

10381038
// only performs cheap verifications, but not the signature of the message. this is checked with batched verification

src/llmq/dkgsessionhandler.cpp

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@ namespace llmq
2626

2727
CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGDebugManager& _dkgDebugManager,
2828
CDKGSessionManager& _dkgManager, CQuorumBlockProcessor& _quorumBlockProcessor,
29-
const Consensus::LLMQParams& _params, const std::unique_ptr<PeerManager>& peerman, int _quorumIndex) :
29+
const Consensus::LLMQParams& _params, int _quorumIndex) :
3030
blsWorker(_blsWorker),
3131
m_chainstate(chainstate),
3232
connman(_connman),
3333
dkgDebugManager(_dkgDebugManager),
3434
dkgManager(_dkgManager),
3535
quorumBlockProcessor(_quorumBlockProcessor),
3636
params(_params),
37-
m_peerman(peerman),
3837
quorumIndex(_quorumIndex),
3938
curSession(std::make_unique<CDKGSession>(_params, _blsWorker, _dkgManager, _dkgDebugManager, _connman)),
4039
pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
@@ -47,8 +46,18 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai
4746
}
4847
}
4948

50-
void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv)
49+
void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv)
5150
{
51+
// if peer is not -1 we should always pass valid peerman
52+
assert(from == -1 || peerman != nullptr);
53+
if (peerman != nullptr) {
54+
if (m_peerman == nullptr) {
55+
m_peerman = peerman;
56+
}
57+
// we should never use one different PeerManagers for same queue
58+
assert(m_peerman == peerman);
59+
}
60+
5261
// this will also consume the data, even if we bail out early
5362
auto pm = std::make_shared<CDataStream>(std::move(vRecv));
5463

@@ -97,6 +106,12 @@ bool CDKGPendingMessages::HasSeen(const uint256& hash) const
97106
return seenMessages.count(hash) != 0;
98107
}
99108

109+
void CDKGPendingMessages::Misbehaving(const NodeId from, const int score)
110+
{
111+
if (from == -1) return;
112+
m_peerman.load()->Misbehaving(from, score);
113+
}
114+
100115
void CDKGPendingMessages::Clear()
101116
{
102117
LOCK(cs);
@@ -134,17 +149,17 @@ void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
134149
params.name, quorumIndex, currentHeight, pQuorumBaseBlockIndex->nHeight, ToUnderlying(oldPhase), ToUnderlying(phase));
135150
}
136151

137-
void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
152+
void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, gsl::not_null<PeerManager*> peerman, const std::string& msg_type, CDataStream& vRecv)
138153
{
139154
// We don't handle messages in the calling thread as deserialization/processing of these would block everything
140155
if (msg_type == NetMsgType::QCONTRIB) {
141-
pendingContributions.PushPendingMessage(pfrom.GetId(), vRecv);
156+
pendingContributions.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
142157
} else if (msg_type == NetMsgType::QCOMPLAINT) {
143-
pendingComplaints.PushPendingMessage(pfrom.GetId(), vRecv);
158+
pendingComplaints.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
144159
} else if (msg_type == NetMsgType::QJUSTIFICATION) {
145-
pendingJustifications.PushPendingMessage(pfrom.GetId(), vRecv);
160+
pendingJustifications.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
146161
} else if (msg_type == NetMsgType::QPCOMMITMENT) {
147-
pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), vRecv);
162+
pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
148163
}
149164
}
150165

@@ -420,7 +435,7 @@ std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<
420435
}
421436

422437
template<typename Message, int MessageType>
423-
bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKGPendingMessages& pendingMessages, size_t maxCount)
438+
bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount)
424439
{
425440
auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);
426441
if (msgs.empty()) {
@@ -435,7 +450,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG
435450
if (!p.second) {
436451
LogPrint(BCLog::LLMQ_DKG, "%s -- failed to deserialize message, peer=%d\n", __func__, nodeId);
437452
{
438-
peerman.Misbehaving(nodeId, 100);
453+
pendingMessages.Misbehaving(nodeId, 100);
439454
}
440455
continue;
441456
}
@@ -444,7 +459,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG
444459
if (ban) {
445460
LogPrint(BCLog::LLMQ_DKG, "%s -- banning node due to failed preverification, peer=%d\n", __func__, nodeId);
446461
{
447-
peerman.Misbehaving(nodeId, 100);
462+
pendingMessages.Misbehaving(nodeId, 100);
448463
}
449464
}
450465
LogPrint(BCLog::LLMQ_DKG, "%s -- skipping message due to failed preverification, peer=%d\n", __func__, nodeId);
@@ -461,7 +476,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG
461476
LOCK(cs_main);
462477
for (auto nodeId : badNodes) {
463478
LogPrint(BCLog::LLMQ_DKG, "%s -- failed to verify signature, peer=%d\n", __func__, nodeId);
464-
peerman.Misbehaving(nodeId, 100);
479+
pendingMessages.Misbehaving(nodeId, 100);
465480
}
466481
}
467482

@@ -474,7 +489,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, PeerManager& peerman, CDKG
474489
session.ReceiveMessage(*p.second, ban);
475490
if (ban) {
476491
LogPrint(BCLog::LLMQ_DKG, "%s -- banning node after ReceiveMessage failed, peer=%d\n", __func__, nodeId);
477-
peerman.Misbehaving(nodeId, 100);
492+
pendingMessages.Misbehaving(nodeId, 100);
478493
badNodes.emplace(nodeId);
479494
}
480495
}
@@ -523,7 +538,7 @@ void CDKGSessionHandler::HandleDKGRound()
523538
curSession->Contribute(pendingContributions);
524539
};
525540
auto fContributeWait = [this] {
526-
return ProcessPendingMessageBatch<CDKGContribution, MSG_QUORUM_CONTRIB>(*curSession, *m_peerman, pendingContributions, 8);
541+
return ProcessPendingMessageBatch<CDKGContribution, MSG_QUORUM_CONTRIB>(*curSession, pendingContributions, 8);
527542
};
528543
HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait);
529544

@@ -532,7 +547,7 @@ void CDKGSessionHandler::HandleDKGRound()
532547
curSession->VerifyAndComplain(pendingComplaints);
533548
};
534549
auto fComplainWait = [this] {
535-
return ProcessPendingMessageBatch<CDKGComplaint, MSG_QUORUM_COMPLAINT>(*curSession, *m_peerman, pendingComplaints, 8);
550+
return ProcessPendingMessageBatch<CDKGComplaint, MSG_QUORUM_COMPLAINT>(*curSession, pendingComplaints, 8);
536551
};
537552
HandlePhase(QuorumPhase::Complain, QuorumPhase::Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait);
538553

@@ -541,7 +556,7 @@ void CDKGSessionHandler::HandleDKGRound()
541556
curSession->VerifyAndJustify(pendingJustifications);
542557
};
543558
auto fJustifyWait = [this] {
544-
return ProcessPendingMessageBatch<CDKGJustification, MSG_QUORUM_JUSTIFICATION>(*curSession, *m_peerman, pendingJustifications, 8);
559+
return ProcessPendingMessageBatch<CDKGJustification, MSG_QUORUM_JUSTIFICATION>(*curSession, pendingJustifications, 8);
545560
};
546561
HandlePhase(QuorumPhase::Justify, QuorumPhase::Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait);
547562

@@ -550,7 +565,7 @@ void CDKGSessionHandler::HandleDKGRound()
550565
curSession->VerifyAndCommit(pendingPrematureCommitments);
551566
};
552567
auto fCommitWait = [this] {
553-
return ProcessPendingMessageBatch<CDKGPrematureCommitment, MSG_QUORUM_PREMATURE_COMMITMENT>(*curSession, *m_peerman, pendingPrematureCommitments, 8);
568+
return ProcessPendingMessageBatch<CDKGPrematureCommitment, MSG_QUORUM_PREMATURE_COMMITMENT>(*curSession, pendingPrematureCommitments, 8);
554569
};
555570
HandlePhase(QuorumPhase::Commit, QuorumPhase::Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait);
556571

src/llmq/dkgsessionhandler.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
#ifndef BITCOIN_LLMQ_DKGSESSIONHANDLER_H
66
#define BITCOIN_LLMQ_DKGSESSIONHANDLER_H
77

8-
98
#include <ctpl_stl.h>
109
#include <net.h>
1110

11+
#include <gsl/pointers.h>
12+
1213
#include <atomic>
1314
#include <map>
1415
#include <optional>
@@ -50,6 +51,7 @@ class CDKGPendingMessages
5051

5152
private:
5253
mutable RecursiveMutex cs;
54+
std::atomic<PeerManager*> m_peerman{nullptr};
5355
const int invType;
5456
size_t maxMessagesPerNode GUARDED_BY(cs);
5557
std::list<BinaryMessage> pendingMessages GUARDED_BY(cs);
@@ -60,17 +62,18 @@ class CDKGPendingMessages
6062
explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) :
6163
invType(_invType), maxMessagesPerNode(_maxMessagesPerNode) {};
6264

63-
void PushPendingMessage(NodeId from, CDataStream& vRecv);
65+
void PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv);
6466
std::list<BinaryMessage> PopPendingMessages(size_t maxCount);
6567
bool HasSeen(const uint256& hash) const;
68+
void Misbehaving(NodeId from, int score);
6669
void Clear();
6770

6871
template<typename Message>
69-
void PushPendingMessage(NodeId from, Message& msg)
72+
void PushPendingMessage(NodeId from, PeerManager* peerman, Message& msg)
7073
{
7174
CDataStream ds(SER_NETWORK, PROTOCOL_VERSION);
7275
ds << msg;
73-
PushPendingMessage(from, ds);
76+
PushPendingMessage(from, peerman, ds);
7477
}
7578

7679
// Might return nullptr messages, which indicates that deserialization failed for some reason
@@ -120,7 +123,6 @@ class CDKGSessionHandler
120123
CDKGSessionManager& dkgManager;
121124
CQuorumBlockProcessor& quorumBlockProcessor;
122125
const Consensus::LLMQParams params;
123-
const std::unique_ptr<PeerManager>& m_peerman;
124126
const int quorumIndex;
125127

126128
QuorumPhase phase GUARDED_BY(cs) {QuorumPhase::Idle};
@@ -140,11 +142,11 @@ class CDKGSessionHandler
140142
public:
141143
CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDKGDebugManager& _dkgDebugManager,
142144
CDKGSessionManager& _dkgManager, CQuorumBlockProcessor& _quorumBlockProcessor,
143-
const Consensus::LLMQParams& _params, const std::unique_ptr<PeerManager>& peerman, int _quorumIndex);
145+
const Consensus::LLMQParams& _params, int _quorumIndex);
144146
~CDKGSessionHandler() = default;
145147

146148
void UpdatedBlockTip(const CBlockIndex *pindexNew);
147-
void ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv);
149+
void ProcessMessage(const CNode& pfrom, gsl::not_null<PeerManager*> peerman, const std::string& msg_type, CDataStream& vRecv);
148150

149151
void StartThread();
150152
void StopThread();

0 commit comments

Comments
 (0)