Skip to content

Commit 8ad663c

Browse files
committed
net: use an interface class rather than signals for message processing
Drop boost signals in favor of a stateful class. This will allow the message processing loop to actually move to net_processing in a future step.
1 parent 28f11e9 commit 8ad663c

File tree

8 files changed

+103
-126
lines changed

8 files changed

+103
-126
lines changed

src/init.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,11 +195,10 @@ void Shutdown()
195195
#endif
196196
MapPort(false);
197197
UnregisterValidationInterface(peerLogic.get());
198-
peerLogic.reset();
199198
g_connman.reset();
199+
peerLogic.reset();
200200

201201
StopTorControl();
202-
UnregisterNodeSignals(GetNodeSignals());
203202
if (fDumpMempoolLater && gArgs.GetArg("-persistmempool", DEFAULT_PERSIST_MEMPOOL)) {
204203
DumpMempool();
205204
}
@@ -1268,7 +1267,6 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
12681267

12691268
peerLogic.reset(new PeerLogicValidation(&connman));
12701269
RegisterValidationInterface(peerLogic.get());
1271-
RegisterNodeSignals(GetNodeSignals());
12721270

12731271
// sanitize comments per BIP-0014, format user agent and check total size
12741272
std::vector<std::string> uacomments;
@@ -1659,6 +1657,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
16591657
connOptions.nMaxFeeler = 1;
16601658
connOptions.nBestHeight = chainActive.Height();
16611659
connOptions.uiInterface = &uiInterface;
1660+
connOptions.m_msgproc = peerLogic.get();
16621661
connOptions.nSendBufferMaxSize = 1000*gArgs.GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER);
16631662
connOptions.nReceiveFloodSize = 1000*gArgs.GetArg("-maxreceivebuffer", DEFAULT_MAXRECEIVEBUFFER);
16641663

src/net.cpp

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,6 @@ std::string strSubVersion;
8989

9090
limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ);
9191

92-
// Signals for message handling
93-
static CNodeSignals g_signals;
94-
CNodeSignals& GetNodeSignals() { return g_signals; }
95-
9692
void CConnman::AddOneShot(const std::string& strDest)
9793
{
9894
LOCK(cs_vOneShots);
@@ -1114,7 +1110,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
11141110
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true);
11151111
pnode->AddRef();
11161112
pnode->fWhitelisted = whitelisted;
1117-
GetNodeSignals().InitializeNode(pnode, this);
1113+
m_msgproc->InitializeNode(pnode, this);
11181114

11191115
LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString());
11201116

@@ -1966,7 +1962,7 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
19661962
if (fAddnode)
19671963
pnode->fAddnode = true;
19681964

1969-
GetNodeSignals().InitializeNode(pnode, this);
1965+
m_msgproc->InitializeNode(pnode, this);
19701966
{
19711967
LOCK(cs_vNodes);
19721968
vNodes.push_back(pnode);
@@ -1996,16 +1992,16 @@ void CConnman::ThreadMessageHandler()
19961992
continue;
19971993

19981994
// Receive messages
1999-
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, this, flagInterruptMsgProc);
1995+
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, this, flagInterruptMsgProc);
20001996
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
20011997
if (flagInterruptMsgProc)
20021998
return;
2003-
20041999
// Send messages
20052000
{
20062001
LOCK(pnode->cs_sendProcessing);
2007-
GetNodeSignals().SendMessages(pnode, this, flagInterruptMsgProc);
2002+
m_msgproc->SendMessages(pnode, this, flagInterruptMsgProc);
20082003
}
2004+
20092005
if (flagInterruptMsgProc)
20102006
return;
20112007
}
@@ -2324,6 +2320,7 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
23242320
//
23252321
// Start threads
23262322
//
2323+
assert(m_msgproc);
23272324
InterruptSocks5(false);
23282325
interruptNet.reset();
23292326
flagInterruptMsgProc = false;
@@ -2450,9 +2447,10 @@ void CConnman::DeleteNode(CNode* pnode)
24502447
{
24512448
assert(pnode);
24522449
bool fUpdateConnectionTime = false;
2453-
GetNodeSignals().FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
2454-
if(fUpdateConnectionTime)
2450+
m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
2451+
if(fUpdateConnectionTime) {
24552452
addrman.Connected(pnode->addr);
2453+
}
24562454
delete pnode;
24572455
}
24582456

src/net.h

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
#include <arpa/inet.h>
3434
#endif
3535

36-
#include <boost/signals2/signal.hpp>
3736

3837
class CScheduler;
3938
class CNode;
@@ -116,7 +115,7 @@ struct CSerializedNetMsg
116115
std::string command;
117116
};
118117

119-
118+
class NetEventsInterface;
120119
class CConnman
121120
{
122121
public:
@@ -138,6 +137,7 @@ class CConnman
138137
int nMaxFeeler = 0;
139138
int nBestHeight = 0;
140139
CClientUIInterface* uiInterface = nullptr;
140+
NetEventsInterface* m_msgproc = nullptr;
141141
unsigned int nSendBufferMaxSize = 0;
142142
unsigned int nReceiveFloodSize = 0;
143143
uint64_t nMaxOutboundTimeframe = 0;
@@ -158,6 +158,7 @@ class CConnman
158158
nMaxFeeler = connOptions.nMaxFeeler;
159159
nBestHeight = connOptions.nBestHeight;
160160
clientInterface = connOptions.uiInterface;
161+
m_msgproc = connOptions.m_msgproc;
161162
nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
162163
nReceiveFloodSize = connOptions.nReceiveFloodSize;
163164
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
@@ -398,6 +399,7 @@ class CConnman
398399
int nMaxFeeler;
399400
std::atomic<int> nBestHeight;
400401
CClientUIInterface* clientInterface;
402+
NetEventsInterface* m_msgproc;
401403

402404
/** SipHasher seeds for deterministic randomness */
403405
const uint64_t nSeed0, nSeed1;
@@ -438,19 +440,18 @@ struct CombinerAll
438440
}
439441
};
440442

441-
// Signals for message handling
442-
struct CNodeSignals
443+
/**
444+
* Interface for message handling
445+
*/
446+
class NetEventsInterface
443447
{
444-
boost::signals2::signal<bool (CNode*, CConnman*, std::atomic<bool>&), CombinerAll> ProcessMessages;
445-
boost::signals2::signal<bool (CNode*, CConnman*, std::atomic<bool>&), CombinerAll> SendMessages;
446-
boost::signals2::signal<void (CNode*, CConnman*)> InitializeNode;
447-
boost::signals2::signal<void (NodeId, bool&)> FinalizeNode;
448+
public:
449+
virtual bool ProcessMessages(CNode* pnode, CConnman* connman, std::atomic<bool>& interrupt) = 0;
450+
virtual bool SendMessages(CNode* pnode, CConnman* connman, std::atomic<bool>& interrupt) = 0;
451+
virtual void InitializeNode(CNode* pnode, CConnman* connman) = 0;
452+
virtual void FinalizeNode(NodeId id, bool& update_connection_time) = 0;
448453
};
449454

450-
451-
CNodeSignals& GetNodeSignals();
452-
453-
454455
enum
455456
{
456457
LOCAL_NONE, // unknown

src/net_processing.cpp

Lines changed: 46 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,6 @@ namespace {
123123
std::deque<std::pair<int64_t, MapRelay::iterator>> vRelayExpiration;
124124
} // namespace
125125

126-
//////////////////////////////////////////////////////////////////////////////
127-
//
128-
// Registration of network node signals.
129-
//
130-
131126
namespace {
132127

133128
struct CBlockReject {
@@ -265,50 +260,6 @@ void PushNodeVersion(CNode *pnode, CConnman* connman, int64_t nTime)
265260
}
266261
}
267262

268-
void InitializeNode(CNode *pnode, CConnman* connman) {
269-
CAddress addr = pnode->addr;
270-
std::string addrName = pnode->GetAddrName();
271-
NodeId nodeid = pnode->GetId();
272-
{
273-
LOCK(cs_main);
274-
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName)));
275-
}
276-
if(!pnode->fInbound)
277-
PushNodeVersion(pnode, connman, GetTime());
278-
}
279-
280-
void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
281-
fUpdateConnectionTime = false;
282-
LOCK(cs_main);
283-
CNodeState *state = State(nodeid);
284-
assert(state != nullptr);
285-
286-
if (state->fSyncStarted)
287-
nSyncStarted--;
288-
289-
if (state->nMisbehavior == 0 && state->fCurrentlyConnected) {
290-
fUpdateConnectionTime = true;
291-
}
292-
293-
for (const QueuedBlock& entry : state->vBlocksInFlight) {
294-
mapBlocksInFlight.erase(entry.hash);
295-
}
296-
EraseOrphansFor(nodeid);
297-
nPreferredDownload -= state->fPreferredDownload;
298-
nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0);
299-
assert(nPeersWithValidatedDownloads >= 0);
300-
301-
mapNodeState.erase(nodeid);
302-
303-
if (mapNodeState.empty()) {
304-
// Do a consistency check after the last peer is removed.
305-
assert(mapBlocksInFlight.empty());
306-
assert(nPreferredDownload == 0);
307-
assert(nPeersWithValidatedDownloads == 0);
308-
}
309-
LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid);
310-
}
311-
312263
// Requires cs_main.
313264
// Returns a bool indicating whether we requested this block.
314265
// Also used if a block was /not/ received and timed out or started with another peer
@@ -545,6 +496,50 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<con
545496

546497
} // namespace
547498

499+
void PeerLogicValidation::InitializeNode(CNode *pnode, CConnman* connman) {
500+
CAddress addr = pnode->addr;
501+
std::string addrName = pnode->GetAddrName();
502+
NodeId nodeid = pnode->GetId();
503+
{
504+
LOCK(cs_main);
505+
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName)));
506+
}
507+
if(!pnode->fInbound)
508+
PushNodeVersion(pnode, connman, GetTime());
509+
}
510+
511+
void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
512+
fUpdateConnectionTime = false;
513+
LOCK(cs_main);
514+
CNodeState *state = State(nodeid);
515+
assert(state != nullptr);
516+
517+
if (state->fSyncStarted)
518+
nSyncStarted--;
519+
520+
if (state->nMisbehavior == 0 && state->fCurrentlyConnected) {
521+
fUpdateConnectionTime = true;
522+
}
523+
524+
for (const QueuedBlock& entry : state->vBlocksInFlight) {
525+
mapBlocksInFlight.erase(entry.hash);
526+
}
527+
EraseOrphansFor(nodeid);
528+
nPreferredDownload -= state->fPreferredDownload;
529+
nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0);
530+
assert(nPeersWithValidatedDownloads >= 0);
531+
532+
mapNodeState.erase(nodeid);
533+
534+
if (mapNodeState.empty()) {
535+
// Do a consistency check after the last peer is removed.
536+
assert(mapBlocksInFlight.empty());
537+
assert(nPreferredDownload == 0);
538+
assert(nPeersWithValidatedDownloads == 0);
539+
}
540+
LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid);
541+
}
542+
548543
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
549544
LOCK(cs_main);
550545
CNodeState *state = State(nodeid);
@@ -560,22 +555,6 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
560555
return true;
561556
}
562557

563-
void RegisterNodeSignals(CNodeSignals& nodeSignals)
564-
{
565-
nodeSignals.ProcessMessages.connect(&ProcessMessages);
566-
nodeSignals.SendMessages.connect(&SendMessages);
567-
nodeSignals.InitializeNode.connect(&InitializeNode);
568-
nodeSignals.FinalizeNode.connect(&FinalizeNode);
569-
}
570-
571-
void UnregisterNodeSignals(CNodeSignals& nodeSignals)
572-
{
573-
nodeSignals.ProcessMessages.disconnect(&ProcessMessages);
574-
nodeSignals.SendMessages.disconnect(&SendMessages);
575-
nodeSignals.InitializeNode.disconnect(&InitializeNode);
576-
nodeSignals.FinalizeNode.disconnect(&FinalizeNode);
577-
}
578-
579558
//////////////////////////////////////////////////////////////////////////////
580559
//
581560
// mapOrphanTransactions
@@ -2661,7 +2640,7 @@ static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman)
26612640
return false;
26622641
}
26632642

2664-
bool ProcessMessages(CNode* pfrom, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
2643+
bool PeerLogicValidation::ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interruptMsgProc)
26652644
{
26662645
const CChainParams& chainparams = Params();
26672646
//
@@ -2798,7 +2777,7 @@ class CompareInvMempoolOrder
27982777
}
27992778
};
28002779

2801-
bool SendMessages(CNode* pto, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
2780+
bool PeerLogicValidation::SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interruptMsgProc)
28022781
{
28032782
const Consensus::Params& consensusParams = Params().GetConsensus();
28042783
{

src/net_processing.h

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,32 @@ static const unsigned int DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN = 100;
2222
static constexpr int64_t HEADERS_DOWNLOAD_TIMEOUT_BASE = 15 * 60 * 1000000; // 15 minutes
2323
static constexpr int64_t HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER = 1000; // 1ms/header
2424

25-
/** Register with a network node to receive its signals */
26-
void RegisterNodeSignals(CNodeSignals& nodeSignals);
27-
/** Unregister a network node */
28-
void UnregisterNodeSignals(CNodeSignals& nodeSignals);
29-
30-
class PeerLogicValidation : public CValidationInterface {
25+
class PeerLogicValidation : public CValidationInterface, public NetEventsInterface {
3126
private:
3227
CConnman* connman;
3328

3429
public:
35-
explicit PeerLogicValidation(CConnman* connmanIn);
30+
explicit PeerLogicValidation(CConnman* connman);
3631

3732
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted) override;
3833
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
3934
void BlockChecked(const CBlock& block, const CValidationState& state) override;
4035
void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) override;
36+
37+
38+
void InitializeNode(CNode* pnode, CConnman* connman) override;
39+
void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) override;
40+
/** Process protocol messages received from a given node */
41+
bool ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interrupt) override;
42+
/**
43+
* Send queued protocol messages to be sent to a give node.
44+
*
45+
* @param[in] pto The node which we are sending messages to.
46+
* @param[in] connman The connection manager for that node.
47+
* @param[in] interrupt Interrupt condition for processing threads
48+
* @return True if there is more work to be done
49+
*/
50+
bool SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interrupt) override;
4151
};
4252

4353
struct CNodeStateStats {
@@ -52,16 +62,4 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
5262
/** Increase a node's misbehavior score. */
5363
void Misbehaving(NodeId nodeid, int howmuch);
5464

55-
/** Process protocol messages received from a given node */
56-
bool ProcessMessages(CNode* pfrom, CConnman* connman, const std::atomic<bool>& interrupt);
57-
/**
58-
* Send queued protocol messages to be sent to a give node.
59-
*
60-
* @param[in] pto The node which we are sending messages to.
61-
* @param[in] connman The connection manager for that node.
62-
* @param[in] interrupt Interrupt condition for processing threads
63-
* @return True if there is more work to be done
64-
*/
65-
bool SendMessages(CNode* pto, CConnman* connman, const std::atomic<bool>& interrupt);
66-
6765
#endif // BITCOIN_NET_PROCESSING_H

0 commit comments

Comments
 (0)