Skip to content

Commit b0a4ac9

Browse files
committed
[net processing] Add m_tx_relay_mutex to protect m_tx_relay ptr
1 parent 290a8da commit b0a4ac9

File tree

1 file changed

+80
-72
lines changed

1 file changed

+80
-72
lines changed

src/net_processing.cpp

Lines changed: 80 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,17 @@ struct Peer {
280280
std::atomic<CAmount> m_fee_filter_received{0};
281281
};
282282

283+
Mutex m_tx_relay_mutex;
284+
283285
/** Transaction relay data. Will be a nullptr if we're not relaying
284-
* transactions with this peer (e.g. if it's a block-relay-only peer) */
285-
std::unique_ptr<TxRelay> m_tx_relay;
286+
* transactions with this peer (e.g. if it's a block-relay-only peer).
287+
* Users should access this with the GetTxRelay() getter. */
288+
std::unique_ptr<TxRelay> m_tx_relay GUARDED_BY(m_tx_relay_mutex);
289+
290+
TxRelay* GetTxRelay()
291+
{
292+
return WITH_LOCK(m_tx_relay_mutex, return m_tx_relay.get());
293+
};
286294

287295
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
288296
std::vector<CAddress> m_addrs_to_send;
@@ -896,10 +904,11 @@ static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& ins
896904

897905
static void AddKnownTx(Peer& peer, const uint256& hash)
898906
{
899-
if (peer.m_tx_relay != nullptr) {
900-
LOCK(peer.m_tx_relay->m_tx_inventory_mutex);
901-
peer.m_tx_relay->m_tx_inventory_known_filter.insert(hash);
902-
}
907+
auto tx_relay = peer.GetTxRelay();
908+
if (!tx_relay) return;
909+
910+
LOCK(tx_relay->m_tx_inventory_mutex);
911+
tx_relay->m_tx_inventory_known_filter.insert(hash);
903912
}
904913

905914
std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::microseconds now,
@@ -1392,9 +1401,9 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c
13921401
ping_wait = GetTime<std::chrono::microseconds>() - peer->m_ping_start.load();
13931402
}
13941403

1395-
if (peer->m_tx_relay != nullptr) {
1396-
stats.m_relay_txs = WITH_LOCK(peer->m_tx_relay->m_bloom_filter_mutex, return peer->m_tx_relay->m_relay_txs);
1397-
stats.m_fee_filter_received = peer->m_tx_relay->m_fee_filter_received.load();
1404+
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
1405+
stats.m_relay_txs = WITH_LOCK(tx_relay->m_bloom_filter_mutex, return tx_relay->m_relay_txs);
1406+
stats.m_fee_filter_received = tx_relay->m_fee_filter_received.load();
13981407
} else {
13991408
stats.m_relay_txs = false;
14001409
stats.m_fee_filter_received = 0;
@@ -1810,12 +1819,13 @@ void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid
18101819
LOCK(m_peer_mutex);
18111820
for(auto& it : m_peer_map) {
18121821
Peer& peer = *it.second;
1813-
if (!peer.m_tx_relay) continue;
1822+
auto tx_relay = peer.GetTxRelay();
1823+
if (!tx_relay) continue;
18141824

18151825
const uint256& hash{peer.m_wtxid_relay ? wtxid : txid};
1816-
LOCK(peer.m_tx_relay->m_tx_inventory_mutex);
1817-
if (!peer.m_tx_relay->m_tx_inventory_known_filter.contains(hash)) {
1818-
peer.m_tx_relay->m_tx_inventory_to_send.insert(hash);
1826+
LOCK(tx_relay->m_tx_inventory_mutex);
1827+
if (!tx_relay->m_tx_inventory_known_filter.contains(hash)) {
1828+
tx_relay->m_tx_inventory_to_send.insert(hash);
18191829
}
18201830
};
18211831
}
@@ -1966,11 +1976,11 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, Peer& peer, const CInv&
19661976
} else if (inv.IsMsgFilteredBlk()) {
19671977
bool sendMerkleBlock = false;
19681978
CMerkleBlock merkleBlock;
1969-
if (peer.m_tx_relay != nullptr) {
1970-
LOCK(peer.m_tx_relay->m_bloom_filter_mutex);
1971-
if (peer.m_tx_relay->m_bloom_filter) {
1979+
if (auto tx_relay = peer.GetTxRelay(); tx_relay != nullptr) {
1980+
LOCK(tx_relay->m_bloom_filter_mutex);
1981+
if (tx_relay->m_bloom_filter) {
19721982
sendMerkleBlock = true;
1973-
merkleBlock = CMerkleBlock(*pblock, *peer.m_tx_relay->m_bloom_filter);
1983+
merkleBlock = CMerkleBlock(*pblock, *tx_relay->m_bloom_filter);
19741984
}
19751985
}
19761986
if (sendMerkleBlock) {
@@ -2053,13 +2063,15 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
20532063
{
20542064
AssertLockNotHeld(cs_main);
20552065

2066+
auto tx_relay = peer.GetTxRelay();
2067+
20562068
std::deque<CInv>::iterator it = peer.m_getdata_requests.begin();
20572069
std::vector<CInv> vNotFound;
20582070
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
20592071

20602072
const auto now{GetTime<std::chrono::seconds>()};
20612073
// Get last mempool request time
2062-
const auto mempool_req = peer.m_tx_relay != nullptr ? peer.m_tx_relay->m_last_mempool_req.load() : std::chrono::seconds::min();
2074+
const auto mempool_req = tx_relay != nullptr ? tx_relay->m_last_mempool_req.load() : std::chrono::seconds::min();
20632075

20642076
// Process as many TX items from the front of the getdata queue as
20652077
// possible, since they're common and it's efficient to batch process
@@ -2072,7 +2084,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
20722084

20732085
const CInv &inv = *it++;
20742086

2075-
if (peer.m_tx_relay == nullptr) {
2087+
if (tx_relay == nullptr) {
20762088
// Ignore GETDATA requests for transactions from blocks-only peers.
20772089
continue;
20782090
}
@@ -2100,7 +2112,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
21002112
}
21012113
for (const uint256& parent_txid : parent_ids_to_add) {
21022114
// Relaying a transaction with a recent but unconfirmed parent.
2103-
if (WITH_LOCK(peer.m_tx_relay->m_tx_inventory_mutex, return !peer.m_tx_relay->m_tx_inventory_known_filter.contains(parent_txid))) {
2115+
if (WITH_LOCK(tx_relay->m_tx_inventory_mutex, return !tx_relay->m_tx_inventory_known_filter.contains(parent_txid))) {
21042116
LOCK(cs_main);
21052117
State(pfrom.GetId())->m_recently_announced_invs.insert(parent_txid);
21062118
}
@@ -2736,10 +2748,10 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
27362748
// set nodes not capable of serving the complete blockchain history as "limited nodes"
27372749
pfrom.m_limited_node = (!(nServices & NODE_NETWORK) && (nServices & NODE_NETWORK_LIMITED));
27382750

2739-
if (peer->m_tx_relay != nullptr) {
2751+
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
27402752
{
2741-
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
2742-
peer->m_tx_relay->m_relay_txs = fRelay; // set to true after we get the first filter* message
2753+
LOCK(tx_relay->m_bloom_filter_mutex);
2754+
tx_relay->m_relay_txs = fRelay; // set to true after we get the first filter* message
27432755
}
27442756
if (fRelay) pfrom.m_relays_txs = true;
27452757
}
@@ -3069,7 +3081,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
30693081

30703082
// Reject tx INVs when the -blocksonly setting is enabled, or this is a
30713083
// block-relay-only peer
3072-
bool reject_tx_invs{m_ignore_incoming_txs || (peer->m_tx_relay == nullptr)};
3084+
bool reject_tx_invs{m_ignore_incoming_txs || (peer->GetTxRelay() == nullptr)};
30733085

30743086
// Allow peers with relay permission to send data other than blocks in blocks only mode
30753087
if (pfrom.HasPermission(NetPermissionFlags::Relay)) {
@@ -3346,7 +3358,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
33463358
// Stop processing the transaction early if
33473359
// 1) We are in blocks only mode and peer has no relay permission
33483360
// 2) This peer is a block-relay-only peer
3349-
if ((m_ignore_incoming_txs && !pfrom.HasPermission(NetPermissionFlags::Relay)) || (peer->m_tx_relay == nullptr)) {
3361+
if ((m_ignore_incoming_txs && !pfrom.HasPermission(NetPermissionFlags::Relay)) || (peer->GetTxRelay() == nullptr)) {
33503362
LogPrint(BCLog::NET, "transaction sent in violation of protocol peer=%d\n", pfrom.GetId());
33513363
pfrom.fDisconnect = true;
33523364
return;
@@ -3958,9 +3970,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
39583970
return;
39593971
}
39603972

3961-
if (peer->m_tx_relay != nullptr) {
3962-
LOCK(peer->m_tx_relay->m_tx_inventory_mutex);
3963-
peer->m_tx_relay->m_send_mempool = true;
3973+
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
3974+
LOCK(tx_relay->m_tx_inventory_mutex);
3975+
tx_relay->m_send_mempool = true;
39643976
}
39653977
return;
39663978
}
@@ -4053,16 +4065,13 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
40534065
{
40544066
// There is no excuse for sending a too-large filter
40554067
Misbehaving(pfrom.GetId(), 100, "too-large bloom filter");
4056-
}
4057-
else if (peer->m_tx_relay != nullptr)
4058-
{
4068+
} else if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
40594069
{
4060-
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
4061-
peer->m_tx_relay->m_bloom_filter.reset(new CBloomFilter(filter));
4062-
peer->m_tx_relay->m_relay_txs = true;
4070+
LOCK(tx_relay->m_bloom_filter_mutex);
4071+
tx_relay->m_bloom_filter.reset(new CBloomFilter(filter));
4072+
tx_relay->m_relay_txs = true;
40634073
}
40644074
pfrom.m_bloom_filter_loaded = true;
4065-
pfrom.m_relays_txs = true;
40664075
}
40674076
return;
40684077
}
@@ -4081,10 +4090,10 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
40814090
bool bad = false;
40824091
if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) {
40834092
bad = true;
4084-
} else if (peer->m_tx_relay != nullptr) {
4085-
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
4086-
if (peer->m_tx_relay->m_bloom_filter) {
4087-
peer->m_tx_relay->m_bloom_filter->insert(vData);
4093+
} else if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
4094+
LOCK(tx_relay->m_bloom_filter_mutex);
4095+
if (tx_relay->m_bloom_filter) {
4096+
tx_relay->m_bloom_filter->insert(vData);
40884097
} else {
40894098
bad = true;
40904099
}
@@ -4101,14 +4110,13 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
41014110
pfrom.fDisconnect = true;
41024111
return;
41034112
}
4104-
if (peer->m_tx_relay == nullptr) {
4105-
return;
4106-
}
4113+
auto tx_relay = peer->GetTxRelay();
4114+
if (!tx_relay) return;
41074115

41084116
{
4109-
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
4110-
peer->m_tx_relay->m_bloom_filter = nullptr;
4111-
peer->m_tx_relay->m_relay_txs = true;
4117+
LOCK(tx_relay->m_bloom_filter_mutex);
4118+
tx_relay->m_bloom_filter = nullptr;
4119+
tx_relay->m_relay_txs = true;
41124120
}
41134121
pfrom.m_bloom_filter_loaded = false;
41144122
pfrom.m_relays_txs = true;
@@ -4119,8 +4127,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
41194127
CAmount newFeeFilter = 0;
41204128
vRecv >> newFeeFilter;
41214129
if (MoneyRange(newFeeFilter)) {
4122-
if (peer->m_tx_relay != nullptr) {
4123-
peer->m_tx_relay->m_fee_filter_received = newFeeFilter;
4130+
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
4131+
tx_relay->m_fee_filter_received = newFeeFilter;
41244132
}
41254133
LogPrint(BCLog::NET, "received: feefilter of %s from peer=%d\n", CFeeRate(newFeeFilter).ToString(), pfrom.GetId());
41264134
}
@@ -4885,72 +4893,72 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
48854893
peer->m_blocks_for_inv_relay.clear();
48864894
}
48874895

4888-
if (peer->m_tx_relay != nullptr) {
4889-
LOCK(peer->m_tx_relay->m_tx_inventory_mutex);
4896+
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
4897+
LOCK(tx_relay->m_tx_inventory_mutex);
48904898
// Check whether periodic sends should happen
48914899
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
4892-
if (peer->m_tx_relay->m_next_inv_send_time < current_time) {
4900+
if (tx_relay->m_next_inv_send_time < current_time) {
48934901
fSendTrickle = true;
48944902
if (pto->IsInboundConn()) {
4895-
peer->m_tx_relay->m_next_inv_send_time = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL);
4903+
tx_relay->m_next_inv_send_time = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL);
48964904
} else {
4897-
peer->m_tx_relay->m_next_inv_send_time = GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL);
4905+
tx_relay->m_next_inv_send_time = GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL);
48984906
}
48994907
}
49004908

49014909
// Time to send but the peer has requested we not relay transactions.
49024910
if (fSendTrickle) {
4903-
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
4904-
if (!peer->m_tx_relay->m_relay_txs) peer->m_tx_relay->m_tx_inventory_to_send.clear();
4911+
LOCK(tx_relay->m_bloom_filter_mutex);
4912+
if (!tx_relay->m_relay_txs) tx_relay->m_tx_inventory_to_send.clear();
49054913
}
49064914

49074915
// Respond to BIP35 mempool requests
4908-
if (fSendTrickle && peer->m_tx_relay->m_send_mempool) {
4916+
if (fSendTrickle && tx_relay->m_send_mempool) {
49094917
auto vtxinfo = m_mempool.infoAll();
4910-
peer->m_tx_relay->m_send_mempool = false;
4911-
const CFeeRate filterrate{peer->m_tx_relay->m_fee_filter_received.load()};
4918+
tx_relay->m_send_mempool = false;
4919+
const CFeeRate filterrate{tx_relay->m_fee_filter_received.load()};
49124920

4913-
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
4921+
LOCK(tx_relay->m_bloom_filter_mutex);
49144922

49154923
for (const auto& txinfo : vtxinfo) {
49164924
const uint256& hash = peer->m_wtxid_relay ? txinfo.tx->GetWitnessHash() : txinfo.tx->GetHash();
49174925
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
4918-
peer->m_tx_relay->m_tx_inventory_to_send.erase(hash);
4926+
tx_relay->m_tx_inventory_to_send.erase(hash);
49194927
// Don't send transactions that peers will not put into their mempool
49204928
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
49214929
continue;
49224930
}
4923-
if (peer->m_tx_relay->m_bloom_filter) {
4924-
if (!peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
4931+
if (tx_relay->m_bloom_filter) {
4932+
if (!tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
49254933
}
4926-
peer->m_tx_relay->m_tx_inventory_known_filter.insert(hash);
4934+
tx_relay->m_tx_inventory_known_filter.insert(hash);
49274935
// Responses to MEMPOOL requests bypass the m_recently_announced_invs filter.
49284936
vInv.push_back(inv);
49294937
if (vInv.size() == MAX_INV_SZ) {
49304938
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
49314939
vInv.clear();
49324940
}
49334941
}
4934-
peer->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
4942+
tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
49354943
}
49364944

49374945
// Determine transactions to relay
49384946
if (fSendTrickle) {
49394947
// Produce a vector with all candidates for sending
49404948
std::vector<std::set<uint256>::iterator> vInvTx;
4941-
vInvTx.reserve(peer->m_tx_relay->m_tx_inventory_to_send.size());
4942-
for (std::set<uint256>::iterator it = peer->m_tx_relay->m_tx_inventory_to_send.begin(); it != peer->m_tx_relay->m_tx_inventory_to_send.end(); it++) {
4949+
vInvTx.reserve(tx_relay->m_tx_inventory_to_send.size());
4950+
for (std::set<uint256>::iterator it = tx_relay->m_tx_inventory_to_send.begin(); it != tx_relay->m_tx_inventory_to_send.end(); it++) {
49434951
vInvTx.push_back(it);
49444952
}
4945-
const CFeeRate filterrate{peer->m_tx_relay->m_fee_filter_received.load()};
4953+
const CFeeRate filterrate{tx_relay->m_fee_filter_received.load()};
49464954
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
49474955
// A heap is used so that not all items need sorting if only a few are being sent.
49484956
CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool, peer->m_wtxid_relay);
49494957
std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
49504958
// No reason to drain out at many times the network's capacity,
49514959
// especially since we have many peers and some will draw much shorter delays.
49524960
unsigned int nRelayedTransactions = 0;
4953-
LOCK(peer->m_tx_relay->m_bloom_filter_mutex);
4961+
LOCK(tx_relay->m_bloom_filter_mutex);
49544962
while (!vInvTx.empty() && nRelayedTransactions < INVENTORY_BROADCAST_MAX) {
49554963
// Fetch the top element from the heap
49564964
std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
@@ -4959,9 +4967,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
49594967
uint256 hash = *it;
49604968
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
49614969
// Remove it from the to-be-sent set
4962-
peer->m_tx_relay->m_tx_inventory_to_send.erase(it);
4970+
tx_relay->m_tx_inventory_to_send.erase(it);
49634971
// Check if not in the filter already
4964-
if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(hash)) {
4972+
if (tx_relay->m_tx_inventory_known_filter.contains(hash)) {
49654973
continue;
49664974
}
49674975
// Not in the mempool anymore? don't bother sending it.
@@ -4975,7 +4983,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
49754983
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
49764984
continue;
49774985
}
4978-
if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
4986+
if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
49794987
// Send
49804988
State(pto->GetId())->m_recently_announced_invs.insert(hash);
49814989
vInv.push_back(inv);
@@ -5002,14 +5010,14 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
50025010
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
50035011
vInv.clear();
50045012
}
5005-
peer->m_tx_relay->m_tx_inventory_known_filter.insert(hash);
5013+
tx_relay->m_tx_inventory_known_filter.insert(hash);
50065014
if (hash != txid) {
50075015
// Insert txid into m_tx_inventory_known_filter, even for
50085016
// wtxidrelay peers. This prevents re-adding of
50095017
// unconfirmed parents to the recently_announced
50105018
// filter, when a child tx is requested. See
50115019
// ProcessGetData().
5012-
peer->m_tx_relay->m_tx_inventory_known_filter.insert(txid);
5020+
tx_relay->m_tx_inventory_known_filter.insert(txid);
50135021
}
50145022
}
50155023
}

0 commit comments

Comments
 (0)