Skip to content

Commit 3e32cd0

Browse files
theunisipa
authored andcommitted
connman is in charge of pushing messages
The changes here are dense and subtle, but hopefully all is more explicit than before. - CConnman is now in charge of sending data rather than the nodes themselves. This is necessary because many decisions need to be made with all nodes in mind, and a model that requires the nodes calling up to their manager quickly turns to spaghetti. - The per-node-serializer (ssSend) has been replaced with a (quasi-)const send-version. Since the send version for serialization can only change once per connection, we now explicitly tag messages with INIT_PROTO_VERSION if they are sent before the handshake. With this done, there's no need to lock for access to nSendVersion. Also, a new stream is used for each message, so there's no need to lock during the serialization process. - This takes care of accounting for optimistic sends, so the nOptimisticBytesWritten hack can be removed. - -dropmessagestest and -fuzzmessagestest have not been preserved, as I suspect they haven't been used in years.
1 parent b98c14c commit 3e32cd0

File tree

4 files changed

+135
-32
lines changed

4 files changed

+135
-32
lines changed

src/main.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5047,7 +5047,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
50475047
// Each connection can only send one version message
50485048
if (pfrom->nVersion != 0)
50495049
{
5050-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message"));
5050+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message"));
50515051
LOCK(cs_main);
50525052
Misbehaving(pfrom->GetId(), 1);
50535053
return false;
@@ -5067,7 +5067,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
50675067
if (pfrom->nServicesExpected & ~pfrom->nServices)
50685068
{
50695069
LogPrint("net", "peer=%d does not offer the expected services (%08x offered, %08x expected); disconnecting\n", pfrom->id, pfrom->nServices, pfrom->nServicesExpected);
5070-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
5070+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
50715071
strprintf("Expected to offer services %08x", pfrom->nServicesExpected));
50725072
pfrom->fDisconnect = true;
50735073
return false;
@@ -5077,7 +5077,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
50775077
{
50785078
// disconnect from peers older than this proto version
50795079
LogPrintf("peer=%d using obsolete version %i; disconnecting\n", pfrom->id, pfrom->nVersion);
5080-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
5080+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
50815081
strprintf("Version must be %d or greater", MIN_PEER_PROTO_VERSION));
50825082
pfrom->fDisconnect = true;
50835083
return false;
@@ -5118,7 +5118,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
51185118

51195119
// Be shy and don't send version until we hear
51205120
if (pfrom->fInbound)
5121-
pfrom->PushVersion();
5121+
connman.PushVersion(pfrom, GetAdjustedTime());
51225122

51235123
pfrom->fClient = !(pfrom->nServices & NODE_NETWORK);
51245124

@@ -5135,8 +5135,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
51355135
}
51365136

51375137
// Change version
5138-
pfrom->PushMessage(NetMsgType::VERACK);
5139-
pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
5138+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::VERACK);
5139+
pfrom->SetSendVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
51405140

51415141
if (!pfrom->fInbound)
51425142
{
@@ -6391,7 +6391,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
63916391
}
63926392
catch (const std::ios_base::failure& e)
63936393
{
6394-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message"));
6394+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message"));
63956395
if (strstr(e.what(), "end of data"))
63966396
{
63976397
// Allow exceptions from under-length message on vRecv

src/net.cpp

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
394394
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
395395
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
396396

397+
398+
PushVersion(pnode, GetTime());
399+
397400
GetNodeSignals().InitializeNode(pnode->GetId(), pnode);
398401
pnode->AddRef();
399402

@@ -415,6 +418,24 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
415418
return NULL;
416419
}
417420

421+
void CConnman::PushVersion(CNode* pnode, int64_t nTime)
422+
{
423+
ServiceFlags nLocalNodeServices = pnode->GetLocalServices();
424+
CAddress addrYou = (pnode->addr.IsRoutable() && !IsProxy(pnode->addr) ? pnode->addr : CAddress(CService(), pnode->addr.nServices));
425+
CAddress addrMe = CAddress(CService(), nLocalNodeServices);
426+
uint64_t nonce = pnode->GetLocalNonce();
427+
int nNodeStartingHeight = pnode->nMyStartingHeight;
428+
NodeId id = pnode->GetId();
429+
430+
PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
431+
nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes);
432+
433+
if (fLogIPs)
434+
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
435+
else
436+
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), id);
437+
}
438+
418439
void CConnman::DumpBanlist()
419440
{
420441
SweepBanned(); // clean unused entries (if bantime has expired)
@@ -450,23 +471,6 @@ void CNode::CloseSocketDisconnect()
450471
vRecvMsg.clear();
451472
}
452473

453-
void CNode::PushVersion()
454-
{
455-
int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime());
456-
CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
457-
CAddress addrMe = CAddress(CService(), nLocalServices);
458-
if (fLogIPs)
459-
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
460-
else
461-
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), id);
462-
PushMessage(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalServices, nTime, addrYou, addrMe,
463-
nLocalHostNonce, strSubVersion, nMyStartingHeight, ::fRelayTxes);
464-
}
465-
466-
467-
468-
469-
470474
void CConnman::ClearBanned()
471475
{
472476
{
@@ -2530,7 +2534,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25302534
filterInventoryKnown(50000, 0.000001),
25312535
nLocalHostNonce(nLocalHostNonceIn),
25322536
nLocalServices(nLocalServicesIn),
2533-
nMyStartingHeight(nMyStartingHeightIn)
2537+
nMyStartingHeight(nMyStartingHeightIn),
2538+
nSendVersion(0)
25342539
{
25352540
nServices = NODE_NONE;
25362541
nServicesExpected = NODE_NONE;
@@ -2587,10 +2592,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25872592
LogPrint("net", "Added connection to %s peer=%d\n", addrName, id);
25882593
else
25892594
LogPrint("net", "Added connection peer=%d\n", id);
2590-
2591-
// Be shy and don't send version until we hear
2592-
if (hSocket != INVALID_SOCKET && !fInbound)
2593-
PushVersion();
25942595
}
25952596

25962597
CNode::~CNode()
@@ -2696,6 +2697,52 @@ void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend)
26962697
LEAVE_CRITICAL_SECTION(cs_vSend);
26972698
}
26982699

2700+
CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
2701+
{
2702+
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
2703+
}
2704+
2705+
void CConnman::EndMessage(CDataStream& strm)
2706+
{
2707+
// Set the size
2708+
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
2709+
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2710+
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
2711+
// Set the checksum
2712+
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
2713+
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
2714+
2715+
}
2716+
2717+
void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
2718+
{
2719+
if(strm.empty())
2720+
return;
2721+
2722+
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2723+
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
2724+
2725+
size_t nBytesSent = 0;
2726+
{
2727+
LOCK(pnode->cs_vSend);
2728+
if(pnode->hSocket == INVALID_SOCKET) {
2729+
return;
2730+
}
2731+
bool optimisticSend(pnode->vSendMsg.empty());
2732+
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
2733+
2734+
//log total amount of bytes per command
2735+
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
2736+
pnode->nSendSize += strm.size();
2737+
2738+
// If write queue empty, attempt "optimistic write"
2739+
if (optimisticSend == true)
2740+
nBytesSent = SocketSendData(pnode);
2741+
}
2742+
if (nBytesSent)
2743+
RecordBytesSent(nBytesSent);
2744+
}
2745+
26992746
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
27002747
{
27012748
CNode* found = nullptr;

src/net.h

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,36 @@ class CConnman
136136

137137
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
138138

139+
template <typename... Args>
140+
void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
141+
{
142+
auto msg(BeginMessage(pnode, nVersion, flag, sCommand));
143+
::SerializeMany(msg, msg.nType, msg.nVersion, std::forward<Args>(args)...);
144+
EndMessage(msg);
145+
PushMessage(pnode, msg, sCommand);
146+
}
147+
148+
template <typename... Args>
149+
void PushMessageWithFlag(CNode* pnode, int flag, const std::string& sCommand, Args&&... args)
150+
{
151+
PushMessageWithVersionAndFlag(pnode, 0, flag, sCommand, std::forward<Args>(args)...);
152+
}
153+
154+
template <typename... Args>
155+
void PushMessageWithVersion(CNode* pnode, int nVersion, const std::string& sCommand, Args&&... args)
156+
{
157+
PushMessageWithVersionAndFlag(pnode, nVersion, 0, sCommand, std::forward<Args>(args)...);
158+
}
159+
160+
template <typename... Args>
161+
void PushMessage(CNode* pnode, const std::string& sCommand, Args&&... args)
162+
{
163+
PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
164+
}
165+
166+
void PushVersion(CNode* pnode, int64_t nTime);
167+
168+
139169
template<typename Callable>
140170
bool ForEachNodeContinueIf(Callable&& func)
141171
{
@@ -345,6 +375,10 @@ class CConnman
345375

346376
unsigned int GetReceiveFloodSize() const;
347377

378+
CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
379+
void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
380+
void EndMessage(CDataStream& strm);
381+
348382
// Network stats
349383
void RecordBytesRecv(uint64_t bytes);
350384
void RecordBytesSent(uint64_t bytes);
@@ -553,6 +587,7 @@ class CNetMessage {
553587
/** Information about a peer */
554588
class CNode
555589
{
590+
friend class CConnman;
556591
public:
557592
// socket
558593
ServiceFlags nServices;
@@ -681,6 +716,7 @@ class CNode
681716
// Services offered to this peer
682717
const ServiceFlags nLocalServices;
683718
const int nMyStartingHeight;
719+
int nSendVersion;
684720
public:
685721

686722
NodeId GetId() const {
@@ -716,6 +752,25 @@ class CNode
716752
BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
717753
msg.SetVersion(nVersionIn);
718754
}
755+
void SetSendVersion(int nVersionIn)
756+
{
757+
// Send version may only be changed in the version message, and
758+
// only one version message is allowed per session. We can therefore
759+
// treat this value as const and even atomic as long as it's only used
760+
// once the handshake is complete. Any attempt to set this twice is an
761+
// error.
762+
assert(nSendVersion == 0);
763+
nSendVersion = nVersionIn;
764+
}
765+
766+
int GetSendVersion() const
767+
{
768+
// The send version should always be explicitly set to
769+
// INIT_PROTO_VERSION rather than using this value until the handshake
770+
// is complete. See PushMessageWithVersion().
771+
assert(nSendVersion != 0);
772+
return nSendVersion;
773+
}
719774

720775
CNode* AddRef()
721776
{
@@ -787,9 +842,6 @@ class CNode
787842
// TODO: Document the precondition of this function. Is cs_vSend locked?
788843
void EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend);
789844

790-
void PushVersion();
791-
792-
793845
void PushMessage(const char* pszCommand)
794846
{
795847
try

src/test/DoS_tests.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
4949
connman->ClearBanned();
5050
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
5151
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true);
52+
dummyNode1.SetSendVersion(PROTOCOL_VERSION);
5253
GetNodeSignals().InitializeNode(dummyNode1.GetId(), &dummyNode1);
5354
dummyNode1.nVersion = 1;
5455
Misbehaving(dummyNode1.GetId(), 100); // Should get banned
@@ -58,6 +59,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
5859

5960
CAddress addr2(ip(0xa0b0c002), NODE_NONE);
6061
CNode dummyNode2(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr2, 1, 1, "", true);
62+
dummyNode2.SetSendVersion(PROTOCOL_VERSION);
6163
GetNodeSignals().InitializeNode(dummyNode2.GetId(), &dummyNode2);
6264
dummyNode2.nVersion = 1;
6365
Misbehaving(dummyNode2.GetId(), 50);
@@ -75,6 +77,7 @@ BOOST_AUTO_TEST_CASE(DoS_banscore)
7577
mapArgs["-banscore"] = "111"; // because 11 is my favorite number
7678
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
7779
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 3, 1, "", true);
80+
dummyNode1.SetSendVersion(PROTOCOL_VERSION);
7881
GetNodeSignals().InitializeNode(dummyNode1.GetId(), &dummyNode1);
7982
dummyNode1.nVersion = 1;
8083
Misbehaving(dummyNode1.GetId(), 100);
@@ -97,6 +100,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
97100

98101
CAddress addr(ip(0xa0b0c001), NODE_NONE);
99102
CNode dummyNode(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr, 4, 4, "", true);
103+
dummyNode.SetSendVersion(PROTOCOL_VERSION);
100104
GetNodeSignals().InitializeNode(dummyNode.GetId(), &dummyNode);
101105
dummyNode.nVersion = 1;
102106

0 commit comments

Comments
 (0)