Skip to content

Commit c5a8b1b

Browse files
committed
net: rework the way that the messagehandler sleeps
In order to sleep accurately, the message handler needs to know if _any_ node has more processing that it should do before the entire thread sleeps. Rather than returning a value that represents whether ProcessMessages encountered a message that should trigger a disconnnect, interpret the return value as whether or not that node has more work to do. Also, use a global fProcessWake value that can be set by other threads, which takes precedence (for one cycle) over the messagehandler's decision. Note that the previous behavior was to only process one message per loop (except in the case of a bad checksum or invalid header). That was changed in PR #3180. The only change here in that regard is that the current node now falls to the back of the processing queue for the bad checksum/invalid header cases.
1 parent c72cc88 commit c5a8b1b

File tree

4 files changed

+45
-37
lines changed

4 files changed

+45
-37
lines changed

src/net.cpp

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1317,6 +1317,10 @@ void CConnman::ThreadSocketHandler()
13171317

13181318
void CConnman::WakeMessageHandler()
13191319
{
1320+
{
1321+
std::lock_guard<std::mutex> lock(mutexMsgProc);
1322+
fMsgProcWake = true;
1323+
}
13201324
condMsgProc.notify_one();
13211325
}
13221326

@@ -1839,7 +1843,7 @@ void CConnman::ThreadMessageHandler()
18391843
}
18401844
}
18411845

1842-
bool fSleep = true;
1846+
bool fMoreWork = false;
18431847

18441848
BOOST_FOREACH(CNode* pnode, vNodesCopy)
18451849
{
@@ -1851,16 +1855,8 @@ void CConnman::ThreadMessageHandler()
18511855
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
18521856
if (lockRecv)
18531857
{
1854-
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
1855-
pnode->CloseSocketDisconnect();
1856-
1857-
if (pnode->nSendSize < GetSendBufferSize())
1858-
{
1859-
if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete()))
1860-
{
1861-
fSleep = false;
1862-
}
1863-
}
1858+
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
1859+
fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize());
18641860
}
18651861
}
18661862
if (flagInterruptMsgProc)
@@ -1882,10 +1878,11 @@ void CConnman::ThreadMessageHandler()
18821878
pnode->Release();
18831879
}
18841880

1885-
if (fSleep) {
1886-
std::unique_lock<std::mutex> lock(mutexMsgProc);
1887-
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
1881+
std::unique_lock<std::mutex> lock(mutexMsgProc);
1882+
if (!fMoreWork) {
1883+
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
18881884
}
1885+
fMsgProcWake = false;
18891886
}
18901887
}
18911888

@@ -2156,6 +2153,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
21562153
interruptNet.reset();
21572154
flagInterruptMsgProc = false;
21582155

2156+
{
2157+
std::unique_lock<std::mutex> lock(mutexMsgProc);
2158+
fMsgProcWake = false;
2159+
}
2160+
21592161
// Send and receive from sockets, accept connections
21602162
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
21612163

src/net.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,9 @@ class CConnman
424424
/** SipHasher seeds for deterministic randomness */
425425
const uint64_t nSeed0, nSeed1;
426426

427+
/** flag for waking the message processor. */
428+
bool fMsgProcWake;
429+
427430
std::condition_variable condMsgProc;
428431
std::mutex mutexMsgProc;
429432
std::atomic<bool> flagInterruptMsgProc;

src/net_processing.cpp

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2453,44 +2453,51 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
24532453
// (4) checksum
24542454
// (x) data
24552455
//
2456-
bool fOk = true;
2456+
bool fMoreWork = false;
24572457

24582458
if (!pfrom->vRecvGetData.empty())
24592459
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
24602460

2461+
if (pfrom->fDisconnect)
2462+
return false;
2463+
24612464
// this maintains the order of responses
2462-
if (!pfrom->vRecvGetData.empty()) return fOk;
2465+
if (!pfrom->vRecvGetData.empty()) return true;
24632466

2464-
auto it = pfrom->vRecvMsg.begin();
2465-
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
24662467
// Don't bother if send buffer is too full to respond anyway
24672468
if (pfrom->nSendSize >= nMaxSendBufferSize)
2468-
break;
2469+
return false;
24692470

2470-
// get next message
2471-
CNetMessage& msg = *it;
2471+
auto it = pfrom->vRecvMsg.begin();
2472+
if (it == pfrom->vRecvMsg.end())
2473+
return false;
24722474

24732475
// end, if an incomplete message is found
2474-
if (!msg.complete())
2475-
break;
2476+
if (!it->complete())
2477+
return false;
2478+
2479+
// get next message
2480+
CNetMessage msg = std::move(*it);
24762481

24772482
// at this point, any failure means we can delete the current message
2478-
it++;
2483+
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
2484+
2485+
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
24792486

24802487
msg.SetVersion(pfrom->GetRecvVersion());
24812488
// Scan for message start
24822489
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
24832490
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
2484-
fOk = false;
2485-
break;
2491+
pfrom->fDisconnect = true;
2492+
return false;
24862493
}
24872494

24882495
// Read header
24892496
CMessageHeader& hdr = msg.hdr;
24902497
if (!hdr.IsValid(chainparams.MessageStart()))
24912498
{
24922499
LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
2493-
continue;
2500+
return fMoreWork;
24942501
}
24952502
string strCommand = hdr.GetCommand();
24962503

@@ -2506,7 +2513,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
25062513
SanitizeString(strCommand), nMessageSize,
25072514
HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
25082515
HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
2509-
continue;
2516+
return fMoreWork;
25102517
}
25112518

25122519
// Process message
@@ -2515,7 +2522,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
25152522
{
25162523
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
25172524
if (interruptMsgProc)
2518-
return true;
2525+
return false;
2526+
if (!pfrom->vRecvGetData.empty())
2527+
fMoreWork = true;
25192528
}
25202529
catch (const std::ios_base::failure& e)
25212530
{
@@ -2549,14 +2558,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
25492558
if (!fRet)
25502559
LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);
25512560

2552-
break;
2553-
}
2554-
2555-
// In case the connection got shut down, its receive buffer was wiped
2556-
if (!pfrom->fDisconnect)
2557-
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
2558-
2559-
return fOk;
2561+
return fMoreWork;
25602562
}
25612563

25622564
class CompareInvMempoolOrder

src/net_processing.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
4646
* @param[in] pto The node which we are sending messages to.
4747
* @param[in] connman The connection manager for that node.
4848
* @param[in] interrupt Interrupt condition for processing threads
49+
* @return True if there is more work to be done
4950
*/
5051
bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);
5152

0 commit comments

Comments
 (0)