@@ -68,11 +68,13 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
68
68
/* * Maximum number of announced transactions from a peer */
69
69
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
70
70
/* * How many microseconds to delay requesting transactions from inbound peers */
71
- static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000 ;
71
+ static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000 ; // 2 seconds
72
72
/* * How long to wait (in microseconds) before downloading a transaction from an additional peer */
73
- static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000 ;
73
+ static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000 ; // 1 minute
74
74
/* * Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
75
- static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000 ;
75
+ static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000 ; // 2 seconds
76
+ /* * How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
77
+ static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL;
76
78
static_assert (INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
77
79
" To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY" );
78
80
/* * Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
@@ -343,8 +345,11 @@ struct CNodeState {
343
345
// ! Store all the transactions a peer has recently announced
344
346
std::set<uint256> m_tx_announced;
345
347
346
- // ! Store transactions which were requested by us
347
- std::set<uint256> m_tx_in_flight;
348
+ // ! Store transactions which were requested by us, with timestamp
349
+ std::map<uint256, int64_t > m_tx_in_flight;
350
+
351
+ // ! Periodically check for stuck getdata requests
352
+ int64_t m_check_expiry_timer{0 };
348
353
};
349
354
350
355
TxDownloadState m_tx_download;
@@ -702,30 +707,40 @@ void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LO
702
707
}
703
708
}
704
709
705
-
706
- void RequestTx (CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
710
+ int64_t CalculateTxGetDataTime (const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
707
711
{
708
- CNodeState::TxDownloadState& peer_download_state = state->m_tx_download ;
709
- if (peer_download_state.m_tx_announced .size () >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced .count (txid)) {
710
- // Too many queued announcements from this peer, or we already have
711
- // this announcement
712
- return ;
713
- }
714
- peer_download_state.m_tx_announced .insert (txid);
715
-
716
712
int64_t process_time;
717
713
int64_t last_request_time = GetTxRequestTime (txid);
718
714
// First time requesting this tx
719
715
if (last_request_time == 0 ) {
720
- process_time = nNow ;
716
+ process_time = current_time ;
721
717
} else {
722
718
// Randomize the delay to avoid biasing some peers over others (such as due to
723
719
// fixed ordering of peer processing in ThreadMessageHandler)
724
720
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand (MAX_GETDATA_RANDOM_DELAY);
725
721
}
726
722
727
- // We delay processing announcements from non-preferred (eg inbound) peers
728
- if (!state->fPreferredDownload ) process_time += INBOUND_PEER_TX_DELAY;
723
+ // We delay processing announcements from inbound peers
724
+ if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY;
725
+
726
+ return process_time;
727
+ }
728
+
729
+ void RequestTx (CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
730
+ {
731
+ CNodeState::TxDownloadState& peer_download_state = state->m_tx_download ;
732
+ if (peer_download_state.m_tx_announced .size () >= MAX_PEER_TX_ANNOUNCEMENTS ||
733
+ peer_download_state.m_tx_process_time .size () >= MAX_PEER_TX_ANNOUNCEMENTS ||
734
+ peer_download_state.m_tx_announced .count (txid)) {
735
+ // Too many queued announcements from this peer, or we already have
736
+ // this announcement
737
+ return ;
738
+ }
739
+ peer_download_state.m_tx_announced .insert (txid);
740
+
741
+ // Calculate the time to try requesting this transaction. Use
742
+ // fPreferredDownload as a proxy for outbound peers.
743
+ int64_t process_time = CalculateTxGetDataTime (txid, nNow, !state->fPreferredDownload );
729
744
730
745
peer_download_state.m_tx_process_time .emplace (process_time, txid);
731
746
}
@@ -1544,12 +1559,19 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
1544
1559
1545
1560
if (!vNotFound.empty ()) {
1546
1561
// Let the peer know that we didn't find what it asked for, so it doesn't
1547
- // have to wait around forever. Currently only SPV clients actually care
1548
- // about this message: it's needed when they are recursively walking the
1549
- // dependencies of relevant unconfirmed transactions. SPV clients want to
1550
- // do that because they want to know about (and store and rebroadcast and
1551
- // risk analyze) the dependencies of transactions relevant to them, without
1552
- // having to download the entire memory pool.
1562
+ // have to wait around forever.
1563
+ // SPV clients care about this message: it's needed when they are
1564
+ // recursively walking the dependencies of relevant unconfirmed
1565
+ // transactions. SPV clients want to do that because they want to know
1566
+ // about (and store and rebroadcast and risk analyze) the dependencies
1567
+ // of transactions relevant to them, without having to download the
1568
+ // entire memory pool.
1569
+ // Also, other nodes can use these messages to automatically request a
1570
+ // transaction from some other peer that annnounced it, and stop
1571
+ // waiting for us to respond.
1572
+ // In normal operation, we often send NOTFOUND messages for parents of
1573
+ // transactions that we relay; if a peer is missing a parent, they may
1574
+ // assume we have them and request the parents from us.
1553
1575
connman->PushMessage (pfrom, msgMaker.Make (NetMsgType::NOTFOUND, vNotFound));
1554
1576
}
1555
1577
}
@@ -3146,8 +3168,27 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
3146
3168
}
3147
3169
3148
3170
if (strCommand == NetMsgType::NOTFOUND) {
3149
- // We do not care about the NOTFOUND message, but logging an Unknown Command
3150
- // message would be undesirable as we transmit it ourselves.
3171
+ // Remove the NOTFOUND transactions from the peer
3172
+ LOCK (cs_main);
3173
+ CNodeState *state = State (pfrom->GetId ());
3174
+ std::vector<CInv> vInv;
3175
+ vRecv >> vInv;
3176
+ if (vInv.size () <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
3177
+ for (CInv &inv : vInv) {
3178
+ if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) {
3179
+ // If we receive a NOTFOUND message for a txid we requested, erase
3180
+ // it from our data structures for this peer.
3181
+ auto in_flight_it = state->m_tx_download .m_tx_in_flight .find (inv.hash );
3182
+ if (in_flight_it == state->m_tx_download .m_tx_in_flight .end ()) {
3183
+ // Skip any further work if this is a spurious NOTFOUND
3184
+ // message.
3185
+ continue ;
3186
+ }
3187
+ state->m_tx_download .m_tx_in_flight .erase (in_flight_it);
3188
+ state->m_tx_download .m_tx_announced .erase (inv.hash );
3189
+ }
3190
+ }
3191
+ }
3151
3192
return true ;
3152
3193
}
3153
3194
@@ -3945,9 +3986,33 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
3945
3986
//
3946
3987
// Message: getdata (non-blocks)
3947
3988
//
3989
+
3990
+ // For robustness, expire old requests after a long timeout, so that
3991
+ // we can resume downloading transactions from a peer even if they
3992
+ // were unresponsive in the past.
3993
+ // Eventually we should consider disconnecting peers, but this is
3994
+ // conservative.
3995
+ if (state.m_tx_download .m_check_expiry_timer <= nNow) {
3996
+ for (auto it=state.m_tx_download .m_tx_in_flight .begin (); it != state.m_tx_download .m_tx_in_flight .end ();) {
3997
+ if (it->second <= nNow - TX_EXPIRY_INTERVAL) {
3998
+ LogPrint (BCLog::NET, " timeout of inflight tx %s from peer=%d\n " , it->first .ToString (), pto->GetId ());
3999
+ state.m_tx_download .m_tx_announced .erase (it->first );
4000
+ state.m_tx_download .m_tx_in_flight .erase (it++);
4001
+ } else {
4002
+ ++it;
4003
+ }
4004
+ }
4005
+ // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
4006
+ // so that we're not doing this for all peers at the same time.
4007
+ state.m_tx_download .m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL/2 + GetRand (TX_EXPIRY_INTERVAL);
4008
+ }
4009
+
3948
4010
auto & tx_process_time = state.m_tx_download .m_tx_process_time ;
3949
4011
while (!tx_process_time.empty () && tx_process_time.begin ()->first <= nNow && state.m_tx_download .m_tx_in_flight .size () < MAX_PEER_TX_IN_FLIGHT) {
3950
- const uint256& txid = tx_process_time.begin ()->second ;
4012
+ const uint256 txid = tx_process_time.begin ()->second ;
4013
+ // Erase this entry from tx_process_time (it may be added back for
4014
+ // processing at a later time, see below)
4015
+ tx_process_time.erase (tx_process_time.begin ());
3951
4016
CInv inv (MSG_TX | GetFetchFlags (pto), txid);
3952
4017
if (!AlreadyHave (inv)) {
3953
4018
// If this transaction was last requested more than 1 minute ago,
@@ -3961,20 +4026,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
3961
4026
vGetData.clear ();
3962
4027
}
3963
4028
UpdateTxRequestTime (inv.hash , nNow);
3964
- state.m_tx_download .m_tx_in_flight .insert (inv.hash );
4029
+ state.m_tx_download .m_tx_in_flight .emplace (inv.hash , nNow );
3965
4030
} else {
3966
4031
// This transaction is in flight from someone else; queue
3967
4032
// up processing to happen after the download times out
3968
4033
// (with a slight delay for inbound peers, to prefer
3969
4034
// requests to outbound peers).
3970
- RequestTx (&state, txid, nNow);
4035
+ int64_t next_process_time = CalculateTxGetDataTime (txid, nNow, !state.fPreferredDownload );
4036
+ tx_process_time.emplace (next_process_time, txid);
3971
4037
}
3972
4038
} else {
3973
4039
// We have already seen this transaction, no need to download.
3974
4040
state.m_tx_download .m_tx_announced .erase (inv.hash );
3975
4041
state.m_tx_download .m_tx_in_flight .erase (inv.hash );
3976
4042
}
3977
- tx_process_time.erase (tx_process_time.begin ());
3978
4043
}
3979
4044
3980
4045
0 commit comments