Skip to content

Commit 4d712e3

Browse files
committed
net: add a new message queue for the message processor
This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split.
1 parent c5a8b1b commit 4d712e3

File tree

3 files changed

+24
-16
lines changed

3 files changed

+24
-16
lines changed

src/net.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler()
12391239
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
12401240
pnode->CloseSocketDisconnect();
12411241
RecordBytesRecv(nBytes);
1242-
if (notify)
1242+
if (notify) {
1243+
auto it(pnode->vRecvMsg.begin());
1244+
for (; it != pnode->vRecvMsg.end(); ++it) {
1245+
if (!it->complete())
1246+
break;
1247+
}
1248+
{
1249+
LOCK(pnode->cs_vProcessMsg);
1250+
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
1251+
}
12431252
WakeMessageHandler();
1253+
}
12441254
}
12451255
else if (nBytes == 0)
12461256
{

src/net.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,9 @@ class CNode
608608
std::deque<std::vector<unsigned char>> vSendMsg;
609609
CCriticalSection cs_vSend;
610610

611+
CCriticalSection cs_vProcessMsg;
612+
std::list<CNetMessage> vProcessMsg;
613+
611614
std::deque<CInv> vRecvGetData;
612615
std::list<CNetMessage> vRecvMsg;
613616
CCriticalSection cs_vRecvMsg;

src/net_processing.cpp

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
24682468
if (pfrom->nSendSize >= nMaxSendBufferSize)
24692469
return false;
24702470

2471-
auto it = pfrom->vRecvMsg.begin();
2472-
if (it == pfrom->vRecvMsg.end())
2473-
return false;
2474-
2475-
// end, if an incomplete message is found
2476-
if (!it->complete())
2477-
return false;
2478-
2479-
// get next message
2480-
CNetMessage msg = std::move(*it);
2481-
2482-
// at this point, any failure means we can delete the current message
2483-
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
2484-
2485-
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
2471+
std::list<CNetMessage> msgs;
2472+
{
2473+
LOCK(pfrom->cs_vProcessMsg);
2474+
if (pfrom->vProcessMsg.empty())
2475+
return false;
2476+
// Just take one message
2477+
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
2478+
fMoreWork = !pfrom->vProcessMsg.empty();
2479+
}
2480+
CNetMessage& msg(msgs.front());
24862481

24872482
msg.SetVersion(pfrom->GetRecvVersion());
24882483
// Scan for message start

0 commit comments

Comments
 (0)