@@ -515,8 +515,10 @@ struct Peer {
     /** Set of txids to reconsider once their parent transactions have been accepted **/
     std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
 
+    /** Protects vRecvGetData **/
+    Mutex m_getdata_requests_mutex;
     /** Work queue of items requested by this peer **/
-    std::deque<CInv> vRecvGetData;
+    std::deque<CInv> vRecvGetData GUARDED_BY(m_getdata_requests_mutex);
 
     Peer(NodeId id) : m_id(id) {}
 };
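For context (not part of the patch): GUARDED_BY is one of the Clang thread-safety annotations pulled in through Bitcoin Core's sync.h. A minimal sketch of how an annotated field like the new vRecvGetData is meant to be touched; ExamplePeer and QueueRequest are hypothetical names used only for illustration.

#include <protocol.h>  // CInv
#include <sync.h>      // Mutex, LOCK, GUARDED_BY

#include <deque>

struct ExamplePeer {
    Mutex m_getdata_requests_mutex;
    std::deque<CInv> vRecvGetData GUARDED_BY(m_getdata_requests_mutex);
};

void QueueRequest(ExamplePeer& peer, const CInv& inv)
{
    // Reading or writing vRecvGetData without holding the mutex would produce a
    // -Wthread-safety warning at compile time; taking the lock first keeps the
    // annotation (and the runtime behaviour) honest.
    LOCK(peer.m_getdata_requests_mutex);
    peer.vRecvGetData.push_back(inv);
}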
@@ -1753,14 +1755,11 @@ static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode&
     return {};
 }
 
-void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
+void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!cs_main, peer.m_getdata_requests_mutex)
 {
     AssertLockNotHeld(cs_main);
 
-    PeerRef peer = GetPeerRef(pfrom.GetId());
-    if (peer == nullptr) return;
-
-    std::deque<CInv>::iterator it = peer->vRecvGetData.begin();
+    std::deque<CInv>::iterator it = peer.vRecvGetData.begin();
     std::vector<CInv> vNotFound;
     const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
 
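A note on the new annotation (sketch, not part of the patch): EXCLUSIVE_LOCKS_REQUIRED(!cs_main, peer.m_getdata_requests_mutex) tells the thread-safety analysis that callers must already hold the per-peer getdata mutex and must not hold cs_main when calling ProcessGetData. A compliant call site looks roughly like this; ExampleCaller is a hypothetical name, the real call sites are in the hunks below.

void ExampleCaller(CNode& node, Peer& peer, const CChainParams& chainparams,
                   CConnman& connman, CTxMemPool& mempool,
                   const std::atomic<bool>& interruptMsgProc)
{
    // cs_main is not held here, which satisfies the negated (!cs_main) capability.
    LOCK(peer.m_getdata_requests_mutex);  // satisfies the positive requirement
    ProcessGetData(node, peer, chainparams, connman, mempool, interruptMsgProc);
}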
@@ -1772,7 +1771,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
     // Process as many TX items from the front of the getdata queue as
     // possible, since they're common and it's efficient to batch process
     // them.
-    while (it != peer->vRecvGetData.end() && it->IsGenTxMsg()) {
+    while (it != peer.vRecvGetData.end() && it->IsGenTxMsg()) {
         if (interruptMsgProc) return;
         // The send buffer provides backpressure. If there's no space in
         // the buffer, pause processing until the next call.
@@ -1820,7 +1819,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
 
     // Only process one BLOCK item per call, since they're uncommon and can be
     // expensive to process.
-    if (it != peer->vRecvGetData.end() && !pfrom.fPauseSend) {
+    if (it != peer.vRecvGetData.end() && !pfrom.fPauseSend) {
         const CInv &inv = *it++;
         if (inv.IsGenBlkMsg()) {
             ProcessGetBlockData(pfrom, chainparams, inv, connman);
@@ -1829,7 +1828,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
         // and continue processing the queue on the next call.
     }
 
-    peer->vRecvGetData.erase(peer->vRecvGetData.begin(), it);
+    peer.vRecvGetData.erase(peer.vRecvGetData.begin(), it);
 
     if (!vNotFound.empty()) {
         // Let the peer know that we didn't find what it asked for, so it doesn't
@@ -2811,8 +2810,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
             LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId());
         }
 
-        peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(), vInv.end());
-        ProcessGetData(pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
+        {
+            LOCK(peer->m_getdata_requests_mutex);
+            peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(), vInv.end());
+            ProcessGetData(pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
+        }
+
         return;
     }
 
@@ -2900,36 +2903,38 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
             return;
         }
 
-        LOCK(cs_main);
+        {
+            LOCK(cs_main);
 
-        const CBlockIndex* pindex = LookupBlockIndex(req.blockhash);
-        if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
-            LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId());
-            return;
-        }
+            const CBlockIndex* pindex = LookupBlockIndex(req.blockhash);
+            if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
+                LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId());
+                return;
+            }
 
-        if (pindex->nHeight < ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
-            // If an older block is requested (should never happen in practice,
-            // but can happen in tests) send a block response instead of a
-            // blocktxn response. Sending a full block response instead of a
-            // small blocktxn response is preferable in the case where a peer
-            // might maliciously send lots of getblocktxn requests to trigger
-            // expensive disk reads, because it will require the peer to
-            // actually receive all the data read from disk over the network.
-            LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
-            CInv inv;
-            inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
-            inv.hash = req.blockhash;
-            peer->vRecvGetData.push_back(inv);
-            // The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
-            return;
-        }
+            if (pindex->nHeight >= ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
+                CBlock block;
+                bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
+                assert(ret);
 
-        CBlock block;
-        bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
-        assert(ret);
+                SendBlockTransactions(pfrom, block, req);
+                return;
+            }
+        }
 
-        SendBlockTransactions(pfrom, block, req);
+        // If an older block is requested (should never happen in practice,
+        // but can happen in tests) send a block response instead of a
+        // blocktxn response. Sending a full block response instead of a
+        // small blocktxn response is preferable in the case where a peer
+        // might maliciously send lots of getblocktxn requests to trigger
+        // expensive disk reads, because it will require the peer to
+        // actually receive all the data read from disk over the network.
+        LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
+        CInv inv;
+        WITH_LOCK(cs_main, inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK);
+        inv.hash = req.blockhash;
+        WITH_LOCK(peer->m_getdata_requests_mutex, peer->vRecvGetData.push_back(inv));
+        // The message processing loop will go around again (without pausing) and we'll respond then
         return;
     }
 
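The rewritten getblocktxn fallback leans on WITH_LOCK from sync.h, which takes a mutex only for the duration of a single expression; that is what lets the code above take cs_main and the getdata mutex one at a time instead of holding both across the whole block. A rough, hypothetical helper showing the same pattern; MakeBlockInv is not part of the patch and is named here only for illustration.

CInv MakeBlockInv(const uint256& block_hash, NodeId node_id)
{
    CInv inv;
    // State() requires cs_main; WITH_LOCK scopes the lock to this one statement.
    WITH_LOCK(cs_main, inv.type = State(node_id)->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK);
    inv.hash = block_hash;
    return inv;
}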
@@ -3879,8 +3884,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
     PeerRef peer = GetPeerRef(pfrom->GetId());
     if (peer == nullptr) return false;
 
-    if (!peer->vRecvGetData.empty()) {
-        ProcessGetData(*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
+    {
+        LOCK(peer->m_getdata_requests_mutex);
+        if (!peer->vRecvGetData.empty()) {
+            ProcessGetData(*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
+        }
     }
 
     {
@@ -3895,7 +3903,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
 
     // this maintains the order of responses
     // and prevents vRecvGetData to grow unbounded
-    if (!peer->vRecvGetData.empty()) return true;
+    {
+        LOCK(peer->m_getdata_requests_mutex);
+        if (!peer->vRecvGetData.empty()) return true;
+    }
+
     {
         LOCK(g_cs_orphans);
         if (!peer->m_orphan_work_set.empty()) return true;
@@ -3926,10 +3938,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
 
     try {
         ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc);
-        if (interruptMsgProc)
-            return false;
-        if (!peer->vRecvGetData.empty())
-            fMoreWork = true;
+        if (interruptMsgProc) return false;
+        {
+            LOCK(peer->m_getdata_requests_mutex);
+            if (!peer->vRecvGetData.empty()) fMoreWork = true;
+        }
     } catch (const std::exception& e) {
         LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name());
     } catch (...) {