Skip to content

Commit c8c572f

Browse files
committed
Merge #8708: net: have CConnman handle message sending
9027680 net: handle version push in InitializeNode (Cory Fields) 7588b85 net: construct CNodeStates in place (Cory Fields) 440f1d3 net: remove now-unused ssSend and Fuzz (Cory Fields) 5c2169c drop the optimistic write counter hack (Cory Fields) ea33268 net: switch all callers to connman for pushing messages (Cory Fields) 3e32cd0 connman is in charge of pushing messages (Cory Fields) b98c14c serialization: teach serializers variadics (Cory Fields)
2 parents 7b22e50 + 9027680 commit c8c572f

File tree

7 files changed

+320
-380
lines changed

7 files changed

+320
-380
lines changed

src/main.cpp

Lines changed: 90 additions & 64 deletions
Large diffs are not rendered by default.

src/net.cpp

Lines changed: 39 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -393,18 +393,15 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
393393
NodeId id = GetNewNodeId();
394394
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
395395
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
396-
397-
GetNodeSignals().InitializeNode(pnode->GetId(), pnode);
396+
pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices);
397+
pnode->nTimeConnected = GetTime();
398398
pnode->AddRef();
399-
399+
GetNodeSignals().InitializeNode(pnode, *this);
400400
{
401401
LOCK(cs_vNodes);
402402
vNodes.push_back(pnode);
403403
}
404404

405-
pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices);
406-
pnode->nTimeConnected = GetTime();
407-
408405
return pnode;
409406
} else if (!proxyConnectionFailed) {
410407
// If connecting to the node failed, and failure is not caused by a problem connecting to
@@ -450,23 +447,6 @@ void CNode::CloseSocketDisconnect()
450447
vRecvMsg.clear();
451448
}
452449

453-
void CNode::PushVersion()
454-
{
455-
int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime());
456-
CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
457-
CAddress addrMe = CAddress(CService(), nLocalServices);
458-
if (fLogIPs)
459-
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
460-
else
461-
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), id);
462-
PushMessage(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalServices, nTime, addrYou, addrMe,
463-
nLocalHostNonce, strSubVersion, nMyStartingHeight, ::fRelayTxes);
464-
}
465-
466-
467-
468-
469-
470450
void CConnman::ClearBanned()
471451
{
472452
{
@@ -1032,9 +1012,9 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
10321012
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
10331013

10341014
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, "", true);
1035-
GetNodeSignals().InitializeNode(pnode->GetId(), pnode);
10361015
pnode->AddRef();
10371016
pnode->fWhitelisted = whitelisted;
1017+
GetNodeSignals().InitializeNode(pnode, *this);
10381018

10391019
LogPrint("net", "connection from %s accepted\n", addr.ToString());
10401020

@@ -1059,7 +1039,7 @@ void CConnman::ThreadSocketHandler()
10591039
BOOST_FOREACH(CNode* pnode, vNodesCopy)
10601040
{
10611041
if (pnode->fDisconnect ||
1062-
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty()))
1042+
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
10631043
{
10641044
// remove from vNodes
10651045
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -1163,10 +1143,6 @@ void CConnman::ThreadSocketHandler()
11631143
{
11641144
TRY_LOCK(pnode->cs_vSend, lockSend);
11651145
if (lockSend) {
1166-
if (pnode->nOptimisticBytesWritten) {
1167-
RecordBytesSent(pnode->nOptimisticBytesWritten);
1168-
pnode->nOptimisticBytesWritten = 0;
1169-
}
11701146
if (!pnode->vSendMsg.empty()) {
11711147
FD_SET(pnode->hSocket, &fdsetSend);
11721148
continue;
@@ -2130,7 +2106,7 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st
21302106
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
21312107

21322108
pnodeLocalHost = new CNode(id, nLocalServices, GetBestHeight(), INVALID_SOCKET, CAddress(CService(local, 0), nLocalServices), 0, nonce);
2133-
GetNodeSignals().InitializeNode(pnodeLocalHost->GetId(), pnodeLocalHost);
2109+
GetNodeSignals().InitializeNode(pnodeLocalHost, *this);
21342110
}
21352111

21362112
//
@@ -2482,46 +2458,10 @@ int CConnman::GetBestHeight() const
24822458
return nBestHeight.load(std::memory_order_acquire);
24832459
}
24842460

2485-
void CNode::Fuzz(int nChance)
2486-
{
2487-
if (!fSuccessfullyConnected) return; // Don't fuzz initial handshake
2488-
if (GetRand(nChance) != 0) return; // Fuzz 1 of every nChance messages
2489-
2490-
switch (GetRand(3))
2491-
{
2492-
case 0:
2493-
// xor a random byte with a random value:
2494-
if (!ssSend.empty()) {
2495-
CDataStream::size_type pos = GetRand(ssSend.size());
2496-
ssSend[pos] ^= (unsigned char)(GetRand(256));
2497-
}
2498-
break;
2499-
case 1:
2500-
// delete a random byte:
2501-
if (!ssSend.empty()) {
2502-
CDataStream::size_type pos = GetRand(ssSend.size());
2503-
ssSend.erase(ssSend.begin()+pos);
2504-
}
2505-
break;
2506-
case 2:
2507-
// insert a random byte at a random position
2508-
{
2509-
CDataStream::size_type pos = GetRand(ssSend.size());
2510-
char ch = (char)GetRand(256);
2511-
ssSend.insert(ssSend.begin()+pos, ch);
2512-
}
2513-
break;
2514-
}
2515-
// Chance of more than one change half the time:
2516-
// (more changes exponentially less likely):
2517-
Fuzz(2);
2518-
}
2519-
25202461
unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; }
25212462
unsigned int CConnman::GetSendBufferSize() const{ return nSendBufferMaxSize; }
25222463

25232464
CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const std::string& addrNameIn, bool fInboundIn) :
2524-
ssSend(SER_NETWORK, INIT_PROTO_VERSION),
25252465
addr(addrIn),
25262466
fInbound(fInboundIn),
25272467
id(idIn),
@@ -2530,7 +2470,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25302470
filterInventoryKnown(50000, 0.000001),
25312471
nLocalHostNonce(nLocalHostNonceIn),
25322472
nLocalServices(nLocalServicesIn),
2533-
nMyStartingHeight(nMyStartingHeightIn)
2473+
nMyStartingHeight(nMyStartingHeightIn),
2474+
nSendVersion(0)
25342475
{
25352476
nServices = NODE_NONE;
25362477
nServicesExpected = NODE_NONE;
@@ -2577,7 +2518,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25772518
minFeeFilter = 0;
25782519
lastSentFeeFilter = 0;
25792520
nextSendTimeFeeFilter = 0;
2580-
nOptimisticBytesWritten = 0;
25812521

25822522
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
25832523
mapRecvBytesPerMsgCmd[msg] = 0;
@@ -2587,10 +2527,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25872527
LogPrint("net", "Added connection to %s peer=%d\n", addrName, id);
25882528
else
25892529
LogPrint("net", "Added connection peer=%d\n", id);
2590-
2591-
// Be shy and don't send version until we hear
2592-
if (hSocket != INVALID_SOCKET && !fInbound)
2593-
PushVersion();
25942530
}
25952531

25962532
CNode::~CNode()
@@ -2635,65 +2571,50 @@ void CNode::AskFor(const CInv& inv)
26352571
mapAskFor.insert(std::make_pair(nRequestTime, inv));
26362572
}
26372573

2638-
void CNode::BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend)
2574+
CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
26392575
{
2640-
ENTER_CRITICAL_SECTION(cs_vSend);
2641-
assert(ssSend.size() == 0);
2642-
ssSend << CMessageHeader(Params().MessageStart(), pszCommand, 0);
2643-
LogPrint("net", "sending: %s ", SanitizeString(pszCommand));
2576+
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
26442577
}
26452578

2646-
void CNode::AbortMessage() UNLOCK_FUNCTION(cs_vSend)
2579+
void CConnman::EndMessage(CDataStream& strm)
26472580
{
2648-
ssSend.clear();
2649-
2650-
LEAVE_CRITICAL_SECTION(cs_vSend);
2581+
// Set the size
2582+
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
2583+
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2584+
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
2585+
// Set the checksum
2586+
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
2587+
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
26512588

2652-
LogPrint("net", "(aborted)\n");
26532589
}
26542590

2655-
void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend)
2591+
void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
26562592
{
2657-
// The -*messagestest options are intentionally not documented in the help message,
2658-
// since they are only used during development to debug the networking code and are
2659-
// not intended for end-users.
2660-
if (mapArgs.count("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 2)) == 0)
2661-
{
2662-
LogPrint("net", "dropmessages DROPPING SEND MESSAGE\n");
2663-
AbortMessage();
2664-
return;
2665-
}
2666-
if (mapArgs.count("-fuzzmessagestest"))
2667-
Fuzz(GetArg("-fuzzmessagestest", 10));
2668-
2669-
if (ssSend.size() == 0)
2670-
{
2671-
LEAVE_CRITICAL_SECTION(cs_vSend);
2593+
if(strm.empty())
26722594
return;
2673-
}
2674-
// Set the size
2675-
unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE;
2676-
WriteLE32((uint8_t*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
2677-
2678-
//log total amount of bytes per command
2679-
mapSendBytesPerMsgCmd[std::string(pszCommand)] += nSize + CMessageHeader::HEADER_SIZE;
26802595

2681-
// Set the checksum
2682-
uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end());
2683-
assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + CMessageHeader::CHECKSUM_SIZE);
2684-
memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
2596+
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2597+
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
26852598

2686-
LogPrint("net", "(%d bytes) peer=%d\n", nSize, id);
2687-
2688-
std::deque<CSerializeData>::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData());
2689-
ssSend.GetAndClear(*it);
2690-
nSendSize += (*it).size();
2599+
size_t nBytesSent = 0;
2600+
{
2601+
LOCK(pnode->cs_vSend);
2602+
if(pnode->hSocket == INVALID_SOCKET) {
2603+
return;
2604+
}
2605+
bool optimisticSend(pnode->vSendMsg.empty());
2606+
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
26912607

2692-
// If write queue empty, attempt "optimistic write"
2693-
if (it == vSendMsg.begin())
2694-
nOptimisticBytesWritten += SocketSendData(this);
2608+
//log total amount of bytes per command
2609+
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
2610+
pnode->nSendSize += strm.size();
26952611

2696-
LEAVE_CRITICAL_SECTION(cs_vSend);
2612+
// If write queue empty, attempt "optimistic write"
2613+
if (optimisticSend == true)
2614+
nBytesSent = SocketSendData(pnode);
2615+
}
2616+
if (nBytesSent)
2617+
RecordBytesSent(nBytesSent);
26972618
}
26982619

26992620
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)

0 commit comments

Comments
 (0)