Skip to content

Commit 991955e

Browse files
committed
net: add a flag to indicate when a node's send buffer is full
Similar to the recv flag, but this one indicates whether or not the net's send buffer is full. The socket handler checks the send queue when a new message is added and pauses if necessary, and possibly unpauses after each message is drained from its buffer.
1 parent c6e8a9b commit 991955e

File tree

3 files changed

+12
-8
lines changed

3 files changed

+12
-8
lines changed

src/net.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const
761761

762762

763763
// requires LOCK(cs_vSend)
764-
size_t SocketSendData(CNode *pnode)
764+
size_t CConnman::SocketSendData(CNode *pnode)
765765
{
766766
auto it = pnode->vSendMsg.begin();
767767
size_t nSentSize = 0;
@@ -778,6 +778,7 @@ size_t SocketSendData(CNode *pnode)
778778
if (pnode->nSendOffset == data.size()) {
779779
pnode->nSendOffset = 0;
780780
pnode->nSendSize -= data.size();
781+
pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
781782
it++;
782783
} else {
783784
// could not send full message; stop sending more
@@ -1286,8 +1287,9 @@ void CConnman::ThreadSocketHandler()
12861287
TRY_LOCK(pnode->cs_vSend, lockSend);
12871288
if (lockSend) {
12881289
size_t nBytes = SocketSendData(pnode);
1289-
if (nBytes)
1290+
if (nBytes) {
12901291
RecordBytesSent(nBytes);
1292+
}
12911293
}
12921294
}
12931295

@@ -1868,7 +1870,7 @@ void CConnman::ThreadMessageHandler()
18681870
if (lockRecv)
18691871
{
18701872
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
1871-
fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize());
1873+
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
18721874
}
18731875
}
18741876
if (flagInterruptMsgProc)
@@ -2595,6 +2597,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25952597
lastSentFeeFilter = 0;
25962598
nextSendTimeFeeFilter = 0;
25972599
fPauseRecv = false;
2600+
fPauseSend = false;
25982601
nProcessQueueSize = 0;
25992602

26002603
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
@@ -2675,6 +2678,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
26752678
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
26762679
pnode->nSendSize += nTotalSize;
26772680

2681+
if (pnode->nSendSize > nSendBufferMaxSize)
2682+
pnode->fPauseSend = true;
26782683
pnode->vSendMsg.push_back(std::move(serializedHeader));
26792684
if (nMessageSize)
26802685
pnode->vSendMsg.push_back(std::move(msg.data));

src/net.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ class CConnman
358358

359359
NodeId GetNewNodeId();
360360

361+
size_t SocketSendData(CNode *pnode);
361362
//!check is the banlist has unwritten changes
362363
bool BannedSetIsDirty();
363364
//!set the "dirty" flag for the banlist
@@ -444,7 +445,6 @@ void Discover(boost::thread_group& threadGroup);
444445
void MapPort(bool fUseUPnP);
445446
unsigned short GetListenPort();
446447
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
447-
size_t SocketSendData(CNode *pnode);
448448

449449
struct CombinerAll
450450
{
@@ -652,6 +652,7 @@ class CNode
652652

653653
const uint64_t nKeyedNetGroup;
654654
std::atomic_bool fPauseRecv;
655+
std::atomic_bool fPauseSend;
655656
protected:
656657

657658
mapMsgCmdSize mapSendBytesPerMsgCmd;

src/net_processing.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -889,14 +889,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
889889
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
890890
{
891891
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
892-
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
893892
vector<CInv> vNotFound;
894893
CNetMsgMaker msgMaker(pfrom->GetSendVersion());
895894
LOCK(cs_main);
896895

897896
while (it != pfrom->vRecvGetData.end()) {
898897
// Don't bother if send buffer is too full to respond anyway
899-
if (pfrom->nSendSize >= nMaxSendBufferSize)
898+
if (pfrom->fPauseSend)
900899
break;
901900

902901
const CInv &inv = *it;
@@ -2444,7 +2443,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
24442443
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
24452444
{
24462445
const CChainParams& chainparams = Params();
2447-
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
24482446
//
24492447
// Message format
24502448
// (4) message start
@@ -2465,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
24652463
if (!pfrom->vRecvGetData.empty()) return true;
24662464

24672465
// Don't bother if send buffer is too full to respond anyway
2468-
if (pfrom->nSendSize >= nMaxSendBufferSize)
2466+
if (pfrom->fPauseSend)
24692467
return false;
24702468

24712469
std::list<CNetMessage> msgs;

0 commit comments

Comments
 (0)