Skip to content

Commit 76fec09

Browse files
committed
Merge #9128: net: Decouple CConnman and message serialization
c7be56d net: push only raw data into CConnman (Cory Fields) 2ec935d net: add CVectorWriter and CNetMsgMaker (Cory Fields) b7695c2 net: No need to check individually for disconnection anymore (Cory Fields) fedea8a net: don't send any messages before handshake or after requested disconnect (Cory Fields) d74e352 net: Set feelers to disconnect at the end of the version message (Cory Fields)
2 parents e22f409 + c7be56d commit 76fec09

File tree

7 files changed

+308
-152
lines changed

7 files changed

+308
-152
lines changed

src/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ BITCOIN_CORE_H = \
111111
net.h \
112112
netaddress.h \
113113
netbase.h \
114+
netmessagemaker.h \
114115
noui.h \
115116
policy/fees.h \
116117
policy/policy.h \

src/main.cpp

Lines changed: 108 additions & 93 deletions
Large diffs are not rendered by default.

src/net.cpp

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -768,13 +768,13 @@ const uint256& CNetMessage::GetMessageHash() const
768768
// requires LOCK(cs_vSend)
769769
size_t SocketSendData(CNode *pnode)
770770
{
771-
std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
771+
auto it = pnode->vSendMsg.begin();
772772
size_t nSentSize = 0;
773773

774774
while (it != pnode->vSendMsg.end()) {
775-
const CSerializeData &data = *it;
775+
const auto &data = *it;
776776
assert(data.size() > pnode->nSendOffset);
777-
int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
777+
int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
778778
if (nBytes > 0) {
779779
pnode->nLastSend = GetTime();
780780
pnode->nSendBytes += nBytes;
@@ -2612,30 +2612,19 @@ void CNode::AskFor(const CInv& inv)
26122612
mapAskFor.insert(std::make_pair(nRequestTime, inv));
26132613
}
26142614

2615-
CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
2615+
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
26162616
{
2617-
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
2618-
}
2619-
2620-
void CConnman::EndMessage(CDataStream& strm)
2621-
{
2622-
// Set the size
2623-
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
2624-
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2625-
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
2626-
// Set the checksum
2627-
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
2628-
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
2629-
2630-
}
2617+
size_t nMessageSize = msg.data.size();
2618+
size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
2619+
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command.c_str()), nMessageSize, pnode->id);
26312620

2632-
void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
2633-
{
2634-
if(strm.empty())
2635-
return;
2621+
std::vector<unsigned char> serializedHeader;
2622+
serializedHeader.reserve(CMessageHeader::HEADER_SIZE);
2623+
uint256 hash = Hash(msg.data.data(), msg.data.data() + nMessageSize);
2624+
CMessageHeader hdr(Params().MessageStart(), msg.command.c_str(), nMessageSize);
2625+
memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
26362626

2637-
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2638-
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
2627+
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, serializedHeader, 0, hdr};
26392628

26402629
size_t nBytesSent = 0;
26412630
{
@@ -2644,11 +2633,14 @@ void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& s
26442633
return;
26452634
}
26462635
bool optimisticSend(pnode->vSendMsg.empty());
2647-
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
26482636

26492637
//log total amount of bytes per command
2650-
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
2651-
pnode->nSendSize += strm.size();
2638+
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
2639+
pnode->nSendSize += nTotalSize;
2640+
2641+
pnode->vSendMsg.push_back(std::move(serializedHeader));
2642+
if (nMessageSize)
2643+
pnode->vSendMsg.push_back(std::move(msg.data));
26522644

26532645
// If write queue empty, attempt "optimistic write"
26542646
if (optimisticSend == true)

src/net.h

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,20 @@ class CTransaction;
101101
class CNodeStats;
102102
class CClientUIInterface;
103103

104+
struct CSerializedNetMsg
105+
{
106+
CSerializedNetMsg() = default;
107+
CSerializedNetMsg(CSerializedNetMsg&&) = default;
108+
CSerializedNetMsg& operator=(CSerializedNetMsg&&) = default;
109+
// No copying, only moves.
110+
CSerializedNetMsg(const CSerializedNetMsg& msg) = delete;
111+
CSerializedNetMsg& operator=(const CSerializedNetMsg&) = delete;
112+
113+
std::vector<unsigned char> data;
114+
std::string command;
115+
};
116+
117+
104118
class CConnman
105119
{
106120
public:
@@ -138,32 +152,7 @@ class CConnman
138152

139153
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
140154

141-
template <typename... Args>
142-
void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
143-
{
144-
auto msg(BeginMessage(pnode, nVersion, flag, sCommand));
145-
::SerializeMany(msg, std::forward<Args>(args)...);
146-
EndMessage(msg);
147-
PushMessage(pnode, msg, sCommand);
148-
}
149-
150-
template <typename... Args>
151-
void PushMessageWithFlag(CNode* pnode, int flag, const std::string& sCommand, Args&&... args)
152-
{
153-
PushMessageWithVersionAndFlag(pnode, 0, flag, sCommand, std::forward<Args>(args)...);
154-
}
155-
156-
template <typename... Args>
157-
void PushMessageWithVersion(CNode* pnode, int nVersion, const std::string& sCommand, Args&&... args)
158-
{
159-
PushMessageWithVersionAndFlag(pnode, nVersion, 0, sCommand, std::forward<Args>(args)...);
160-
}
161-
162-
template <typename... Args>
163-
void PushMessage(CNode* pnode, const std::string& sCommand, Args&&... args)
164-
{
165-
PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
166-
}
155+
void PushMessage(CNode* pnode, CSerializedNetMsg&& msg);
167156

168157
template<typename Callable>
169158
bool ForEachNodeContinueIf(Callable&& func)
@@ -374,10 +363,6 @@ class CConnman
374363

375364
unsigned int GetReceiveFloodSize() const;
376365

377-
CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
378-
void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
379-
void EndMessage(CDataStream& strm);
380-
381366
// Network stats
382367
void RecordBytesRecv(uint64_t bytes);
383368
void RecordBytesSent(uint64_t bytes);
@@ -601,7 +586,7 @@ class CNode
601586
size_t nSendSize; // total size of all vSendMsg entries
602587
size_t nSendOffset; // offset inside the first vSendMsg already sent
603588
uint64_t nSendBytes;
604-
std::deque<CSerializeData> vSendMsg;
589+
std::deque<std::vector<unsigned char>> vSendMsg;
605590
CCriticalSection cs_vSend;
606591

607592
std::deque<CInv> vRecvGetData;
@@ -771,7 +756,7 @@ class CNode
771756
{
772757
// The send version should always be explicitly set to
773758
// INIT_PROTO_VERSION rather than using this value until the handshake
774-
// is complete. See PushMessageWithVersion().
759+
// is complete.
775760
assert(nSendVersion != 0);
776761
return nSendVersion;
777762
}

src/netmessagemaker.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) 2009-2010 Satoshi Nakamoto
2+
// Copyright (c) 2009-2016 The Bitcoin Core developers
3+
// Distributed under the MIT software license, see the accompanying
4+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5+
6+
#ifndef BITCOIN_NETMESSAGEMAKER_H
7+
#define BITCOIN_NETMESSAGEMAKER_H
8+
9+
#include "net.h"
10+
#include "serialize.h"
11+
12+
class CNetMsgMaker
13+
{
14+
public:
15+
CNetMsgMaker(int nVersionIn) : nVersion(nVersionIn){}
16+
17+
template <typename... Args>
18+
CSerializedNetMsg Make(int nFlags, std::string sCommand, Args&&... args)
19+
{
20+
CSerializedNetMsg msg;
21+
msg.command = std::move(sCommand);
22+
CVectorWriter{ SER_NETWORK, nFlags | nVersion, msg.data, 0, std::forward<Args>(args)... };
23+
return msg;
24+
}
25+
26+
template <typename... Args>
27+
CSerializedNetMsg Make(std::string sCommand, Args&&... args)
28+
{
29+
return Make(0, std::move(sCommand), std::forward<Args>(args)...);
30+
}
31+
32+
private:
33+
const int nVersion;
34+
};
35+
36+
#endif // BITCOIN_NETMESSAGEMAKER_H

src/streams.h

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,75 @@ OverrideStream<S> WithOrVersion(S* s, int nVersionFlag)
6969
return OverrideStream<S>(s, s->GetType(), s->GetVersion() | nVersionFlag);
7070
}
7171

72+
/* Minimal stream for overwriting and/or appending to an existing byte vector
73+
*
74+
* The referenced vector will grow as necessary
75+
*/
76+
class CVectorWriter
77+
{
78+
public:
79+
80+
/*
81+
* @param[in] nTypeIn Serialization Type
82+
* @param[in] nVersionIn Serialization Version (including any flags)
83+
* @param[in] vchDataIn Referenced byte vector to overwrite/append
84+
* @param[in] nPosIn Starting position. Vector index where writes should start. The vector will initially
85+
* grow as necessary to max(index, vec.size()). So to append, use vec.size().
86+
*/
87+
CVectorWriter(int nTypeIn, int nVersionIn, std::vector<unsigned char>& vchDataIn, size_t nPosIn) : nType(nTypeIn), nVersion(nVersionIn), vchData(vchDataIn), nPos(nPosIn)
88+
{
89+
if(nPos > vchData.size())
90+
vchData.resize(nPos);
91+
}
92+
/*
93+
* (other params same as above)
94+
* @param[in] args A list of items to serialize starting at nPos.
95+
*/
96+
template <typename... Args>
97+
CVectorWriter(int nTypeIn, int nVersionIn, std::vector<unsigned char>& vchDataIn, size_t nPosIn, Args&&... args) : CVectorWriter(nTypeIn, nVersionIn, vchDataIn, nPosIn)
98+
{
99+
::SerializeMany(*this, std::forward<Args>(args)...);
100+
}
101+
void write(const char* pch, size_t nSize)
102+
{
103+
assert(nPos <= vchData.size());
104+
size_t nOverwrite = std::min(nSize, vchData.size() - nPos);
105+
if (nOverwrite) {
106+
memcpy(vchData.data() + nPos, reinterpret_cast<const unsigned char*>(pch), nOverwrite);
107+
}
108+
if (nOverwrite < nSize) {
109+
vchData.insert(vchData.end(), reinterpret_cast<const unsigned char*>(pch) + nOverwrite, reinterpret_cast<const unsigned char*>(pch) + nSize);
110+
}
111+
nPos += nSize;
112+
}
113+
template<typename T>
114+
CVectorWriter& operator<<(const T& obj)
115+
{
116+
// Serialize to this stream
117+
::Serialize(*this, obj);
118+
return (*this);
119+
}
120+
int GetVersion() const
121+
{
122+
return nVersion;
123+
}
124+
int GetType() const
125+
{
126+
return nType;
127+
}
128+
void seek(size_t nSize)
129+
{
130+
nPos += nSize;
131+
if(nPos > vchData.size())
132+
vchData.resize(nPos);
133+
}
134+
private:
135+
const int nType;
136+
const int nVersion;
137+
std::vector<unsigned char>& vchData;
138+
size_t nPos;
139+
};
140+
72141
/** Double ended buffer combining vector and stream-like interfaces.
73142
*
74143
* >> and << read and write unformatted data using the above serialization templates.

src/test/streams_tests.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,64 @@ using namespace boost::assign; // bring 'operator+=()' into scope
1515

1616
BOOST_FIXTURE_TEST_SUITE(streams_tests, BasicTestingSetup)
1717

18+
BOOST_AUTO_TEST_CASE(streams_vector_writer)
19+
{
20+
unsigned char a(1);
21+
unsigned char b(2);
22+
unsigned char bytes[] = { 3, 4, 5, 6 };
23+
std::vector<unsigned char> vch;
24+
25+
// Each test runs twice. Serializing a second time at the same starting
26+
// point should yield the same results, even if the first test grew the
27+
// vector.
28+
29+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, a, b);
30+
BOOST_CHECK((vch == std::vector<unsigned char>{{1, 2}}));
31+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, a, b);
32+
BOOST_CHECK((vch == std::vector<unsigned char>{{1, 2}}));
33+
vch.clear();
34+
35+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
36+
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2}}));
37+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
38+
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2}}));
39+
vch.clear();
40+
41+
vch.resize(5, 0);
42+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
43+
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2, 0}}));
44+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
45+
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2, 0}}));
46+
vch.clear();
47+
48+
vch.resize(4, 0);
49+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 3, a, b);
50+
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 1, 2}}));
51+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 3, a, b);
52+
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 1, 2}}));
53+
vch.clear();
54+
55+
vch.resize(4, 0);
56+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 4, a, b);
57+
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 0, 1, 2}}));
58+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 4, a, b);
59+
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 0, 1, 2}}));
60+
vch.clear();
61+
62+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, FLATDATA(bytes));
63+
BOOST_CHECK((vch == std::vector<unsigned char>{{3, 4, 5, 6}}));
64+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, FLATDATA(bytes));
65+
BOOST_CHECK((vch == std::vector<unsigned char>{{3, 4, 5, 6}}));
66+
vch.clear();
67+
68+
vch.resize(4, 8);
69+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, FLATDATA(bytes), b);
70+
BOOST_CHECK((vch == std::vector<unsigned char>{{8, 8, 1, 3, 4, 5, 6, 2}}));
71+
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, FLATDATA(bytes), b);
72+
BOOST_CHECK((vch == std::vector<unsigned char>{{8, 8, 1, 3, 4, 5, 6, 2}}));
73+
vch.clear();
74+
}
75+
1876
BOOST_AUTO_TEST_CASE(streams_serializedata_xor)
1977
{
2078
std::vector<char> in;

0 commit comments

Comments
 (0)