@@ -437,11 +437,6 @@ void CNode::CloseSocketDisconnect()
437
437
LogPrint (" net" , " disconnecting peer=%d\n " , id);
438
438
CloseSocket (hSocket);
439
439
}
440
-
441
- // in case this fails, we'll empty the recv buffer when the CNode is deleted
442
- TRY_LOCK (cs_vRecvMsg, lockRecv);
443
- if (lockRecv)
444
- vRecvMsg.clear ();
445
440
}
446
441
447
442
void CConnman::ClearBanned ()
@@ -650,16 +645,18 @@ void CNode::copyStats(CNodeStats &stats)
650
645
}
651
646
#undef X
652
647
653
- // requires LOCK(cs_vRecvMsg)
654
648
bool CNode::ReceiveMsgBytes (const char *pch, unsigned int nBytes, bool & complete)
655
649
{
656
650
complete = false ;
651
+ int64_t nTimeMicros = GetTimeMicros ();
652
+ nLastRecv = nTimeMicros / 1000000 ;
653
+ nRecvBytes += nBytes;
657
654
while (nBytes > 0 ) {
658
655
659
656
// get current incomplete message, or create a new one
660
657
if (vRecvMsg.empty () ||
661
658
vRecvMsg.back ().complete ())
662
- vRecvMsg.push_back (CNetMessage (Params ().MessageStart (), SER_NETWORK, nRecvVersion ));
659
+ vRecvMsg.push_back (CNetMessage (Params ().MessageStart (), SER_NETWORK, INIT_PROTO_VERSION ));
663
660
664
661
CNetMessage& msg = vRecvMsg.back ();
665
662
@@ -691,7 +688,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
691
688
assert (i != mapRecvBytesPerMsgCmd.end ());
692
689
i->second += msg.hdr .nMessageSize + CMessageHeader::HEADER_SIZE;
693
690
694
- msg.nTime = GetTimeMicros () ;
691
+ msg.nTime = nTimeMicros ;
695
692
complete = true ;
696
693
}
697
694
}
@@ -764,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const
764
761
765
762
766
763
// requires LOCK(cs_vSend)
767
- size_t SocketSendData (CNode *pnode)
764
+ size_t CConnman:: SocketSendData (CNode *pnode)
768
765
{
769
766
auto it = pnode->vSendMsg .begin ();
770
767
size_t nSentSize = 0 ;
@@ -781,6 +778,7 @@ size_t SocketSendData(CNode *pnode)
781
778
if (pnode->nSendOffset == data.size ()) {
782
779
pnode->nSendOffset = 0 ;
783
780
pnode->nSendSize -= data.size ();
781
+ pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
784
782
it++;
785
783
} else {
786
784
// could not send full message; stop sending more
@@ -1052,8 +1050,7 @@ void CConnman::ThreadSocketHandler()
1052
1050
std::vector<CNode*> vNodesCopy = vNodes;
1053
1051
BOOST_FOREACH (CNode* pnode, vNodesCopy)
1054
1052
{
1055
- if (pnode->fDisconnect ||
1056
- (pnode->GetRefCount () <= 0 && pnode->vRecvMsg .empty () && pnode->nSendSize == 0 ))
1053
+ if (pnode->fDisconnect )
1057
1054
{
1058
1055
// remove from vNodes
1059
1056
vNodes.erase (remove (vNodes.begin (), vNodes.end (), pnode), vNodes.end ());
@@ -1083,13 +1080,9 @@ void CConnman::ThreadSocketHandler()
1083
1080
TRY_LOCK (pnode->cs_vSend , lockSend);
1084
1081
if (lockSend)
1085
1082
{
1086
- TRY_LOCK (pnode->cs_vRecvMsg , lockRecv);
1087
- if (lockRecv)
1088
- {
1089
1083
TRY_LOCK (pnode->cs_inventory , lockInv);
1090
1084
if (lockInv)
1091
1085
fDelete = true ;
1092
- }
1093
1086
}
1094
1087
}
1095
1088
if (fDelete )
@@ -1149,15 +1142,10 @@ void CConnman::ThreadSocketHandler()
1149
1142
// write buffer in this case before receiving more. This avoids
1150
1143
// needlessly queueing received data, if the remote peer is not themselves
1151
1144
// receiving data. This means properly utilizing TCP flow control signalling.
1152
- // * Otherwise, if there is no (complete) message in the receive buffer,
1153
- // or there is space left in the buffer, select() for receiving data.
1154
- // * (if neither of the above applies, there is certainly one message
1155
- // in the receiver buffer ready to be processed).
1156
- // Together, that means that at least one of the following is always possible,
1157
- // so we don't deadlock:
1158
- // * We send some data.
1159
- // * We wait for data to be received (and disconnect after timeout).
1160
- // * We process a message in the buffer (message handler thread).
1145
+ // * Otherwise, if there is space left in the receive buffer, select() for
1146
+ // receiving data.
1147
+ // * Hand off all complete messages to the processor, to be handled without
1148
+ // blocking here.
1161
1149
{
1162
1150
TRY_LOCK (pnode->cs_vSend , lockSend);
1163
1151
if (lockSend) {
@@ -1168,10 +1156,7 @@ void CConnman::ThreadSocketHandler()
1168
1156
}
1169
1157
}
1170
1158
{
1171
- TRY_LOCK (pnode->cs_vRecvMsg , lockRecv);
1172
- if (lockRecv && (
1173
- pnode->vRecvMsg .empty () || !pnode->vRecvMsg .front ().complete () ||
1174
- pnode->GetTotalRecvSize () <= GetReceiveFloodSize ()))
1159
+ if (!pnode->fPauseRecv )
1175
1160
FD_SET (pnode->hSocket , &fdsetRecv);
1176
1161
}
1177
1162
}
@@ -1230,8 +1215,6 @@ void CConnman::ThreadSocketHandler()
1230
1215
continue ;
1231
1216
if (FD_ISSET (pnode->hSocket , &fdsetRecv) || FD_ISSET (pnode->hSocket , &fdsetError))
1232
1217
{
1233
- TRY_LOCK (pnode->cs_vRecvMsg , lockRecv);
1234
- if (lockRecv)
1235
1218
{
1236
1219
{
1237
1220
// typical socket buffer is 8K-64K
@@ -1242,11 +1225,23 @@ void CConnman::ThreadSocketHandler()
1242
1225
bool notify = false ;
1243
1226
if (!pnode->ReceiveMsgBytes (pchBuf, nBytes, notify))
1244
1227
pnode->CloseSocketDisconnect ();
1245
- if (notify)
1246
- condMsgProc.notify_one ();
1247
- pnode->nLastRecv = GetTime ();
1248
- pnode->nRecvBytes += nBytes;
1249
1228
RecordBytesRecv (nBytes);
1229
+ if (notify) {
1230
+ size_t nSizeAdded = 0 ;
1231
+ auto it (pnode->vRecvMsg .begin ());
1232
+ for (; it != pnode->vRecvMsg .end (); ++it) {
1233
+ if (!it->complete ())
1234
+ break ;
1235
+ nSizeAdded += it->vRecv .size () + CMessageHeader::HEADER_SIZE;
1236
+ }
1237
+ {
1238
+ LOCK (pnode->cs_vProcessMsg );
1239
+ pnode->vProcessMsg .splice (pnode->vProcessMsg .end (), pnode->vRecvMsg , pnode->vRecvMsg .begin (), it);
1240
+ pnode->nProcessQueueSize += nSizeAdded;
1241
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
1242
+ }
1243
+ WakeMessageHandler ();
1244
+ }
1250
1245
}
1251
1246
else if (nBytes == 0 )
1252
1247
{
@@ -1280,8 +1275,9 @@ void CConnman::ThreadSocketHandler()
1280
1275
TRY_LOCK (pnode->cs_vSend , lockSend);
1281
1276
if (lockSend) {
1282
1277
size_t nBytes = SocketSendData (pnode);
1283
- if (nBytes)
1278
+ if (nBytes) {
1284
1279
RecordBytesSent (nBytes);
1280
+ }
1285
1281
}
1286
1282
}
1287
1283
@@ -1321,8 +1317,14 @@ void CConnman::ThreadSocketHandler()
1321
1317
}
1322
1318
}
1323
1319
1324
-
1325
-
1320
+ void CConnman::WakeMessageHandler ()
1321
+ {
1322
+ {
1323
+ std::lock_guard<std::mutex> lock (mutexMsgProc);
1324
+ fMsgProcWake = true ;
1325
+ }
1326
+ condMsgProc.notify_one ();
1327
+ }
1326
1328
1327
1329
1328
1330
@@ -1858,30 +1860,16 @@ void CConnman::ThreadMessageHandler()
1858
1860
}
1859
1861
}
1860
1862
1861
- bool fSleep = true ;
1863
+ bool fMoreWork = false ;
1862
1864
1863
1865
BOOST_FOREACH (CNode* pnode, vNodesCopy)
1864
1866
{
1865
1867
if (pnode->fDisconnect )
1866
1868
continue ;
1867
1869
1868
1870
// Receive messages
1869
- {
1870
- TRY_LOCK (pnode->cs_vRecvMsg , lockRecv);
1871
- if (lockRecv)
1872
- {
1873
- if (!GetNodeSignals ().ProcessMessages (pnode, *this , flagInterruptMsgProc))
1874
- pnode->CloseSocketDisconnect ();
1875
-
1876
- if (pnode->nSendSize < GetSendBufferSize ())
1877
- {
1878
- if (!pnode->vRecvGetData .empty () || (!pnode->vRecvMsg .empty () && pnode->vRecvMsg [0 ].complete ()))
1879
- {
1880
- fSleep = false ;
1881
- }
1882
- }
1883
- }
1884
- }
1871
+ bool fMoreNodeWork = GetNodeSignals ().ProcessMessages (pnode, *this , flagInterruptMsgProc);
1872
+ fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend );
1885
1873
if (flagInterruptMsgProc)
1886
1874
return ;
1887
1875
@@ -1901,10 +1889,11 @@ void CConnman::ThreadMessageHandler()
1901
1889
pnode->Release ();
1902
1890
}
1903
1891
1904
- if ( fSleep ) {
1905
- std::unique_lock<std::mutex> lock (mutexMsgProc);
1906
- condMsgProc.wait_until (lock, std::chrono::steady_clock::now () + std::chrono::milliseconds (100 ));
1892
+ std::unique_lock<std::mutex> lock (mutexMsgProc);
1893
+ if (! fMoreWork ) {
1894
+ condMsgProc.wait_until (lock, std::chrono::steady_clock::now () + std::chrono::milliseconds (100 ), [ this ] { return fMsgProcWake ; } );
1907
1895
}
1896
+ fMsgProcWake = false ;
1908
1897
}
1909
1898
}
1910
1899
@@ -2121,7 +2110,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
2121
2110
nMaxFeeler = connOptions.nMaxFeeler ;
2122
2111
2123
2112
nSendBufferMaxSize = connOptions.nSendBufferMaxSize ;
2124
- nReceiveFloodSize = connOptions.nSendBufferMaxSize ;
2113
+ nReceiveFloodSize = connOptions.nReceiveFloodSize ;
2125
2114
2126
2115
nMaxOutboundLimit = connOptions.nMaxOutboundLimit ;
2127
2116
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe ;
@@ -2182,6 +2171,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
2182
2171
interruptNet.reset ();
2183
2172
flagInterruptMsgProc = false ;
2184
2173
2174
+ {
2175
+ std::unique_lock<std::mutex> lock (mutexMsgProc);
2176
+ fMsgProcWake = false ;
2177
+ }
2178
+
2185
2179
// Send and receive from sockets, accept connections
2186
2180
threadSocketHandler = std::thread (&TraceThread<std::function<void ()> >, " net" , std::function<void ()>(std::bind (&CConnman::ThreadSocketHandler, this )));
2187
2181
@@ -2613,6 +2607,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
2613
2607
minFeeFilter = 0 ;
2614
2608
lastSentFeeFilter = 0 ;
2615
2609
nextSendTimeFeeFilter = 0 ;
2610
+ fPauseRecv = false ;
2611
+ fPauseSend = false ;
2612
+ nProcessQueueSize = 0 ;
2616
2613
2617
2614
BOOST_FOREACH (const std::string &msg, getAllNetMessageTypes ())
2618
2615
mapRecvBytesPerMsgCmd[msg] = 0 ;
@@ -2692,6 +2689,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
2692
2689
pnode->mapSendBytesPerMsgCmd [msg.command ] += nTotalSize;
2693
2690
pnode->nSendSize += nTotalSize;
2694
2691
2692
+ if (pnode->nSendSize > nSendBufferMaxSize)
2693
+ pnode->fPauseSend = true ;
2695
2694
pnode->vSendMsg .push_back (std::move (serializedHeader));
2696
2695
if (nMessageSize)
2697
2696
pnode->vSendMsg .push_back (std::move (msg.data ));
0 commit comments