@@ -446,6 +446,14 @@ struct Peer {
446
446
/* * Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). */
447
447
bool m_should_discourage GUARDED_BY (m_misbehavior_mutex){false };
448
448
449
+ /* * Set of txids to reconsider once their parent transactions have been accepted **/
450
+ std::set<uint256> m_orphan_work_set GUARDED_BY (g_cs_orphans);
451
+
452
+ /* * Protects m_getdata_requests **/
453
+ Mutex m_getdata_requests_mutex;
454
+ /* * Work queue of items requested by this peer **/
455
+ std::deque<CInv> m_getdata_requests GUARDED_BY (m_getdata_requests_mutex);
456
+
449
457
Peer (NodeId id) : m_id(id) {}
450
458
};
451
459
@@ -1654,11 +1662,11 @@ static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode&
1654
1662
return {};
1655
1663
}
1656
1664
1657
- void static ProcessGetData (CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool >& interruptMsgProc) LOCKS_EXCLUDED( cs_main)
1665
+ void static ProcessGetData (CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool >& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(! cs_main, peer.m_getdata_requests_mutex )
1658
1666
{
1659
1667
AssertLockNotHeld (cs_main);
1660
1668
1661
- std::deque<CInv>::iterator it = pfrom. vRecvGetData .begin ();
1669
+ std::deque<CInv>::iterator it = peer. m_getdata_requests .begin ();
1662
1670
std::vector<CInv> vNotFound;
1663
1671
const CNetMsgMaker msgMaker (pfrom.GetCommonVersion ());
1664
1672
@@ -1670,7 +1678,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
1670
1678
// Process as many TX items from the front of the getdata queue as
1671
1679
// possible, since they're common and it's efficient to batch process
1672
1680
// them.
1673
- while (it != pfrom. vRecvGetData .end () && it->IsGenTxMsg ()) {
1681
+ while (it != peer. m_getdata_requests .end () && it->IsGenTxMsg ()) {
1674
1682
if (interruptMsgProc) return ;
1675
1683
// The send buffer provides backpressure. If there's no space in
1676
1684
// the buffer, pause processing until the next call.
@@ -1718,7 +1726,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
1718
1726
1719
1727
// Only process one BLOCK item per call, since they're uncommon and can be
1720
1728
// expensive to process.
1721
- if (it != pfrom. vRecvGetData .end () && !pfrom.fPauseSend ) {
1729
+ if (it != peer. m_getdata_requests .end () && !pfrom.fPauseSend ) {
1722
1730
const CInv &inv = *it++;
1723
1731
if (inv.IsGenBlkMsg ()) {
1724
1732
ProcessGetBlockData (pfrom, chainparams, inv, connman);
@@ -1727,7 +1735,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
1727
1735
// and continue processing the queue on the next call.
1728
1736
}
1729
1737
1730
- pfrom. vRecvGetData .erase (pfrom. vRecvGetData .begin (), it);
1738
+ peer. m_getdata_requests .erase (peer. m_getdata_requests .begin (), it);
1731
1739
1732
1740
if (!vNotFound.empty ()) {
1733
1741
// Let the peer know that we didn't find what it asked for, so it doesn't
@@ -2270,6 +2278,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
2270
2278
return ;
2271
2279
}
2272
2280
2281
+ PeerRef peer = GetPeerRef (pfrom.GetId ());
2282
+ if (peer == nullptr ) return ;
2273
2283
2274
2284
if (msg_type == NetMsgType::VERSION) {
2275
2285
// Each connection can only send one version message
@@ -2708,8 +2718,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
2708
2718
LogPrint (BCLog::NET, " received getdata for: %s peer=%d\n " , vInv[0 ].ToString (), pfrom.GetId ());
2709
2719
}
2710
2720
2711
- pfrom.vRecvGetData .insert (pfrom.vRecvGetData .end (), vInv.begin (), vInv.end ());
2712
- ProcessGetData (pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
2721
+ {
2722
+ LOCK (peer->m_getdata_requests_mutex );
2723
+ peer->m_getdata_requests .insert (peer->m_getdata_requests .end (), vInv.begin (), vInv.end ());
2724
+ ProcessGetData (pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
2725
+ }
2726
+
2713
2727
return ;
2714
2728
}
2715
2729
@@ -2797,36 +2811,38 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
2797
2811
return ;
2798
2812
}
2799
2813
2800
- LOCK (cs_main);
2814
+ {
2815
+ LOCK (cs_main);
2801
2816
2802
- const CBlockIndex* pindex = LookupBlockIndex (req.blockhash );
2803
- if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
2804
- LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block we don't have\n " , pfrom.GetId ());
2805
- return ;
2806
- }
2817
+ const CBlockIndex* pindex = LookupBlockIndex (req.blockhash );
2818
+ if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
2819
+ LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block we don't have\n " , pfrom.GetId ());
2820
+ return ;
2821
+ }
2807
2822
2808
- if (pindex->nHeight < ::ChainActive ().Height () - MAX_BLOCKTXN_DEPTH) {
2809
- // If an older block is requested (should never happen in practice,
2810
- // but can happen in tests) send a block response instead of a
2811
- // blocktxn response. Sending a full block response instead of a
2812
- // small blocktxn response is preferable in the case where a peer
2813
- // might maliciously send lots of getblocktxn requests to trigger
2814
- // expensive disk reads, because it will require the peer to
2815
- // actually receive all the data read from disk over the network.
2816
- LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block > %i deep\n " , pfrom.GetId (), MAX_BLOCKTXN_DEPTH);
2817
- CInv inv;
2818
- inv.type = State (pfrom.GetId ())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
2819
- inv.hash = req.blockhash ;
2820
- pfrom.vRecvGetData .push_back (inv);
2821
- // The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
2822
- return ;
2823
- }
2823
+ if (pindex->nHeight >= ::ChainActive ().Height () - MAX_BLOCKTXN_DEPTH) {
2824
+ CBlock block;
2825
+ bool ret = ReadBlockFromDisk (block, pindex, m_chainparams.GetConsensus ());
2826
+ assert (ret);
2824
2827
2825
- CBlock block;
2826
- bool ret = ReadBlockFromDisk (block, pindex, m_chainparams.GetConsensus ());
2827
- assert (ret);
2828
+ SendBlockTransactions (pfrom, block, req);
2829
+ return ;
2830
+ }
2831
+ }
2828
2832
2829
- SendBlockTransactions (pfrom, block, req);
2833
+ // If an older block is requested (should never happen in practice,
2834
+ // but can happen in tests) send a block response instead of a
2835
+ // blocktxn response. Sending a full block response instead of a
2836
+ // small blocktxn response is preferable in the case where a peer
2837
+ // might maliciously send lots of getblocktxn requests to trigger
2838
+ // expensive disk reads, because it will require the peer to
2839
+ // actually receive all the data read from disk over the network.
2840
+ LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block > %i deep\n " , pfrom.GetId (), MAX_BLOCKTXN_DEPTH);
2841
+ CInv inv;
2842
+ WITH_LOCK (cs_main, inv.type = State (pfrom.GetId ())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK);
2843
+ inv.hash = req.blockhash ;
2844
+ WITH_LOCK (peer->m_getdata_requests_mutex , peer->m_getdata_requests .push_back (inv));
2845
+ // The message processing loop will go around again (without pausing) and we'll respond then
2830
2846
return ;
2831
2847
}
2832
2848
@@ -2961,7 +2977,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
2961
2977
auto it_by_prev = mapOrphanTransactionsByPrev.find (COutPoint (txid, i));
2962
2978
if (it_by_prev != mapOrphanTransactionsByPrev.end ()) {
2963
2979
for (const auto & elem : it_by_prev->second ) {
2964
- pfrom. orphan_work_set .insert (elem->first );
2980
+ peer-> m_orphan_work_set .insert (elem->first );
2965
2981
}
2966
2982
}
2967
2983
}
@@ -2978,7 +2994,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
2978
2994
}
2979
2995
2980
2996
// Recursively process any orphan transactions that depended on this one
2981
- ProcessOrphanTx (pfrom. orphan_work_set );
2997
+ ProcessOrphanTx (peer-> m_orphan_work_set );
2982
2998
}
2983
2999
else if (state.GetResult () == TxValidationResult::TX_MISSING_INPUTS)
2984
3000
{
@@ -3773,21 +3789,37 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
3773
3789
{
3774
3790
bool fMoreWork = false ;
3775
3791
3776
- if (! pfrom->vRecvGetData . empty ())
3777
- ProcessGetData (*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc) ;
3792
+ PeerRef peer = GetPeerRef ( pfrom->GetId ());
3793
+ if (peer == nullptr ) return false ;
3778
3794
3779
- if (!pfrom->orphan_work_set .empty ()) {
3795
+ {
3796
+ LOCK (peer->m_getdata_requests_mutex );
3797
+ if (!peer->m_getdata_requests .empty ()) {
3798
+ ProcessGetData (*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
3799
+ }
3800
+ }
3801
+
3802
+ {
3780
3803
LOCK2 (cs_main, g_cs_orphans);
3781
- ProcessOrphanTx (pfrom->orphan_work_set );
3804
+ if (!peer->m_orphan_work_set .empty ()) {
3805
+ ProcessOrphanTx (peer->m_orphan_work_set );
3806
+ }
3782
3807
}
3783
3808
3784
3809
if (pfrom->fDisconnect )
3785
3810
return false ;
3786
3811
3787
3812
// this maintains the order of responses
3788
- // and prevents vRecvGetData to grow unbounded
3789
- if (!pfrom->vRecvGetData .empty ()) return true ;
3790
- if (!pfrom->orphan_work_set .empty ()) return true ;
3813
+ // and prevents m_getdata_requests to grow unbounded
3814
+ {
3815
+ LOCK (peer->m_getdata_requests_mutex );
3816
+ if (!peer->m_getdata_requests .empty ()) return true ;
3817
+ }
3818
+
3819
+ {
3820
+ LOCK (g_cs_orphans);
3821
+ if (!peer->m_orphan_work_set .empty ()) return true ;
3822
+ }
3791
3823
3792
3824
// Don't bother if send buffer is too full to respond anyway
3793
3825
if (pfrom->fPauseSend )
@@ -3814,10 +3846,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
3814
3846
3815
3847
try {
3816
3848
ProcessMessage (*pfrom, msg_type, msg.m_recv , msg.m_time , interruptMsgProc);
3817
- if (interruptMsgProc)
3818
- return false ;
3819
- if (!pfrom->vRecvGetData .empty ())
3820
- fMoreWork = true ;
3849
+ if (interruptMsgProc) return false ;
3850
+ {
3851
+ LOCK (peer->m_getdata_requests_mutex );
3852
+ if (!peer->m_getdata_requests .empty ()) fMoreWork = true ;
3853
+ }
3821
3854
} catch (const std::exception& e) {
3822
3855
LogPrint (BCLog::NET, " %s(%s, %u bytes): Exception '%s' (%s) caught\n " , __func__, SanitizeString (msg_type), nMessageSize, e.what (), typeid (e).name ());
3823
3856
} catch (...) {
0 commit comments