Skip to content

Commit c6e8a9b

Browse files
committed
net: add a flag to indicate when a node's process queue is full
Messages are dumped very quickly from the socket handler to the processor, so it's the depth of the processing queue that's interesting. The socket handler checks the process queue's size during the brief message hand-off and pauses if necessary, and the processor possibly unpauses each time a message is popped off of its queue.
1 parent 4d712e3 commit c6e8a9b

File tree

3 files changed

+11
-12
lines changed

3 files changed

+11
-12
lines changed

src/net.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,9 +1165,7 @@ void CConnman::ThreadSocketHandler()
11651165
}
11661166
{
11671167
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
1168-
if (lockRecv && (
1169-
pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
1170-
pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
1168+
if (lockRecv && !pnode->fPauseRecv)
11711169
FD_SET(pnode->hSocket, &fdsetRecv);
11721170
}
11731171
}
@@ -1240,14 +1238,18 @@ void CConnman::ThreadSocketHandler()
12401238
pnode->CloseSocketDisconnect();
12411239
RecordBytesRecv(nBytes);
12421240
if (notify) {
1241+
size_t nSizeAdded = 0;
12431242
auto it(pnode->vRecvMsg.begin());
12441243
for (; it != pnode->vRecvMsg.end(); ++it) {
12451244
if (!it->complete())
12461245
break;
1246+
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
12471247
}
12481248
{
12491249
LOCK(pnode->cs_vProcessMsg);
12501250
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
1251+
pnode->nProcessQueueSize += nSizeAdded;
1252+
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
12511253
}
12521254
WakeMessageHandler();
12531255
}
@@ -2592,6 +2594,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25922594
minFeeFilter = 0;
25932595
lastSentFeeFilter = 0;
25942596
nextSendTimeFeeFilter = 0;
2597+
fPauseRecv = false;
2598+
nProcessQueueSize = 0;
25952599

25962600
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
25972601
mapRecvBytesPerMsgCmd[msg] = 0;

src/net.h

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,7 @@ class CNode
610610

611611
CCriticalSection cs_vProcessMsg;
612612
std::list<CNetMessage> vProcessMsg;
613+
size_t nProcessQueueSize;
613614

614615
std::deque<CInv> vRecvGetData;
615616
std::list<CNetMessage> vRecvMsg;
@@ -650,6 +651,7 @@ class CNode
650651
const NodeId id;
651652

652653
const uint64_t nKeyedNetGroup;
654+
std::atomic_bool fPauseRecv;
653655
protected:
654656

655657
mapMsgCmdSize mapSendBytesPerMsgCmd;
@@ -743,15 +745,6 @@ class CNode
743745
return nRefCount;
744746
}
745747

746-
// requires LOCK(cs_vRecvMsg)
747-
unsigned int GetTotalRecvSize()
748-
{
749-
unsigned int total = 0;
750-
BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
751-
total += msg.vRecv.size() + 24;
752-
return total;
753-
}
754-
755748
// requires LOCK(cs_vRecvMsg)
756749
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
757750

src/net_processing.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2475,6 +2475,8 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
24752475
return false;
24762476
// Just take one message
24772477
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
2478+
pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
2479+
pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize();
24782480
fMoreWork = !pfrom->vProcessMsg.empty();
24792481
}
24802482
CNetMessage& msg(msgs.front());

0 commit comments

Comments
 (0)