@@ -515,8 +515,10 @@ struct Peer {
515515 /* * Set of txids to reconsider once their parent transactions have been accepted **/
516516 std::set<uint256> m_orphan_work_set GUARDED_BY (g_cs_orphans);
517517
518+ /* * Protects vRecvGetData **/
519+ Mutex m_getdata_requests_mutex;
518520 /* * Work queue of items requested by this peer **/
519- std::deque<CInv> vRecvGetData;
521+ std::deque<CInv> vRecvGetData GUARDED_BY (m_getdata_requests_mutex) ;
520522
521523 Peer (NodeId id) : m_id(id) {}
522524};
@@ -1753,14 +1755,11 @@ static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode&
17531755 return {};
17541756}
17551757
1756- void static ProcessGetData (CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool >& interruptMsgProc) LOCKS_EXCLUDED( cs_main)
1758+ void static ProcessGetData (CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool >& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(! cs_main, peer.m_getdata_requests_mutex )
17571759{
17581760 AssertLockNotHeld (cs_main);
17591761
1760- PeerRef peer = GetPeerRef (pfrom.GetId ());
1761- if (peer == nullptr ) return ;
1762-
1763- std::deque<CInv>::iterator it = peer->vRecvGetData .begin ();
1762+ std::deque<CInv>::iterator it = peer.vRecvGetData .begin ();
17641763 std::vector<CInv> vNotFound;
17651764 const CNetMsgMaker msgMaker (pfrom.GetCommonVersion ());
17661765
@@ -1772,7 +1771,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
17721771 // Process as many TX items from the front of the getdata queue as
17731772 // possible, since they're common and it's efficient to batch process
17741773 // them.
1775- while (it != peer-> vRecvGetData .end () && it->IsGenTxMsg ()) {
1774+ while (it != peer. vRecvGetData .end () && it->IsGenTxMsg ()) {
17761775 if (interruptMsgProc) return ;
17771776 // The send buffer provides backpressure. If there's no space in
17781777 // the buffer, pause processing until the next call.
@@ -1820,7 +1819,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
18201819
18211820 // Only process one BLOCK item per call, since they're uncommon and can be
18221821 // expensive to process.
1823- if (it != peer-> vRecvGetData .end () && !pfrom.fPauseSend ) {
1822+ if (it != peer. vRecvGetData .end () && !pfrom.fPauseSend ) {
18241823 const CInv &inv = *it++;
18251824 if (inv.IsGenBlkMsg ()) {
18261825 ProcessGetBlockData (pfrom, chainparams, inv, connman);
@@ -1829,7 +1828,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
18291828 // and continue processing the queue on the next call.
18301829 }
18311830
1832- peer-> vRecvGetData .erase (peer-> vRecvGetData .begin (), it);
1831+ peer. vRecvGetData .erase (peer. vRecvGetData .begin (), it);
18331832
18341833 if (!vNotFound.empty ()) {
18351834 // Let the peer know that we didn't find what it asked for, so it doesn't
@@ -2811,8 +2810,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
28112810 LogPrint (BCLog::NET, " received getdata for: %s peer=%d\n " , vInv[0 ].ToString (), pfrom.GetId ());
28122811 }
28132812
2814- peer->vRecvGetData .insert (peer->vRecvGetData .end (), vInv.begin (), vInv.end ());
2815- ProcessGetData (pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
2813+ {
2814+ LOCK (peer->m_getdata_requests_mutex );
2815+ peer->vRecvGetData .insert (peer->vRecvGetData .end (), vInv.begin (), vInv.end ());
2816+ ProcessGetData (pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
2817+ }
2818+
28162819 return ;
28172820 }
28182821
@@ -2900,36 +2903,38 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
29002903 return ;
29012904 }
29022905
2903- LOCK (cs_main);
2906+ {
2907+ LOCK (cs_main);
29042908
2905- const CBlockIndex* pindex = LookupBlockIndex (req.blockhash );
2906- if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
2907- LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block we don't have\n " , pfrom.GetId ());
2908- return ;
2909- }
2909+ const CBlockIndex* pindex = LookupBlockIndex (req.blockhash );
2910+ if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
2911+ LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block we don't have\n " , pfrom.GetId ());
2912+ return ;
2913+ }
29102914
2911- if (pindex->nHeight < ::ChainActive ().Height () - MAX_BLOCKTXN_DEPTH) {
2912- // If an older block is requested (should never happen in practice,
2913- // but can happen in tests) send a block response instead of a
2914- // blocktxn response. Sending a full block response instead of a
2915- // small blocktxn response is preferable in the case where a peer
2916- // might maliciously send lots of getblocktxn requests to trigger
2917- // expensive disk reads, because it will require the peer to
2918- // actually receive all the data read from disk over the network.
2919- LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block > %i deep\n " , pfrom.GetId (), MAX_BLOCKTXN_DEPTH);
2920- CInv inv;
2921- inv.type = State (pfrom.GetId ())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
2922- inv.hash = req.blockhash ;
2923- peer->vRecvGetData .push_back (inv);
2924- // The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
2925- return ;
2926- }
2915+ if (pindex->nHeight >= ::ChainActive ().Height () - MAX_BLOCKTXN_DEPTH) {
2916+ CBlock block;
2917+ bool ret = ReadBlockFromDisk (block, pindex, m_chainparams.GetConsensus ());
2918+ assert (ret);
29272919
2928- CBlock block;
2929- bool ret = ReadBlockFromDisk (block, pindex, m_chainparams.GetConsensus ());
2930- assert (ret);
2920+ SendBlockTransactions (pfrom, block, req);
2921+ return ;
2922+ }
2923+ }
29312924
2932- SendBlockTransactions (pfrom, block, req);
2925+ // If an older block is requested (should never happen in practice,
2926+ // but can happen in tests) send a block response instead of a
2927+ // blocktxn response. Sending a full block response instead of a
2928+ // small blocktxn response is preferable in the case where a peer
2929+ // might maliciously send lots of getblocktxn requests to trigger
2930+ // expensive disk reads, because it will require the peer to
2931+ // actually receive all the data read from disk over the network.
2932+ LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block > %i deep\n " , pfrom.GetId (), MAX_BLOCKTXN_DEPTH);
2933+ CInv inv;
2934+ WITH_LOCK (cs_main, inv.type = State (pfrom.GetId ())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK);
2935+ inv.hash = req.blockhash ;
2936+ WITH_LOCK (peer->m_getdata_requests_mutex , peer->vRecvGetData .push_back (inv));
2937+ // The message processing loop will go around again (without pausing) and we'll respond then
29332938 return ;
29342939 }
29352940
@@ -3879,8 +3884,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
38793884 PeerRef peer = GetPeerRef (pfrom->GetId ());
38803885 if (peer == nullptr ) return false ;
38813886
3882- if (!peer->vRecvGetData .empty ()) {
3883- ProcessGetData (*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
3887+ {
3888+ LOCK (peer->m_getdata_requests_mutex );
3889+ if (!peer->vRecvGetData .empty ()) {
3890+ ProcessGetData (*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
3891+ }
38843892 }
38853893
38863894 {
@@ -3895,7 +3903,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
38953903
38963904 // this maintains the order of responses
38973905 // and prevents vRecvGetData to grow unbounded
3898- if (!peer->vRecvGetData .empty ()) return true ;
3906+ {
3907+ LOCK (peer->m_getdata_requests_mutex );
3908+ if (!peer->vRecvGetData .empty ()) return true ;
3909+ }
3910+
38993911 {
39003912 LOCK (g_cs_orphans);
39013913 if (!peer->m_orphan_work_set .empty ()) return true ;
@@ -3926,10 +3938,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
39263938
39273939 try {
39283940 ProcessMessage (*pfrom, msg_type, msg.m_recv , msg.m_time , interruptMsgProc);
3929- if (interruptMsgProc)
3930- return false ;
3931- if (!peer->vRecvGetData .empty ())
3932- fMoreWork = true ;
3941+ if (interruptMsgProc) return false ;
3942+ {
3943+ LOCK (peer->m_getdata_requests_mutex );
3944+ if (!peer->vRecvGetData .empty ()) fMoreWork = true ;
3945+ }
39333946 } catch (const std::exception& e) {
39343947 LogPrint (BCLog::NET, " %s(%s, %u bytes): Exception '%s' (%s) caught\n " , __func__, SanitizeString (msg_type), nMessageSize, e.what (), typeid (e).name ());
39353948 } catch (...) {
0 commit comments