@@ -446,6 +446,14 @@ struct Peer {
446446 /* * Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). */
447447 bool m_should_discourage GUARDED_BY (m_misbehavior_mutex){false };
448448
449+ /* * Set of txids to reconsider once their parent transactions have been accepted **/
450+ std::set<uint256> m_orphan_work_set GUARDED_BY (g_cs_orphans);
451+
452+ /* * Protects m_getdata_requests **/
453+ Mutex m_getdata_requests_mutex;
454+ /* * Work queue of items requested by this peer **/
455+ std::deque<CInv> m_getdata_requests GUARDED_BY (m_getdata_requests_mutex);
456+
449457 Peer (NodeId id) : m_id(id) {}
450458};
451459
@@ -1654,11 +1662,11 @@ static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode&
16541662 return {};
16551663}
16561664
1657- void static ProcessGetData (CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool >& interruptMsgProc) LOCKS_EXCLUDED( cs_main)
1665+ void static ProcessGetData (CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool >& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(! cs_main, peer.m_getdata_requests_mutex )
16581666{
16591667 AssertLockNotHeld (cs_main);
16601668
1661- std::deque<CInv>::iterator it = pfrom. vRecvGetData .begin ();
1669+ std::deque<CInv>::iterator it = peer. m_getdata_requests .begin ();
16621670 std::vector<CInv> vNotFound;
16631671 const CNetMsgMaker msgMaker (pfrom.GetCommonVersion ());
16641672
@@ -1670,7 +1678,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
16701678 // Process as many TX items from the front of the getdata queue as
16711679 // possible, since they're common and it's efficient to batch process
16721680 // them.
1673- while (it != pfrom. vRecvGetData .end () && it->IsGenTxMsg ()) {
1681+ while (it != peer. m_getdata_requests .end () && it->IsGenTxMsg ()) {
16741682 if (interruptMsgProc) return ;
16751683 // The send buffer provides backpressure. If there's no space in
16761684 // the buffer, pause processing until the next call.
@@ -1718,7 +1726,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
17181726
17191727 // Only process one BLOCK item per call, since they're uncommon and can be
17201728 // expensive to process.
1721- if (it != pfrom. vRecvGetData .end () && !pfrom.fPauseSend ) {
1729+ if (it != peer. m_getdata_requests .end () && !pfrom.fPauseSend ) {
17221730 const CInv &inv = *it++;
17231731 if (inv.IsGenBlkMsg ()) {
17241732 ProcessGetBlockData (pfrom, chainparams, inv, connman);
@@ -1727,7 +1735,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
17271735 // and continue processing the queue on the next call.
17281736 }
17291737
1730- pfrom. vRecvGetData .erase (pfrom. vRecvGetData .begin (), it);
1738+ peer. m_getdata_requests .erase (peer. m_getdata_requests .begin (), it);
17311739
17321740 if (!vNotFound.empty ()) {
17331741 // Let the peer know that we didn't find what it asked for, so it doesn't
@@ -2270,6 +2278,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
22702278 return ;
22712279 }
22722280
2281+ PeerRef peer = GetPeerRef (pfrom.GetId ());
2282+ if (peer == nullptr ) return ;
22732283
22742284 if (msg_type == NetMsgType::VERSION) {
22752285 // Each connection can only send one version message
@@ -2708,8 +2718,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
27082718 LogPrint (BCLog::NET, " received getdata for: %s peer=%d\n " , vInv[0 ].ToString (), pfrom.GetId ());
27092719 }
27102720
2711- pfrom.vRecvGetData .insert (pfrom.vRecvGetData .end (), vInv.begin (), vInv.end ());
2712- ProcessGetData (pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
2721+ {
2722+ LOCK (peer->m_getdata_requests_mutex );
2723+ peer->m_getdata_requests .insert (peer->m_getdata_requests .end (), vInv.begin (), vInv.end ());
2724+ ProcessGetData (pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
2725+ }
2726+
27132727 return ;
27142728 }
27152729
@@ -2797,36 +2811,38 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
27972811 return ;
27982812 }
27992813
2800- LOCK (cs_main);
2814+ {
2815+ LOCK (cs_main);
28012816
2802- const CBlockIndex* pindex = LookupBlockIndex (req.blockhash );
2803- if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
2804- LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block we don't have\n " , pfrom.GetId ());
2805- return ;
2806- }
2817+ const CBlockIndex* pindex = LookupBlockIndex (req.blockhash );
2818+ if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
2819+ LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block we don't have\n " , pfrom.GetId ());
2820+ return ;
2821+ }
28072822
2808- if (pindex->nHeight < ::ChainActive ().Height () - MAX_BLOCKTXN_DEPTH) {
2809- // If an older block is requested (should never happen in practice,
2810- // but can happen in tests) send a block response instead of a
2811- // blocktxn response. Sending a full block response instead of a
2812- // small blocktxn response is preferable in the case where a peer
2813- // might maliciously send lots of getblocktxn requests to trigger
2814- // expensive disk reads, because it will require the peer to
2815- // actually receive all the data read from disk over the network.
2816- LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block > %i deep\n " , pfrom.GetId (), MAX_BLOCKTXN_DEPTH);
2817- CInv inv;
2818- inv.type = State (pfrom.GetId ())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
2819- inv.hash = req.blockhash ;
2820- pfrom.vRecvGetData .push_back (inv);
2821- // The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
2822- return ;
2823- }
2823+ if (pindex->nHeight >= ::ChainActive ().Height () - MAX_BLOCKTXN_DEPTH) {
2824+ CBlock block;
2825+ bool ret = ReadBlockFromDisk (block, pindex, m_chainparams.GetConsensus ());
2826+ assert (ret);
28242827
2825- CBlock block;
2826- bool ret = ReadBlockFromDisk (block, pindex, m_chainparams.GetConsensus ());
2827- assert (ret);
2828+ SendBlockTransactions (pfrom, block, req);
2829+ return ;
2830+ }
2831+ }
28282832
2829- SendBlockTransactions (pfrom, block, req);
2833+ // If an older block is requested (should never happen in practice,
2834+ // but can happen in tests) send a block response instead of a
2835+ // blocktxn response. Sending a full block response instead of a
2836+ // small blocktxn response is preferable in the case where a peer
2837+ // might maliciously send lots of getblocktxn requests to trigger
2838+ // expensive disk reads, because it will require the peer to
2839+ // actually receive all the data read from disk over the network.
2840+ LogPrint (BCLog::NET, " Peer %d sent us a getblocktxn for a block > %i deep\n " , pfrom.GetId (), MAX_BLOCKTXN_DEPTH);
2841+ CInv inv;
2842+ WITH_LOCK (cs_main, inv.type = State (pfrom.GetId ())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK);
2843+ inv.hash = req.blockhash ;
2844+ WITH_LOCK (peer->m_getdata_requests_mutex , peer->m_getdata_requests .push_back (inv));
2845+ // The message processing loop will go around again (without pausing) and we'll respond then
28302846 return ;
28312847 }
28322848
@@ -2961,7 +2977,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
29612977 auto it_by_prev = mapOrphanTransactionsByPrev.find (COutPoint (txid, i));
29622978 if (it_by_prev != mapOrphanTransactionsByPrev.end ()) {
29632979 for (const auto & elem : it_by_prev->second ) {
2964- pfrom. orphan_work_set .insert (elem->first );
2980+ peer-> m_orphan_work_set .insert (elem->first );
29652981 }
29662982 }
29672983 }
@@ -2978,7 +2994,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
29782994 }
29792995
29802996 // Recursively process any orphan transactions that depended on this one
2981- ProcessOrphanTx (pfrom. orphan_work_set );
2997+ ProcessOrphanTx (peer-> m_orphan_work_set );
29822998 }
29832999 else if (state.GetResult () == TxValidationResult::TX_MISSING_INPUTS)
29843000 {
@@ -3773,21 +3789,37 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
37733789{
37743790 bool fMoreWork = false ;
37753791
3776- if (! pfrom->vRecvGetData . empty ())
3777- ProcessGetData (*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc) ;
3792+ PeerRef peer = GetPeerRef ( pfrom->GetId ());
3793+ if (peer == nullptr ) return false ;
37783794
3779- if (!pfrom->orphan_work_set .empty ()) {
3795+ {
3796+ LOCK (peer->m_getdata_requests_mutex );
3797+ if (!peer->m_getdata_requests .empty ()) {
3798+ ProcessGetData (*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
3799+ }
3800+ }
3801+
3802+ {
37803803 LOCK2 (cs_main, g_cs_orphans);
3781- ProcessOrphanTx (pfrom->orphan_work_set );
3804+ if (!peer->m_orphan_work_set .empty ()) {
3805+ ProcessOrphanTx (peer->m_orphan_work_set );
3806+ }
37823807 }
37833808
37843809 if (pfrom->fDisconnect )
37853810 return false ;
37863811
37873812 // this maintains the order of responses
3788- // and prevents vRecvGetData to grow unbounded
3789- if (!pfrom->vRecvGetData .empty ()) return true ;
3790- if (!pfrom->orphan_work_set .empty ()) return true ;
3813+ // and prevents m_getdata_requests to grow unbounded
3814+ {
3815+ LOCK (peer->m_getdata_requests_mutex );
3816+ if (!peer->m_getdata_requests .empty ()) return true ;
3817+ }
3818+
3819+ {
3820+ LOCK (g_cs_orphans);
3821+ if (!peer->m_orphan_work_set .empty ()) return true ;
3822+ }
37913823
37923824 // Don't bother if send buffer is too full to respond anyway
37933825 if (pfrom->fPauseSend )
@@ -3814,10 +3846,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
38143846
38153847 try {
38163848 ProcessMessage (*pfrom, msg_type, msg.m_recv , msg.m_time , interruptMsgProc);
3817- if (interruptMsgProc)
3818- return false ;
3819- if (!pfrom->vRecvGetData .empty ())
3820- fMoreWork = true ;
3849+ if (interruptMsgProc) return false ;
3850+ {
3851+ LOCK (peer->m_getdata_requests_mutex );
3852+ if (!peer->m_getdata_requests .empty ()) fMoreWork = true ;
3853+ }
38213854 } catch (const std::exception& e) {
38223855 LogPrint (BCLog::NET, " %s(%s, %u bytes): Exception '%s' (%s) caught\n " , __func__, SanitizeString (msg_type), nMessageSize, e.what (), typeid (e).name ());
38233856 } catch (...) {
0 commit comments