Skip to content

Commit 44a7a8c

Browse files
Merge #7070: refactor: break some circular dependencies over coinjoin/server
a56c106 refactor: drop dependency of PeerManager on CoinJoinServer (Konstantin Akimov) e8685c5 refactor: use Scheduler's feature of NetHandler for coinjoin/server (Konstantin Akimov) 3274051 refactor: use NetHandler for CoinJoinServer to avoid multiple circular dependencies (Konstantin Akimov) Pull request description: ## Issue being fixed or feature implemented There are several circular dependencies that may be resolved by using `NetHandler` interface in CJ "coinjoin/server -> net_processing -> masternode/active/context -> coinjoin/server" "coinjoin/server -> net_processing -> coinjoin/server" **This PR is not a part of separation of Network and Consensus code**; it's a side quest to break couple circular dependencies. It doesn't block me currently and any conflicting PRs may be merged before this one. ToDo the same refactoring for `coinjoin/client` and `coinjoin/walletman` (out of scope this PR, could be done anytime in the future, but probably a bit more complex than coinjoin/server because need to split `CJWalletManager` and `CJWalletManagerImpl::queueman`) "chainlock/chainlock -> llmq/quorums -> msg_result -> coinjoin/coinjoin -> chainlock/chainlock", "coinjoin/coinjoin -> instantsend/instantsend -> spork -> msg_result -> coinjoin/coinjoin", ## What was done? Two circular dependencies are resolved. This PR use `NetHandler` interface to aggregate network code. ## How Has This Been Tested? Run unit & functional tests. ## Breaking Changes N/A ## Checklist: - [x] I have performed a self-review of my own code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have added or updated relevant unit/integration/functional/e2e tests - [ ] I have made corresponding changes to the documentation - [x] I have assigned this pull request to a milestone ACKs for top commit: UdjinM6: utACK a56c106 Tree-SHA512: 2a6593cca3b8c15b48d382f0dc1f695594eca72f0739390c7da66e3a4a57bff6d75e23f5f142819a8536d7dc638071713a376a529d91dc1e4eb8d2df43abedc8
2 parents d4ae23e + a56c106 commit 44a7a8c

File tree

9 files changed

+146
-98
lines changed

9 files changed

+146
-98
lines changed

src/coinjoin/server.cpp

Lines changed: 78 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#include <masternode/node.h>
1111
#include <masternode/sync.h>
1212
#include <net.h>
13-
#include <netmessagemaker.h>
1413
#include <net_processing.h>
14+
#include <netmessagemaker.h>
15+
#include <scheduler.h>
1516
#include <script/interpreter.h>
1617
#include <shutdown.h>
1718
#include <streams.h>
@@ -23,17 +24,17 @@
2324

2425
#include <univalue.h>
2526

26-
CCoinJoinServer::CCoinJoinServer(ChainstateManager& chainman, CConnman& _connman, CDeterministicMNManager& dmnman,
27-
CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool,
28-
PeerManager& peerman, const CActiveMasternodeManager& mn_activeman,
27+
CCoinJoinServer::CCoinJoinServer(PeerManagerInternal* peer_manager, ChainstateManager& chainman, CConnman& _connman,
28+
CDeterministicMNManager& dmnman, CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman,
29+
CTxMemPool& mempool, const CActiveMasternodeManager& mn_activeman,
2930
const CMasternodeSync& mn_sync, const llmq::CInstantSendManager& isman) :
31+
NetHandler(peer_manager),
3032
m_chainman{chainman},
3133
connman{_connman},
3234
m_dmnman{dmnman},
3335
m_dstxman{dstxman},
3436
m_mn_metaman{mn_metaman},
3537
mempool{mempool},
36-
m_peerman{peerman},
3738
m_mn_activeman{mn_activeman},
3839
m_mn_sync{mn_sync},
3940
m_isman{isman},
@@ -44,20 +45,19 @@ CCoinJoinServer::CCoinJoinServer(ChainstateManager& chainman, CConnman& _connman
4445

4546
CCoinJoinServer::~CCoinJoinServer() = default;
4647

47-
MessageProcessingResult CCoinJoinServer::ProcessMessage(CNode& peer, std::string_view msg_type, CDataStream& vRecv)
48+
void CCoinJoinServer::ProcessMessage(CNode& peer, const std::string& msg_type, CDataStream& vRecv)
4849
{
49-
if (!m_mn_sync.IsBlockchainSynced()) return {};
50+
if (!m_mn_sync.IsBlockchainSynced()) return;
5051

5152
if (msg_type == NetMsgType::DSACCEPT) {
5253
ProcessDSACCEPT(peer, vRecv);
5354
} else if (msg_type == NetMsgType::DSQUEUE) {
54-
return ProcessDSQUEUE(peer.GetId(), vRecv);
55+
ProcessDSQUEUE(peer.GetId(), vRecv);
5556
} else if (msg_type == NetMsgType::DSVIN) {
5657
ProcessDSVIN(peer, vRecv);
5758
} else if (msg_type == NetMsgType::DSSIGNFINALTX) {
5859
ProcessDSSIGNFINALTX(vRecv);
5960
}
60-
return {};
6161
}
6262

6363
void CCoinJoinServer::ProcessDSACCEPT(CNode& peer, CDataStream& vRecv)
@@ -126,87 +126,85 @@ void CCoinJoinServer::ProcessDSACCEPT(CNode& peer, CDataStream& vRecv)
126126
}
127127
}
128128

129-
MessageProcessingResult CCoinJoinServer::ProcessDSQUEUE(NodeId from, CDataStream& vRecv)
129+
void CCoinJoinServer::ProcessDSQUEUE(NodeId from, CDataStream& vRecv)
130130
{
131131
assert(m_mn_metaman.IsValid());
132132

133133
CCoinJoinQueue dsq;
134134
vRecv >> dsq;
135135

136-
MessageProcessingResult ret{};
137-
ret.m_to_erase = CInv{MSG_DSQ, dsq.GetHash()};
136+
WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(from, CInv{MSG_DSQ, dsq.GetHash()}));
138137

139138
// Validate denomination first
140139
if (!CoinJoin::IsValidDenomination(dsq.nDenom)) {
141140
LogPrint(BCLog::COINJOIN, "DSQUEUE -- invalid denomination %d from peer %d\n", dsq.nDenom, from);
142-
ret.m_error = MisbehavingError{10};
143-
return ret;
141+
m_peer_manager->PeerMisbehaving(from, 10);
142+
return;
144143
}
145144

146145
if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) {
147-
ret.m_error = MisbehavingError{100};
148-
return ret;
146+
m_peer_manager->PeerMisbehaving(from, 100);
147+
return;
149148
}
150149

151150
const auto tip_mn_list = m_dmnman.GetListAtChainTip();
152151
if (dsq.masternodeOutpoint.IsNull()) {
153152
if (auto dmn = tip_mn_list.GetValidMN(dsq.m_protxHash)) {
154153
dsq.masternodeOutpoint = dmn->collateralOutpoint;
155154
} else {
156-
ret.m_error = MisbehavingError{10};
157-
return ret;
155+
m_peer_manager->PeerMisbehaving(from, 10);
156+
return;
158157
}
159158
}
160159

161160
{
162161
TRY_LOCK(cs_vecqueue, lockRecv);
163-
if (!lockRecv) return ret;
162+
if (!lockRecv) return;
164163

165164
// process every dsq only once
166165
for (const auto& q : vecCoinJoinQueue) {
167166
if (q == dsq) {
168-
return ret;
167+
return;
169168
}
170169
if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) {
171170
// no way the same mn can send another dsq with the same readiness this soon
172171
LogPrint(BCLog::COINJOIN, "DSQUEUE -- Peer %d is sending WAY too many dsq messages for a masternode with collateral %s\n", from, dsq.masternodeOutpoint.ToStringShort());
173-
return ret;
172+
return;
174173
}
175174
}
176175
} // cs_vecqueue
177176

178177
LogPrint(BCLog::COINJOIN, "DSQUEUE -- %s new\n", dsq.ToString());
179178

180-
if (dsq.IsTimeOutOfBounds()) return ret;
179+
if (dsq.IsTimeOutOfBounds()) return;
181180

182181
auto dmn = tip_mn_list.GetValidMNByCollateral(dsq.masternodeOutpoint);
183-
if (!dmn) return ret;
182+
if (!dmn) return;
184183

185184
if (dsq.m_protxHash.IsNull()) {
186185
dsq.m_protxHash = dmn->proTxHash;
187186
}
188187

189188
if (!dsq.CheckSignature(dmn->pdmnState->pubKeyOperator.Get())) {
190-
ret.m_error = MisbehavingError{10};
191-
return ret;
189+
m_peer_manager->PeerMisbehaving(from, 10);
190+
return;
192191
}
193192

194193
if (!dsq.fReady) {
195194
//don't allow a few nodes to dominate the queuing process
196195
if (m_mn_metaman.IsMixingThresholdExceeded(dmn->proTxHash, tip_mn_list.GetValidMNsCount())) {
197196
LogPrint(BCLog::COINJOIN, "DSQUEUE -- node sending too many dsq messages, masternode=%s\n", dmn->proTxHash.ToString());
198-
return ret;
197+
return;
199198
}
200199
m_mn_metaman.AllowMixing(dmn->proTxHash);
201200

202201
LogPrint(BCLog::COINJOIN, "DSQUEUE -- new CoinJoin queue, masternode=%s, queue=%s\n", dmn->proTxHash.ToString(), dsq.ToString());
203202

204203
TRY_LOCK(cs_vecqueue, lockRecv);
205-
if (!lockRecv) return ret;
204+
if (!lockRecv) return;
206205
vecCoinJoinQueue.push_back(dsq);
207-
ret.m_dsq.push_back(dsq);
206+
m_peer_manager->PeerRelayDSQ(dsq);
208207
}
209-
return ret;
210208
}
211209

212210
void CCoinJoinServer::ProcessDSVIN(CNode& peer, CDataStream& vRecv)
@@ -275,7 +273,8 @@ void CCoinJoinServer::SetNull()
275273
//
276274
void CCoinJoinServer::CheckPool()
277275
{
278-
if (int entries = GetEntriesCount(); entries != 0) LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckPool -- entries count %lu\n", entries);
276+
if (int entries = GetEntriesCount(); entries != 0)
277+
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckPool -- entries count %lu\n", entries);
279278

280279
// If we have an entry for each collateral, then create final tx
281280
if (nState == POOL_STATE_ACCEPTING_ENTRIES && size_t(GetEntriesCount()) == vecSessionCollaterals.size()) {
@@ -286,8 +285,8 @@ void CCoinJoinServer::CheckPool()
286285

287286
// Check for Time Out
288287
// If we timed out while accepting entries, then if we have more than minimum, create final tx
289-
if (nState == POOL_STATE_ACCEPTING_ENTRIES && CCoinJoinServer::HasTimedOut()
290-
&& GetEntriesCount() >= CoinJoin::GetMinPoolParticipants()) {
288+
if (nState == POOL_STATE_ACCEPTING_ENTRIES && CCoinJoinServer::HasTimedOut() &&
289+
GetEntriesCount() >= CoinJoin::GetMinPoolParticipants()) {
291290
// Punish misbehaving participants
292291
ChargeFees();
293292
// Try to complete this session ignoring the misbehaving ones
@@ -326,7 +325,8 @@ void CCoinJoinServer::CreateFinalTransaction()
326325
sort(txNew.vout.begin(), txNew.vout.end(), CompareOutputBIP69());
327326

328327
finalMutableTransaction = txNew;
329-
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateFinalTransaction -- finalMutableTransaction=%s", txNew.ToString()); /* Continued */
328+
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateFinalTransaction -- finalMutableTransaction=%s", /* Continued */
329+
txNew.ToString());
330330

331331
// request signatures from clients
332332
SetState(POOL_STATE_SIGNING);
@@ -340,14 +340,16 @@ void CCoinJoinServer::CommitFinalTransaction()
340340
CTransactionRef finalTransaction = WITH_LOCK(cs_coinjoin, return MakeTransactionRef(finalMutableTransaction));
341341
uint256 hashTx = finalTransaction->GetHash();
342342

343-
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- finalTransaction=%s", finalTransaction->ToString()); /* Continued */
343+
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- finalTransaction=%s", /* Continued */
344+
finalTransaction->ToString());
344345

345346
{
346347
// See if the transaction is valid
347348
TRY_LOCK(::cs_main, lockMain);
348349
mempool.PrioritiseTransaction(hashTx, 0.1 * COIN);
349350
if (!lockMain || !ATMPIfSaneFee(m_chainman, finalTransaction)) {
350-
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- ATMPIfSaneFee() error: Transaction not valid\n");
351+
LogPrint(BCLog::COINJOIN, /* Continued */
352+
"CCoinJoinServer::CommitFinalTransaction -- ATMPIfSaneFee() error: Transaction not valid\n");
351353
WITH_LOCK(cs_coinjoin, SetNull());
352354
// not much we can do in this case, just notify clients
353355
RelayCompletedTransaction(ERR_INVALID_TX);
@@ -368,7 +370,7 @@ void CCoinJoinServer::CommitFinalTransaction()
368370
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- TRANSMITTING DSTX\n");
369371

370372
CInv inv(MSG_DSTX, hashTx);
371-
m_peerman.RelayInv(inv);
373+
m_peer_manager->PeerRelayInv(inv);
372374

373375
// Tell the clients it was successful
374376
RelayCompletedTransaction(MSG_SUCCESS);
@@ -411,7 +413,9 @@ void CCoinJoinServer::ChargeFees() const
411413

412414
// This queue entry didn't send us the promised transaction
413415
if (!fFound) {
414-
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't send transaction), found offence\n");
416+
LogPrint(BCLog::COINJOIN, /* Continued */
417+
"CCoinJoinServer::ChargeFees -- found uncooperative node (didn't send transaction), found "
418+
"offence\n");
415419
vecOffendersCollaterals.push_back(txCollateral);
416420
}
417421
}
@@ -423,7 +427,8 @@ void CCoinJoinServer::ChargeFees() const
423427
for (const auto& entry : vecEntries) {
424428
for (const auto& txdsin : entry.vecTxDSIn) {
425429
if (!txdsin.fHasSig) {
426-
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't sign), found offence\n");
430+
LogPrint(BCLog::COINJOIN, /* Continued */
431+
"CCoinJoinServer::ChargeFees -- found uncooperative node (didn't sign), found offence\n");
427432
vecOffendersCollaterals.push_back(entry.txCollateral);
428433
}
429434
}
@@ -443,8 +448,9 @@ void CCoinJoinServer::ChargeFees() const
443448
Shuffle(vecOffendersCollaterals.begin(), vecOffendersCollaterals.end(), FastRandomContext());
444449

445450
if (nState == POOL_STATE_ACCEPTING_ENTRIES || nState == POOL_STATE_SIGNING) {
446-
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't %s transaction), charging fees: %s", /* Continued */
447-
(nState == POOL_STATE_SIGNING) ? "sign" : "send", vecOffendersCollaterals[0]->ToString());
451+
LogPrint(BCLog::COINJOIN, /* Continued */
452+
"CCoinJoinServer::ChargeFees -- found uncooperative node (didn't %s transaction), charging fees: %s",
453+
(nState == POOL_STATE_SIGNING) ? "sign" : "send", vecOffendersCollaterals[0]->ToString());
448454
ConsumeCollateral(vecOffendersCollaterals[0]);
449455
}
450456
}
@@ -465,7 +471,8 @@ void CCoinJoinServer::ChargeRandomFees() const
465471
{
466472
for (const auto& txCollateral : vecSessionCollaterals) {
467473
if (GetRand<int>(/*nMax=*/100) > 10) return;
468-
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeRandomFees -- charging random fees, txCollateral=%s", txCollateral->ToString()); /* Continued */
474+
LogPrint(BCLog::COINJOIN, /* Continued */
475+
"CCoinJoinServer::ChargeRandomFees -- charging random fees, txCollateral=%s", txCollateral->ToString());
469476
ConsumeCollateral(txCollateral);
470477
}
471478
}
@@ -476,7 +483,7 @@ void CCoinJoinServer::ConsumeCollateral(const CTransactionRef& txref) const
476483
if (!ATMPIfSaneFee(m_chainman, txref)) {
477484
LogPrint(BCLog::COINJOIN, "%s -- ATMPIfSaneFee failed\n", __func__);
478485
} else {
479-
m_peerman.RelayTransaction(txref->GetHash());
486+
m_peer_manager->PeerRelayTransaction(txref->GetHash());
480487
LogPrint(BCLog::COINJOIN, "%s -- Collateral was consumed\n", __func__);
481488
}
482489
}
@@ -521,7 +528,7 @@ void CCoinJoinServer::CheckForCompleteQueue()
521528
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckForCompleteQueue -- queue is ready, signing and relaying (%s) " /* Continued */
522529
"with %d participants\n", dsq.ToString(), vecSessionCollaterals.size());
523530
dsq.vchSig = m_mn_activeman.SignBasic(dsq.GetSignatureHash());
524-
m_peerman.RelayDSQ(dsq);
531+
m_peer_manager->PeerRelayDSQ(dsq);
525532
WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq));
526533
}
527534
}
@@ -731,7 +738,7 @@ bool CCoinJoinServer::CreateNewSession(const CCoinJoinAccept& dsa, PoolMessage&
731738
GetAdjustedTime(), false);
732739
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateNewSession -- signing and relaying new queue: %s\n", dsq.ToString());
733740
dsq.vchSig = m_mn_activeman.SignBasic(dsq.GetSignatureHash());
734-
m_peerman.RelayDSQ(dsq);
741+
m_peer_manager->PeerRelayDSQ(dsq);
735742
LOCK(cs_vecqueue);
736743
vecCoinJoinQueue.push_back(dsq);
737744
}
@@ -891,14 +898,18 @@ void CCoinJoinServer::SetState(PoolState nStateNew)
891898
nState = nStateNew;
892899
}
893900

894-
void CCoinJoinServer::DoMaintenance()
901+
void CCoinJoinServer::Schedule(CScheduler& scheduler)
895902
{
896-
if (!m_mn_sync.IsBlockchainSynced()) return;
897-
if (ShutdownRequested()) return;
898-
899-
CheckForCompleteQueue();
900-
CheckPool();
901-
CheckTimeout();
903+
scheduler.scheduleEvery(
904+
[this]() -> void {
905+
if (!m_mn_sync.IsBlockchainSynced()) return;
906+
if (ShutdownRequested()) return;
907+
908+
CheckForCompleteQueue();
909+
CheckPool();
910+
CheckTimeout();
911+
},
912+
std::chrono::seconds{1});
902913
}
903914

904915
void CCoinJoinServer::GetJsonInfo(UniValue& obj) const
@@ -910,3 +921,19 @@ void CCoinJoinServer::GetJsonInfo(UniValue& obj) const
910921
obj.pushKV("state", GetStateString());
911922
obj.pushKV("entries_count", GetEntriesCount());
912923
}
924+
925+
bool CCoinJoinServer::AlreadyHave(const CInv& inv)
926+
{
927+
return (inv.type == MSG_DSQ) ? HasQueue(inv.hash) : false;
928+
}
929+
930+
bool CCoinJoinServer::ProcessGetData(CNode& pfrom, const CInv& inv, CConnman& connman, const CNetMsgMaker& msgMaker)
931+
{
932+
if (inv.type != MSG_DSQ) return false;
933+
934+
auto opt_dsq = GetQueueFromHash(inv.hash);
935+
if (!opt_dsq.has_value()) return false;
936+
937+
connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::DSQUEUE, *opt_dsq));
938+
return true;
939+
}

0 commit comments

Comments
 (0)