Skip to content

Commit df127ec

Browse files
author
MarcoFalke
committed
Merge #19829: net processing: Move block inventory state to net_processing
3002b4a [net processing] Guard m_continuation_block with m_block_inv_mutex (John Newbery) 184557e [net processing] Move hashContinue to net processing (John Newbery) c853ef0 scripted-diff: rename vBlockHashesToAnnounce and vInventoryBlockToSend (John Newbery) 53b7ac1 [net processing] Move block inventory data to Peer (John Newbery) 78040f9 [net processing] Rename nStartingHeight to m_starting_height (John Newbery) 77a2c2f [net processing] Move nStartingHeight to Peer (John Newbery) 717a374 [net processing] Improve documentation for Peer destruction/locking (John Newbery) Pull request description: This continues the work of moving application layer data into net_processing, by moving all block inventory state into the new Peer object added in #19607. For motivation, see #19398. ACKs for top commit: jonatack: re-ACK 3002b4a per `git diff 9aad3e4 3002b4a` Sjors: Code review re-ACK 3002b4a MarcoFalke: re-ACK 3002b4a 🌓 Tree-SHA512: eb2b474b73b025791ee3e6e41809926b332b48468763219f31638ca390f427632f05902dfc6a2c6bdc1ce47b215782f67874ddbf05b97d77d5897b7e2abfe4d9
2 parents 209974d + 3002b4a commit df127ec

File tree

6 files changed

+92
-64
lines changed

6 files changed

+92
-64
lines changed

src/net.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,6 @@ void CNode::copyStats(CNodeStats &stats, const std::vector<bool> &m_asmap)
590590
stats.m_manual_connection = IsManualConn();
591591
X(m_bip152_highbandwidth_to);
592592
X(m_bip152_highbandwidth_from);
593-
X(nStartingHeight);
594593
{
595594
LOCK(cs_vSend);
596595
X(mapSendBytesPerMsgCmd);
@@ -2956,7 +2955,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
29562955
{
29572956
hSocket = hSocketIn;
29582957
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
2959-
hashContinue = uint256();
29602958
if (conn_type_in != ConnectionType::BLOCK_RELAY) {
29612959
m_tx_relay = MakeUnique<TxRelay>();
29622960
}

src/net.h

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ class CNodeStats
705705
bool m_manual_connection;
706706
bool m_bip152_highbandwidth_to;
707707
bool m_bip152_highbandwidth_from;
708-
int nStartingHeight;
708+
int m_starting_height;
709709
uint64_t nSendBytes;
710710
mapMsgCmdSize mapSendBytesPerMsgCmd;
711711
uint64_t nRecvBytes;
@@ -993,8 +993,6 @@ class CNode
993993
mapMsgCmdSize mapRecvBytesPerMsgCmd GUARDED_BY(cs_vRecv);
994994

995995
public:
996-
uint256 hashContinue;
997-
std::atomic<int> nStartingHeight{-1};
998996
// We selected peer as (compact blocks) high-bandwidth peer (BIP152)
999997
std::atomic<bool> m_bip152_highbandwidth_to{false};
1000998
// Peer selected us as (compact blocks) high-bandwidth peer (BIP152)
@@ -1007,12 +1005,6 @@ class CNode
10071005
std::chrono::microseconds m_next_addr_send GUARDED_BY(cs_sendProcessing){0};
10081006
std::chrono::microseconds m_next_local_addr_send GUARDED_BY(cs_sendProcessing){0};
10091007

1010-
// List of block ids we still have announce.
1011-
// There is no final sorting before sending, as they are always sent immediately
1012-
// and in the order requested.
1013-
std::vector<uint256> vInventoryBlockToSend GUARDED_BY(cs_inventory);
1014-
Mutex cs_inventory;
1015-
10161008
struct TxRelay {
10171009
mutable RecursiveMutex cs_filter;
10181010
// We use fRelayTxes for two purposes -
@@ -1043,9 +1035,6 @@ class CNode
10431035
// m_tx_relay == nullptr if we're not relaying transactions with this peer
10441036
std::unique_ptr<TxRelay> m_tx_relay;
10451037

1046-
// Used for headers announcements - unfiltered blocks to relay
1047-
std::vector<uint256> vBlockHashesToAnnounce GUARDED_BY(cs_inventory);
1048-
10491038
/** UNIX epoch time of the last block received from this peer that we had
10501039
* not yet seen (e.g. not already received from another peer), that passed
10511040
* preliminary validity checks and was saved to disk, even if we don't

src/net_processing.cpp

Lines changed: 60 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,11 @@ void PeerManager::FinalizeNode(const CNode& node, bool& fUpdateConnectionTime) {
791791
LOCK(cs_main);
792792
int misbehavior{0};
793793
{
794+
// We remove the PeerRef from g_peer_map here, but we don't always
795+
// destruct the Peer. Sometimes another thread is still holding a
796+
// PeerRef, so the refcount is >= 1. Be careful not to do any
797+
// processing here that assumes Peer won't be changed before it's
798+
// destructed.
794799
PeerRef peer = RemovePeer(nodeid);
795800
assert(peer != nullptr);
796801
misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
@@ -870,6 +875,7 @@ bool PeerManager::GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
870875
PeerRef peer = GetPeerRef(nodeid);
871876
if (peer == nullptr) return false;
872877
stats.m_misbehavior_score = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
878+
stats.m_starting_height = peer->m_starting_height;
873879

874880
return true;
875881
}
@@ -1309,13 +1315,17 @@ void PeerManager::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInde
13091315
}
13101316
}
13111317

1312-
// Relay to all peers
1313-
m_connman.ForEachNode([&vHashes](CNode* pnode) {
1314-
LOCK(pnode->cs_inventory);
1315-
for (const uint256& hash : reverse_iterate(vHashes)) {
1316-
pnode->vBlockHashesToAnnounce.push_back(hash);
1318+
{
1319+
LOCK(m_peer_mutex);
1320+
for (auto& it : m_peer_map) {
1321+
Peer& peer = *it.second;
1322+
LOCK(peer.m_block_inv_mutex);
1323+
for (const uint256& hash : reverse_iterate(vHashes)) {
1324+
peer.m_blocks_for_headers_relay.push_back(hash);
1325+
}
13171326
}
1318-
});
1327+
}
1328+
13191329
m_connman.WakeMessageHandler();
13201330
}
13211331

@@ -1465,7 +1475,7 @@ static void RelayAddress(const CNode& originator,
14651475
connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
14661476
}
14671477

1468-
void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, const CInv& inv, CConnman& connman)
1478+
void static ProcessGetBlockData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, const CInv& inv, CConnman& connman)
14691479
{
14701480
bool send = false;
14711481
std::shared_ptr<const CBlock> a_recent_block;
@@ -1605,16 +1615,18 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c
16051615
}
16061616
}
16071617

1608-
// Trigger the peer node to send a getblocks request for the next batch of inventory
1609-
if (inv.hash == pfrom.hashContinue)
16101618
{
1611-
// Send immediately. This must send even if redundant,
1612-
// and we want it right after the last block so they don't
1613-
// wait for other stuff first.
1614-
std::vector<CInv> vInv;
1615-
vInv.push_back(CInv(MSG_BLOCK, ::ChainActive().Tip()->GetBlockHash()));
1616-
connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::INV, vInv));
1617-
pfrom.hashContinue.SetNull();
1619+
LOCK(peer.m_block_inv_mutex);
1620+
// Trigger the peer node to send a getblocks request for the next batch of inventory
1621+
if (inv.hash == peer.m_continuation_block) {
1622+
// Send immediately. This must send even if redundant,
1623+
// and we want it right after the last block so they don't
1624+
// wait for other stuff first.
1625+
std::vector<CInv> vInv;
1626+
vInv.push_back(CInv(MSG_BLOCK, ::ChainActive().Tip()->GetBlockHash()));
1627+
connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::INV, vInv));
1628+
peer.m_continuation_block.SetNull();
1629+
}
16181630
}
16191631
}
16201632
}
@@ -1714,7 +1726,7 @@ void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainpa
17141726
if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) {
17151727
const CInv &inv = *it++;
17161728
if (inv.IsGenBlkMsg()) {
1717-
ProcessGetBlockData(pfrom, chainparams, inv, connman);
1729+
ProcessGetBlockData(pfrom, peer, chainparams, inv, connman);
17181730
}
17191731
// else: If the first item on the queue is an unknown type, we erase it
17201732
// and continue processing the queue on the next call.
@@ -1764,7 +1776,9 @@ void PeerManager::SendBlockTransactions(CNode& pfrom, const CBlock& block, const
17641776
m_connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
17651777
}
17661778

1767-
void PeerManager::ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHeader>& headers, bool via_compact_block)
1779+
void PeerManager::ProcessHeadersMessage(CNode& pfrom, const Peer& peer,
1780+
const std::vector<CBlockHeader>& headers,
1781+
bool via_compact_block)
17681782
{
17691783
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
17701784
size_t nCount = headers.size();
@@ -1854,7 +1868,8 @@ void PeerManager::ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHe
18541868
// Headers message had its maximum size; the peer may have more headers.
18551869
// TODO: optimize: if pindexLast is an ancestor of ::ChainActive().Tip or pindexBestHeader, continue
18561870
// from there instead.
1857-
LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n", pindexLast->nHeight, pfrom.GetId(), pfrom.nStartingHeight);
1871+
LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n",
1872+
pindexLast->nHeight, pfrom.GetId(), peer.m_starting_height);
18581873
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexLast), uint256()));
18591874
}
18601875

@@ -2280,7 +2295,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
22802295
ServiceFlags nServices;
22812296
int nVersion;
22822297
std::string cleanSubVer;
2283-
int nStartingHeight = -1;
2298+
int starting_height = -1;
22842299
bool fRelay = true;
22852300

22862301
vRecv >> nVersion >> nServiceInt >> nTime >> addrMe;
@@ -2311,7 +2326,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
23112326
cleanSubVer = SanitizeString(strSubVer);
23122327
}
23132328
if (!vRecv.empty()) {
2314-
vRecv >> nStartingHeight;
2329+
vRecv >> starting_height;
23152330
}
23162331
if (!vRecv.empty())
23172332
vRecv >> fRelay;
@@ -2360,7 +2375,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
23602375
LOCK(pfrom.cs_SubVer);
23612376
pfrom.cleanSubVer = cleanSubVer;
23622377
}
2363-
pfrom.nStartingHeight = nStartingHeight;
2378+
peer->m_starting_height = starting_height;
23642379

23652380
// set nodes not relaying blocks and tx and not serving (parts) of the historical blockchain as "clients"
23662381
pfrom.fClient = (!(nServices & NODE_NETWORK) && !(nServices & NODE_NETWORK_LIMITED));
@@ -2440,7 +2455,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
24402455

24412456
LogPrint(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, peer=%d%s\n",
24422457
cleanSubVer, pfrom.nVersion,
2443-
pfrom.nStartingHeight, addrMe.ToString(), pfrom.GetId(),
2458+
peer->m_starting_height, addrMe.ToString(), pfrom.GetId(),
24442459
remoteAddr);
24452460

24462461
int64_t nTimeOffset = nTime - GetTime();
@@ -2474,7 +2489,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
24742489

24752490
if (!pfrom.IsInboundConn()) {
24762491
LogPrintf("New outbound peer connected: version: %d, blocks=%d, peer=%d%s (%s)\n",
2477-
pfrom.nVersion.load(), pfrom.nStartingHeight,
2492+
pfrom.nVersion.load(), peer->m_starting_height,
24782493
pfrom.GetId(), (fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToString()) : ""),
24792494
pfrom.ConnectionTypeAsString());
24802495
}
@@ -2786,13 +2801,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
27862801
LogPrint(BCLog::NET, " getblocks stopping, pruned or too old block at %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString());
27872802
break;
27882803
}
2789-
WITH_LOCK(pfrom.cs_inventory, pfrom.vInventoryBlockToSend.push_back(pindex->GetBlockHash()));
2790-
if (--nLimit <= 0)
2791-
{
2804+
WITH_LOCK(peer->m_block_inv_mutex, peer->m_blocks_for_inv_relay.push_back(pindex->GetBlockHash()));
2805+
if (--nLimit <= 0) {
27922806
// When this block is requested, we'll send an inv that'll
27932807
// trigger the peer to getblocks the next batch of inventory.
27942808
LogPrint(BCLog::NET, " getblocks stopping at limit %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString());
2795-
pfrom.hashContinue = pindex->GetBlockHash();
2809+
WITH_LOCK(peer->m_block_inv_mutex, {peer->m_continuation_block = pindex->GetBlockHash();});
27962810
break;
27972811
}
27982812
}
@@ -3316,7 +3330,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
33163330
// the peer if the header turns out to be for an invalid block.
33173331
// Note that if a peer tries to build on an invalid chain, that
33183332
// will be detected and the peer will be disconnected/discouraged.
3319-
return ProcessHeadersMessage(pfrom, {cmpctblock.header}, /*via_compact_block=*/true);
3333+
return ProcessHeadersMessage(pfrom, *peer, {cmpctblock.header}, /*via_compact_block=*/true);
33203334
}
33213335

33223336
if (fBlockReconstructed) {
@@ -3459,7 +3473,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
34593473
ReadCompactSize(vRecv); // ignore tx count; assume it is 0.
34603474
}
34613475

3462-
return ProcessHeadersMessage(pfrom, headers, /*via_compact_block=*/false);
3476+
return ProcessHeadersMessage(pfrom, *peer, headers, /*via_compact_block=*/false);
34633477
}
34643478

34653479
if (msg_type == NetMsgType::BLOCK)
@@ -4067,6 +4081,7 @@ class CompareInvMempoolOrder
40674081

40684082
bool PeerManager::SendMessages(CNode* pto)
40694083
{
4084+
PeerRef peer = GetPeerRef(pto->GetId());
40704085
const Consensus::Params& consensusParams = m_chainparams.GetConsensus();
40714086

40724087
// We must call MaybeDiscourageAndDisconnect first, to ensure that we'll
@@ -4192,7 +4207,7 @@ bool PeerManager::SendMessages(CNode* pto)
41924207
got back an empty response. */
41934208
if (pindexStart->pprev)
41944209
pindexStart = pindexStart->pprev;
4195-
LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), pto->nStartingHeight);
4210+
LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), peer->m_starting_height);
41964211
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexStart), uint256()));
41974212
}
41984213
}
@@ -4208,11 +4223,11 @@ bool PeerManager::SendMessages(CNode* pto)
42084223
// If no header would connect, or if we have too many
42094224
// blocks, or if the peer doesn't want headers, just
42104225
// add all to the inv queue.
4211-
LOCK(pto->cs_inventory);
4226+
LOCK(peer->m_block_inv_mutex);
42124227
std::vector<CBlock> vHeaders;
42134228
bool fRevertToInv = ((!state.fPreferHeaders &&
4214-
(!state.fPreferHeaderAndIDs || pto->vBlockHashesToAnnounce.size() > 1)) ||
4215-
pto->vBlockHashesToAnnounce.size() > MAX_BLOCKS_TO_ANNOUNCE);
4229+
(!state.fPreferHeaderAndIDs || peer->m_blocks_for_headers_relay.size() > 1)) ||
4230+
peer->m_blocks_for_headers_relay.size() > MAX_BLOCKS_TO_ANNOUNCE);
42164231
const CBlockIndex *pBestIndex = nullptr; // last header queued for delivery
42174232
ProcessBlockAvailability(pto->GetId()); // ensure pindexBestKnownBlock is up-to-date
42184233

@@ -4221,7 +4236,7 @@ bool PeerManager::SendMessages(CNode* pto)
42214236
// Try to find first header that our peer doesn't have, and
42224237
// then send all headers past that one. If we come across any
42234238
// headers that aren't on ::ChainActive(), give up.
4224-
for (const uint256 &hash : pto->vBlockHashesToAnnounce) {
4239+
for (const uint256& hash : peer->m_blocks_for_headers_relay) {
42254240
const CBlockIndex* pindex = LookupBlockIndex(hash);
42264241
assert(pindex);
42274242
if (::ChainActive()[pindex->nHeight] != pindex) {
@@ -4238,7 +4253,7 @@ bool PeerManager::SendMessages(CNode* pto)
42384253
// which should be caught by the prior check), but one
42394254
// way this could happen is by using invalidateblock /
42404255
// reconsiderblock repeatedly on the tip, causing it to
4241-
// be added multiple times to vBlockHashesToAnnounce.
4256+
// be added multiple times to m_blocks_for_headers_relay.
42424257
// Robustly deal with this rare situation by reverting
42434258
// to an inv.
42444259
fRevertToInv = true;
@@ -4310,10 +4325,10 @@ bool PeerManager::SendMessages(CNode* pto)
43104325
}
43114326
if (fRevertToInv) {
43124327
// If falling back to using an inv, just try to inv the tip.
4313-
// The last entry in vBlockHashesToAnnounce was our tip at some point
4328+
// The last entry in m_blocks_for_headers_relay was our tip at some point
43144329
// in the past.
4315-
if (!pto->vBlockHashesToAnnounce.empty()) {
4316-
const uint256 &hashToAnnounce = pto->vBlockHashesToAnnounce.back();
4330+
if (!peer->m_blocks_for_headers_relay.empty()) {
4331+
const uint256& hashToAnnounce = peer->m_blocks_for_headers_relay.back();
43174332
const CBlockIndex* pindex = LookupBlockIndex(hashToAnnounce);
43184333
assert(pindex);
43194334

@@ -4327,32 +4342,32 @@ bool PeerManager::SendMessages(CNode* pto)
43274342

43284343
// If the peer's chain has this block, don't inv it back.
43294344
if (!PeerHasHeader(&state, pindex)) {
4330-
pto->vInventoryBlockToSend.push_back(hashToAnnounce);
4345+
peer->m_blocks_for_inv_relay.push_back(hashToAnnounce);
43314346
LogPrint(BCLog::NET, "%s: sending inv peer=%d hash=%s\n", __func__,
43324347
pto->GetId(), hashToAnnounce.ToString());
43334348
}
43344349
}
43354350
}
4336-
pto->vBlockHashesToAnnounce.clear();
4351+
peer->m_blocks_for_headers_relay.clear();
43374352
}
43384353

43394354
//
43404355
// Message: inventory
43414356
//
43424357
std::vector<CInv> vInv;
43434358
{
4344-
LOCK(pto->cs_inventory);
4345-
vInv.reserve(std::max<size_t>(pto->vInventoryBlockToSend.size(), INVENTORY_BROADCAST_MAX));
4359+
LOCK(peer->m_block_inv_mutex);
4360+
vInv.reserve(std::max<size_t>(peer->m_blocks_for_inv_relay.size(), INVENTORY_BROADCAST_MAX));
43464361

43474362
// Add blocks
4348-
for (const uint256& hash : pto->vInventoryBlockToSend) {
4363+
for (const uint256& hash : peer->m_blocks_for_inv_relay) {
43494364
vInv.push_back(CInv(MSG_BLOCK, hash));
43504365
if (vInv.size() == MAX_INV_SZ) {
43514366
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
43524367
vInv.clear();
43534368
}
43544369
}
4355-
pto->vInventoryBlockToSend.clear();
4370+
peer->m_blocks_for_inv_relay.clear();
43564371

43574372
if (pto->m_tx_relay != nullptr) {
43584373
LOCK(pto->m_tx_relay->cs_tx_inventory);

0 commit comments

Comments
 (0)