@@ -570,42 +570,42 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
570
570
nLastRecv = nTimeMicros / 1000000 ;
571
571
nRecvBytes += nBytes;
572
572
while (nBytes > 0 ) {
573
-
574
- // get current incomplete message, or create a new one
575
- if (vRecvMsg.empty () ||
576
- vRecvMsg.back ().complete ())
577
- vRecvMsg.push_back (CNetMessage (Params ().MessageStart (), SER_NETWORK, INIT_PROTO_VERSION));
578
-
579
- CNetMessage& msg = vRecvMsg.back ();
580
-
581
573
// absorb network data
582
574
int handled;
583
- if (!msg. in_data )
584
- handled = msg. readHeader (pch, nBytes);
575
+ if (!m_deserializer-> in_data )
576
+ handled = m_deserializer-> readHeader (pch, nBytes);
585
577
else
586
- handled = msg. readData (pch, nBytes);
578
+ handled = m_deserializer-> readData (pch, nBytes);
587
579
588
- if (handled < 0 )
580
+ if (handled < 0 ) {
581
+ m_deserializer->Reset ();
589
582
return false ;
583
+ }
590
584
591
- if (msg. in_data && msg. hdr .nMessageSize > MAX_PROTOCOL_MESSAGE_LENGTH) {
585
+ if (m_deserializer-> in_data && m_deserializer-> hdr .nMessageSize > MAX_PROTOCOL_MESSAGE_LENGTH) {
592
586
LogPrint (BCLog::NET, " Oversized message from peer=%i, disconnecting\n " , GetId ());
587
+ m_deserializer->Reset ();
593
588
return false ;
594
589
}
595
590
596
591
pch += handled;
597
592
nBytes -= handled;
598
593
599
- if (msg.complete ()) {
594
+ if (m_deserializer->complete ()) {
595
+ // decompose a transport agnostic CNetMessage from the deserializer
596
+ CNetMessage msg = m_deserializer->GetMessage (Params ().MessageStart (), nTimeMicros);
597
+
600
598
// store received bytes per message command
601
599
// to prevent a memory DOS, only allow valid commands
602
- mapMsgCmdSize::iterator i = mapRecvBytesPerMsgCmd.find (msg. hdr .pchCommand );
600
+ mapMsgCmdSize::iterator i = mapRecvBytesPerMsgCmd.find (m_deserializer-> hdr .pchCommand );
603
601
if (i == mapRecvBytesPerMsgCmd.end ())
604
602
i = mapRecvBytesPerMsgCmd.find (NET_MESSAGE_COMMAND_OTHER);
605
603
assert (i != mapRecvBytesPerMsgCmd.end ());
606
- i->second += msg.hdr .nMessageSize + CMessageHeader::HEADER_SIZE;
604
+ i->second += m_deserializer->hdr .nMessageSize + CMessageHeader::HEADER_SIZE;
605
+
606
+ // push the message to the process queue,
607
+ vRecvMsg.push_back (std::move (msg));
607
608
608
- msg.nTime = nTimeMicros;
609
609
complete = true ;
610
610
}
611
611
}
@@ -639,8 +639,7 @@ int CNode::GetSendVersion() const
639
639
return nSendVersion;
640
640
}
641
641
642
-
643
- int CNetMessage::readHeader (const char *pch, unsigned int nBytes)
642
+ int TransportDeserializer::readHeader (const char *pch, unsigned int nBytes)
644
643
{
645
644
// copy data to temporary parsing buffer
646
645
unsigned int nRemaining = 24 - nHdrPos;
@@ -671,7 +670,7 @@ int CNetMessage::readHeader(const char *pch, unsigned int nBytes)
671
670
return nCopy;
672
671
}
673
672
674
- int CNetMessage ::readData (const char *pch, unsigned int nBytes)
673
+ int TransportDeserializer ::readData (const char *pch, unsigned int nBytes)
675
674
{
676
675
unsigned int nRemaining = hdr.nMessageSize - nDataPos;
677
676
unsigned int nCopy = std::min (nRemaining, nBytes);
@@ -688,14 +687,43 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes)
688
687
return nCopy;
689
688
}
690
689
691
- const uint256& CNetMessage ::GetMessageHash () const
690
+ const uint256& TransportDeserializer ::GetMessageHash () const
692
691
{
693
692
assert (complete ());
694
693
if (data_hash.IsNull ())
695
694
hasher.Finalize (data_hash.begin ());
696
695
return data_hash;
697
696
}
698
697
698
+ CNetMessage TransportDeserializer::GetMessage (const CMessageHeader::MessageStartChars& message_start, int64_t time) {
699
+ // decompose a single CNetMessage from the TransportDeserializer
700
+ CNetMessage msg (std::move (vRecv));
701
+
702
+ // store state about valid header, netmagic and checksum
703
+ msg.m_valid_header = hdr.IsValid (message_start);
704
+ msg.m_valid_netmagic = (memcmp (hdr.pchMessageStart , message_start, CMessageHeader::MESSAGE_START_SIZE) == 0 );
705
+ uint256 hash = GetMessageHash ();
706
+
707
+ // store command string, payload size
708
+ msg.m_command = hdr.GetCommand ();
709
+ msg.m_message_size = hdr.nMessageSize ;
710
+
711
+ msg.m_valid_checksum = (memcmp (hash.begin (), hdr.pchChecksum , CMessageHeader::CHECKSUM_SIZE) == 0 );
712
+ if (!msg.m_valid_checksum ) {
713
+ LogPrint (BCLog::NET, " CHECKSUM ERROR (%s, %u bytes), expected %s was %s\n " ,
714
+ SanitizeString (msg.m_command ), msg.m_message_size ,
715
+ HexStr (hash.begin (), hash.begin ()+CMessageHeader::CHECKSUM_SIZE),
716
+ HexStr (hdr.pchChecksum , hdr.pchChecksum +CMessageHeader::CHECKSUM_SIZE));
717
+ }
718
+
719
+ // store receive time
720
+ msg.m_time = time;
721
+
722
+ // reset the network deserializer (prepare for the next message)
723
+ Reset ();
724
+ return msg;
725
+ }
726
+
699
727
size_t CConnman::SocketSendData (CNode *pnode) const EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_vSend)
700
728
{
701
729
auto it = pnode->vSendMsg .begin ();
@@ -1347,9 +1375,9 @@ void CConnman::SocketHandler()
1347
1375
size_t nSizeAdded = 0 ;
1348
1376
auto it (pnode->vRecvMsg .begin ());
1349
1377
for (; it != pnode->vRecvMsg .end (); ++it) {
1350
- if (!it-> complete ())
1351
- break ;
1352
- nSizeAdded += it->vRecv .size () + CMessageHeader::HEADER_SIZE;
1378
+ // vRecvMsg contains only completed CNetMessage
1379
+ // the single possible partially deserialized message are held by TransportDeserializer
1380
+ nSizeAdded += it->m_recv .size () + CMessageHeader::HEADER_SIZE;
1353
1381
}
1354
1382
{
1355
1383
LOCK (pnode->cs_vProcessMsg );
@@ -2678,6 +2706,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
2678
2706
} else {
2679
2707
LogPrint (BCLog::NET, " Added connection peer=%d\n " , id);
2680
2708
}
2709
+
2710
+ m_deserializer = MakeUnique<TransportDeserializer>(TransportDeserializer (Params ().MessageStart (), SER_NETWORK, INIT_PROTO_VERSION));
2681
2711
}
2682
2712
2683
2713
CNode::~CNode ()
0 commit comments