@@ -64,6 +64,21 @@ static constexpr int STALE_RELAY_AGE_LIMIT = 30 * 24 * 60 * 60;
64
64
// / Age after which a block is considered historical for purposes of rate
65
65
// / limiting block relay. Set to one week, denominated in seconds.
66
66
static constexpr int HISTORICAL_BLOCK_AGE = 7 * 24 * 60 * 60 ;
67
+ /* * Maximum number of in-flight transactions from a peer */
68
+ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100 ;
69
+ /* * Maximum number of announced transactions from a peer */
70
+ static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
71
+ /* * How many microseconds to delay requesting transactions from inbound peers */
72
+ static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000 ;
73
+ /* * How long to wait (in microseconds) before downloading a transaction from an additional peer */
74
+ static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000 ;
75
+ /* * Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
76
+ static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000 ;
77
+ static_assert (INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
78
+ " To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY" );
79
+ /* * Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
80
+ static const unsigned int MAX_GETDATA_SZ = 1000 ;
81
+
67
82
68
83
struct COrphanTx {
69
84
// When modifying, adapt the copy of this definition in tests/DoS_tests.
@@ -274,6 +289,66 @@ struct CNodeState {
274
289
// ! Time of last new block announcement
275
290
int64_t m_last_block_announcement;
276
291
292
+ /*
293
+ * State associated with transaction download.
294
+ *
295
+ * Tx download algorithm:
296
+ *
297
+ * When inv comes in, queue up (process_time, txid) inside the peer's
298
+ * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
299
+ * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
300
+ *
301
+ * The process_time for a transaction is set to nNow for outbound peers,
302
+ * nNow + 2 seconds for inbound peers. This is the time at which we'll
303
+ * consider trying to request the transaction from the peer in
304
+ * SendMessages(). The delay for inbound peers is to allow outbound peers
305
+ * a chance to announce before we request from inbound peers, to prevent
306
+ * an adversary from using inbound connections to blind us to a
307
+ * transaction (InvBlock).
308
+ *
309
+ * When we call SendMessages() for a given peer,
310
+ * we will loop over the transactions in m_tx_process_time, looking
311
+ * at the transactions whose process_time <= nNow. We'll request each
312
+ * such transaction that we don't have already and that hasn't been
313
+ * requested from another peer recently, up until we hit the
314
+ * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
315
+ * g_already_asked_for for each requested txid, storing the time of the
316
+ * GETDATA request. We use g_already_asked_for to coordinate transaction
317
+ * requests amongst our peers.
318
+ *
319
+ * For transactions that we still need but we have already recently
320
+ * requested from some other peer, we'll reinsert (process_time, txid)
321
+ * back into the peer's m_tx_process_time at the point in the future at
322
+ * which the most recent GETDATA request would time out (ie
323
+ * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for).
324
+ * We add an additional delay for inbound peers, again to prefer
325
+ * attempting download from outbound peers first.
326
+ * We also add an extra small random delay up to 2 seconds
327
+ * to avoid biasing some peers over others. (e.g., due to fixed ordering
328
+ * of peer processing in ThreadMessageHandler).
329
+ *
330
+ * When we receive a transaction from a peer, we remove the txid from the
331
+ * peer's m_tx_in_flight set and from their recently announced set
332
+ * (m_tx_announced). We also clear g_already_asked_for for that entry, so
333
+ * that if somehow the transaction is not accepted but also not added to
334
+ * the reject filter, then we will eventually redownload from other
335
+ * peers.
336
+ */
337
+ struct TxDownloadState {
338
+ /* Track when to attempt download of announced transactions (process
339
+ * time in micros -> txid)
340
+ */
341
+ std::multimap<int64_t , uint256> m_tx_process_time;
342
+
343
+ // ! Store all the transactions a peer has recently announced
344
+ std::set<uint256> m_tx_announced;
345
+
346
+ // ! Store transactions which were requested by us
347
+ std::set<uint256> m_tx_in_flight;
348
+ };
349
+
350
+ TxDownloadState m_tx_download;
351
+
277
352
CNodeState (CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) {
278
353
fCurrentlyConnected = false ;
279
354
nMisbehavior = 0 ;
@@ -301,6 +376,9 @@ struct CNodeState {
301
376
}
302
377
};
303
378
379
+ // Keeps track of the time (in microseconds) when transactions were requested last time
380
+ limitedmap<uint256, int64_t > g_already_asked_for GUARDED_BY (cs_main)(MAX_INV_SZ);
381
+
304
382
/* * Map maintaining per-node state. */
305
383
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY (cs_main);
306
384
@@ -591,6 +669,58 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec
591
669
}
592
670
}
593
671
672
+ void EraseTxRequest (const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
673
+ {
674
+ g_already_asked_for.erase (txid);
675
+ }
676
+
677
+ int64_t GetTxRequestTime (const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
678
+ {
679
+ auto it = g_already_asked_for.find (txid);
680
+ if (it != g_already_asked_for.end ()) {
681
+ return it->second ;
682
+ }
683
+ return 0 ;
684
+ }
685
+
686
+ void UpdateTxRequestTime (const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
687
+ {
688
+ auto it = g_already_asked_for.find (txid);
689
+ if (it == g_already_asked_for.end ()) {
690
+ g_already_asked_for.insert (std::make_pair (txid, request_time));
691
+ } else {
692
+ g_already_asked_for.update (it, request_time);
693
+ }
694
+ }
695
+
696
+
697
+ void RequestTx (CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
698
+ {
699
+ CNodeState::TxDownloadState& peer_download_state = state->m_tx_download ;
700
+ if (peer_download_state.m_tx_announced .size () >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced .count (txid)) {
701
+ // Too many queued announcements from this peer, or we already have
702
+ // this announcement
703
+ return ;
704
+ }
705
+ peer_download_state.m_tx_announced .insert (txid);
706
+
707
+ int64_t process_time;
708
+ int64_t last_request_time = GetTxRequestTime (txid);
709
+ // First time requesting this tx
710
+ if (last_request_time == 0 ) {
711
+ process_time = nNow;
712
+ } else {
713
+ // Randomize the delay to avoid biasing some peers over others (such as due to
714
+ // fixed ordering of peer processing in ThreadMessageHandler)
715
+ process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand (MAX_GETDATA_RANDOM_DELAY);
716
+ }
717
+
718
+ // We delay processing announcements from non-preferred (eg inbound) peers
719
+ if (!state->fPreferredDownload ) process_time += INBOUND_PEER_TX_DELAY;
720
+
721
+ peer_download_state.m_tx_process_time .emplace (process_time, txid);
722
+ }
723
+
594
724
} // namespace
595
725
596
726
// This function is used for testing the stale tip eviction logic, see
@@ -1945,6 +2075,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
1945
2075
LOCK (cs_main);
1946
2076
1947
2077
uint32_t nFetchFlags = GetFetchFlags (pfrom);
2078
+ int64_t nNow = GetTimeMicros ();
1948
2079
1949
2080
for (CInv &inv : vInv)
1950
2081
{
@@ -1976,7 +2107,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
1976
2107
if (fBlocksOnly ) {
1977
2108
LogPrint (BCLog::NET, " transaction (%s) inv sent in violation of protocol peer=%d\n " , inv.hash .ToString (), pfrom->GetId ());
1978
2109
} else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload ()) {
1979
- pfrom->AskFor ( inv);
2110
+ RequestTx ( State ( pfrom->GetId ()), inv. hash , nNow );
1980
2111
}
1981
2112
}
1982
2113
}
@@ -2211,8 +2342,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
2211
2342
bool fMissingInputs = false ;
2212
2343
CValidationState state;
2213
2344
2214
- pfrom->setAskFor .erase (inv.hash );
2215
- mapAlreadyAskedFor.erase (inv.hash );
2345
+ CNodeState* nodestate = State (pfrom->GetId ());
2346
+ nodestate->m_tx_download .m_tx_announced .erase (inv.hash );
2347
+ nodestate->m_tx_download .m_tx_in_flight .erase (inv.hash );
2348
+ EraseTxRequest (inv.hash );
2216
2349
2217
2350
std::list<CTransactionRef> lRemovedTxn;
2218
2351
@@ -2303,10 +2436,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
2303
2436
}
2304
2437
if (!fRejectedParents ) {
2305
2438
uint32_t nFetchFlags = GetFetchFlags (pfrom);
2439
+ int64_t nNow = GetTimeMicros ();
2440
+
2306
2441
for (const CTxIn& txin : tx.vin ) {
2307
2442
CInv _inv (MSG_TX | nFetchFlags, txin.prevout .hash );
2308
2443
pfrom->AddInventoryKnown (_inv);
2309
- if (!AlreadyHave (_inv)) pfrom->AskFor ( _inv);
2444
+ if (!AlreadyHave (_inv)) RequestTx ( State ( pfrom->GetId ()), _inv. hash , nNow );
2310
2445
}
2311
2446
AddOrphanTx (ptx, pfrom->GetId ());
2312
2447
@@ -3731,24 +3866,39 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
3731
3866
//
3732
3867
// Message: getdata (non-blocks)
3733
3868
//
3734
- while (!pto->mapAskFor .empty () && (*pto->mapAskFor .begin ()).first <= nNow)
3735
- {
3736
- const CInv& inv = (*pto->mapAskFor .begin ()).second ;
3737
- if (!AlreadyHave (inv))
3738
- {
3739
- LogPrint (BCLog::NET, " Requesting %s peer=%d\n " , inv.ToString (), pto->GetId ());
3740
- vGetData.push_back (inv);
3741
- if (vGetData.size () >= 1000 )
3742
- {
3743
- connman->PushMessage (pto, msgMaker.Make (NetMsgType::GETDATA, vGetData));
3744
- vGetData.clear ();
3869
+ auto & tx_process_time = state.m_tx_download .m_tx_process_time ;
3870
+ while (!tx_process_time.empty () && tx_process_time.begin ()->first <= nNow && state.m_tx_download .m_tx_in_flight .size () < MAX_PEER_TX_IN_FLIGHT) {
3871
+ const uint256& txid = tx_process_time.begin ()->second ;
3872
+ CInv inv (MSG_TX | GetFetchFlags (pto), txid);
3873
+ if (!AlreadyHave (inv)) {
3874
+ // If this transaction was last requested more than 1 minute ago,
3875
+ // then request.
3876
+ int64_t last_request_time = GetTxRequestTime (inv.hash );
3877
+ if (last_request_time <= nNow - GETDATA_TX_INTERVAL) {
3878
+ LogPrint (BCLog::NET, " Requesting %s peer=%d\n " , inv.ToString (), pto->GetId ());
3879
+ vGetData.push_back (inv);
3880
+ if (vGetData.size () >= MAX_GETDATA_SZ) {
3881
+ connman->PushMessage (pto, msgMaker.Make (NetMsgType::GETDATA, vGetData));
3882
+ vGetData.clear ();
3883
+ }
3884
+ UpdateTxRequestTime (inv.hash , nNow);
3885
+ state.m_tx_download .m_tx_in_flight .insert (inv.hash );
3886
+ } else {
3887
+ // This transaction is in flight from someone else; queue
3888
+ // up processing to happen after the download times out
3889
+ // (with a slight delay for inbound peers, to prefer
3890
+ // requests to outbound peers).
3891
+ RequestTx (&state, txid, nNow);
3745
3892
}
3746
3893
} else {
3747
- // If we're not going to ask, don't expect a response.
3748
- pto->setAskFor .erase (inv.hash );
3894
+ // We have already seen this transaction, no need to download.
3895
+ state.m_tx_download .m_tx_announced .erase (inv.hash );
3896
+ state.m_tx_download .m_tx_in_flight .erase (inv.hash );
3749
3897
}
3750
- pto-> mapAskFor .erase (pto-> mapAskFor .begin ());
3898
+ tx_process_time .erase (tx_process_time .begin ());
3751
3899
}
3900
+
3901
+
3752
3902
if (!vGetData.empty ())
3753
3903
connman->PushMessage (pto, msgMaker.Make (NetMsgType::GETDATA, vGetData));
3754
3904
0 commit comments