Skip to content

Commit d3d7056

Browse files
committed
net: make net processing interruptible
1 parent 0985052 commit d3d7056

File tree

5 files changed

+39
-27
lines changed

5 files changed

+39
-27
lines changed

src/net.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1856,7 +1856,7 @@ void CConnman::ThreadMessageHandler()
18561856
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
18571857
if (lockRecv)
18581858
{
1859-
if (!GetNodeSignals().ProcessMessages(pnode, *this))
1859+
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
18601860
pnode->CloseSocketDisconnect();
18611861

18621862
if (pnode->nSendSize < GetSendBufferSize())
@@ -1875,7 +1875,7 @@ void CConnman::ThreadMessageHandler()
18751875
{
18761876
TRY_LOCK(pnode->cs_vSend, lockSend);
18771877
if (lockSend)
1878-
GetNodeSignals().SendMessages(pnode, *this);
1878+
GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
18791879
}
18801880
if (flagInterruptMsgProc)
18811881
return;

src/net.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,8 +460,8 @@ struct CombinerAll
460460
// Signals for message handling
461461
struct CNodeSignals
462462
{
463-
boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> ProcessMessages;
464-
boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> SendMessages;
463+
boost::signals2::signal<bool (CNode*, CConnman&, std::atomic<bool>&), CombinerAll> ProcessMessages;
464+
boost::signals2::signal<bool (CNode*, CConnman&, std::atomic<bool>&), CombinerAll> SendMessages;
465465
boost::signals2::signal<void (CNode*, CConnman&)> InitializeNode;
466466
boost::signals2::signal<void (NodeId, bool&)> FinalizeNode;
467467
};

src/net_processing.cpp

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -886,7 +886,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
886886
connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
887887
}
888888

889-
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman)
889+
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
890890
{
891891
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
892892
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
@@ -901,7 +901,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
901901

902902
const CInv &inv = *it;
903903
{
904-
boost::this_thread::interruption_point();
904+
if (interruptMsgProc)
905+
return;
906+
905907
it++;
906908

907909
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK)
@@ -1055,7 +1057,7 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params
10551057
return nFetchFlags;
10561058
}
10571059

1058-
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman)
1060+
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
10591061
{
10601062
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
10611063

@@ -1295,7 +1297,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
12951297
int64_t nSince = nNow - 10 * 60;
12961298
BOOST_FOREACH(CAddress& addr, vAddr)
12971299
{
1298-
boost::this_thread::interruption_point();
1300+
if (interruptMsgProc)
1301+
return true;
12991302

13001303
if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES)
13011304
continue;
@@ -1377,7 +1380,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
13771380
{
13781381
CInv &inv = vInv[nInv];
13791382

1380-
boost::this_thread::interruption_point();
1383+
if (interruptMsgProc)
1384+
return true;
13811385

13821386
bool fAlreadyHave = AlreadyHave(inv);
13831387
LogPrint("net", "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->id);
@@ -1439,7 +1443,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
14391443
LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id);
14401444

14411445
pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end());
1442-
ProcessGetData(pfrom, chainparams.GetConsensus(), connman);
1446+
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
14431447
}
14441448

14451449

@@ -1513,7 +1517,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
15131517
inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
15141518
inv.hash = req.blockhash;
15151519
pfrom->vRecvGetData.push_back(inv);
1516-
ProcessGetData(pfrom, chainparams.GetConsensus(), connman);
1520+
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
15171521
return true;
15181522
}
15191523

@@ -1925,10 +1929,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
19251929
} // cs_main
19261930

19271931
if (fProcessBLOCKTXN)
1928-
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman);
1932+
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, interruptMsgProc);
19291933

19301934
if (fRevertToHeaderProcessing)
1931-
return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman);
1935+
return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman, interruptMsgProc);
19321936

19331937
if (fBlockReconstructed) {
19341938
// If we got here, we were able to optimistically reconstruct a
@@ -2441,7 +2445,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
24412445
}
24422446

24432447
// requires LOCK(cs_vRecvMsg)
2444-
bool ProcessMessages(CNode* pfrom, CConnman& connman)
2448+
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
24452449
{
24462450
const CChainParams& chainparams = Params();
24472451
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
@@ -2459,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
24592463
bool fOk = true;
24602464

24612465
if (!pfrom->vRecvGetData.empty())
2462-
ProcessGetData(pfrom, chainparams.GetConsensus(), connman);
2466+
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
24632467

24642468
// this maintains the order of responses
24652469
if (!pfrom->vRecvGetData.empty()) return fOk;
@@ -2520,8 +2524,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
25202524
bool fRet = false;
25212525
try
25222526
{
2523-
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman);
2524-
boost::this_thread::interruption_point();
2527+
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
2528+
if (interruptMsgProc)
2529+
return true;
25252530
}
25262531
catch (const std::ios_base::failure& e)
25272532
{
@@ -2585,7 +2590,7 @@ class CompareInvMempoolOrder
25852590
}
25862591
};
25872592

2588-
bool SendMessages(CNode* pto, CConnman& connman)
2593+
bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsgProc)
25892594
{
25902595
const Consensus::Params& consensusParams = Params().GetConsensus();
25912596
{

src/net_processing.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
3939
void Misbehaving(NodeId nodeid, int howmuch);
4040

4141
/** Process protocol messages received from a given node */
42-
bool ProcessMessages(CNode* pfrom, CConnman& connman);
42+
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interrupt);
4343
/**
4444
* Send queued protocol messages to be sent to a give node.
4545
*
4646
* @param[in] pto The node which we are sending messages to.
4747
* @param[in] connman The connection manager for that node.
48+
* @param[in] interrupt Interrupt condition for processing threads
4849
*/
49-
bool SendMessages(CNode* pto, CConnman& connman);
50+
bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);
5051

5152
#endif // BITCOIN_NET_PROCESSING_H

src/test/DoS_tests.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,16 @@ BOOST_FIXTURE_TEST_SUITE(DoS_tests, TestingSetup)
4747

4848
BOOST_AUTO_TEST_CASE(DoS_banning)
4949
{
50+
std::atomic<bool> interruptDummy(false);
51+
5052
connman->ClearBanned();
5153
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
5254
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true);
5355
dummyNode1.SetSendVersion(PROTOCOL_VERSION);
5456
GetNodeSignals().InitializeNode(&dummyNode1, *connman);
5557
dummyNode1.nVersion = 1;
5658
Misbehaving(dummyNode1.GetId(), 100); // Should get banned
57-
SendMessages(&dummyNode1, *connman);
59+
SendMessages(&dummyNode1, *connman, interruptDummy);
5860
BOOST_CHECK(connman->IsBanned(addr1));
5961
BOOST_CHECK(!connman->IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned
6062

@@ -64,16 +66,18 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
6466
GetNodeSignals().InitializeNode(&dummyNode2, *connman);
6567
dummyNode2.nVersion = 1;
6668
Misbehaving(dummyNode2.GetId(), 50);
67-
SendMessages(&dummyNode2, *connman);
69+
SendMessages(&dummyNode2, *connman, interruptDummy);
6870
BOOST_CHECK(!connman->IsBanned(addr2)); // 2 not banned yet...
6971
BOOST_CHECK(connman->IsBanned(addr1)); // ... but 1 still should be
7072
Misbehaving(dummyNode2.GetId(), 50);
71-
SendMessages(&dummyNode2, *connman);
73+
SendMessages(&dummyNode2, *connman, interruptDummy);
7274
BOOST_CHECK(connman->IsBanned(addr2));
7375
}
7476

7577
BOOST_AUTO_TEST_CASE(DoS_banscore)
7678
{
79+
std::atomic<bool> interruptDummy(false);
80+
7781
connman->ClearBanned();
7882
ForceSetArg("-banscore", "111"); // because 11 is my favorite number
7983
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
@@ -82,19 +86,21 @@ BOOST_AUTO_TEST_CASE(DoS_banscore)
8286
GetNodeSignals().InitializeNode(&dummyNode1, *connman);
8387
dummyNode1.nVersion = 1;
8488
Misbehaving(dummyNode1.GetId(), 100);
85-
SendMessages(&dummyNode1, *connman);
89+
SendMessages(&dummyNode1, *connman, interruptDummy);
8690
BOOST_CHECK(!connman->IsBanned(addr1));
8791
Misbehaving(dummyNode1.GetId(), 10);
88-
SendMessages(&dummyNode1, *connman);
92+
SendMessages(&dummyNode1, *connman, interruptDummy);
8993
BOOST_CHECK(!connman->IsBanned(addr1));
9094
Misbehaving(dummyNode1.GetId(), 1);
91-
SendMessages(&dummyNode1, *connman);
95+
SendMessages(&dummyNode1, *connman, interruptDummy);
9296
BOOST_CHECK(connman->IsBanned(addr1));
9397
ForceSetArg("-banscore", std::to_string(DEFAULT_BANSCORE_THRESHOLD));
9498
}
9599

96100
BOOST_AUTO_TEST_CASE(DoS_bantime)
97101
{
102+
std::atomic<bool> interruptDummy(false);
103+
98104
connman->ClearBanned();
99105
int64_t nStartTime = GetTime();
100106
SetMockTime(nStartTime); // Overrides future calls to GetTime()
@@ -106,7 +112,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
106112
dummyNode.nVersion = 1;
107113

108114
Misbehaving(dummyNode.GetId(), 100);
109-
SendMessages(&dummyNode, *connman);
115+
SendMessages(&dummyNode, *connman, interruptDummy);
110116
BOOST_CHECK(connman->IsBanned(addr));
111117

112118
SetMockTime(nStartTime+60*60);

0 commit comments

Comments
 (0)