diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 3e050320e90..2622412eb62 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -18,7 +18,6 @@ package fetcher import ( "errors" - "fmt" "math" mrand "math/rand" "sort" @@ -104,7 +103,6 @@ var ( txFetcherWaitingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/peers", nil) txFetcherWaitingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/hashes", nil) txFetcherQueueingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/peers", nil) - txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil) txFetcherFetchingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil) txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil) ) @@ -187,17 +185,15 @@ type TxFetcher struct { waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection) - // Stage 2: Queue of transactions that waiting to be allocated to some peer - // to be retrieved directly. + // Stage 2: Transactions that are either waiting to be allocated + // to a peer or are already being fetched. announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer - announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash // Stage 3: Set of transactions currently being retrieved, some which may be // fulfilled and some rescheduled. Note, this step shares 'announces' from the // previous stage to avoid having to duplicate (need it for DoS checks). - fetching map[common.Hash]string // Transaction set currently being retrieved - requests map[string]*txRequest // In-flight transaction retrievals - alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails + fetching map[common.Hash]string // Transaction set currently being retrieved + requests map[string]*txRequest // In-flight transaction retrievals // Callbacks hasTx func(common.Hash) bool // Retrieves a tx from the local txpool @@ -231,10 +227,8 @@ func NewTxFetcherForTests( waittime: make(map[common.Hash]mclock.AbsTime), waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), announces: make(map[string]map[common.Hash]*txMetadataWithSeq), - announced: make(map[common.Hash]map[string]struct{}), fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), - alternates: make(map[common.Hash]map[string]struct{}), underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), hasTx: hasTx, addTxs: addTxs, @@ -457,33 +451,9 @@ func (f *TxFetcher) loop() { } ) for i, hash := range ann.hashes { - // If the transaction is already downloading, add it to the list - // of possible alternates (in case the current retrieval fails) and - // also account it for the peer. - if f.alternates[hash] != nil { - f.alternates[hash][ann.origin] = struct{}{} - - // Stage 2 and 3 share the set of origins per tx - if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = &txMetadataWithSeq{ - txMetadata: ann.metas[i], - seq: nextSeq(), - } - } else { - f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{ - hash: { - txMetadata: ann.metas[i], - seq: nextSeq(), - }, - } - } - continue - } - // If the transaction is not downloading, but is already queued - // from a different peer, track it for the new peer too. - if f.announced[hash] != nil { - f.announced[hash][ann.origin] = struct{}{} - + // If the transaction is already downloading or queued from a different peer, + // track it for the new peer + if f.announced(hash) { // Stage 2 and 3 share the set of origins per tx if announces := f.announces[ann.origin]; announces != nil { announces[hash] = &txMetadataWithSeq{ @@ -564,15 +534,14 @@ func (f *TxFetcher) loop() { case <-waitTrigger: // At least one transaction's waiting time ran out, push all expired - // ones into the retrieval queues + // ones into the announces actives := make(map[string]struct{}) for hash, instance := range f.waittime { if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout { // Transaction expired without propagation, schedule for retrieval - if f.announced[hash] != nil { - panic("announce tracker already contains waitlist item") + if f.announced(hash) { + panic("announced tracker already contains waitlist item") } - f.announced[hash] = f.waitlist[hash] for peer := range f.waitlist[hash] { if announces := f.announces[peer]; announces != nil { announces[hash] = f.waitslots[peer][hash] @@ -616,18 +585,7 @@ func (f *TxFetcher) loop() { } } // Move the delivery back from fetching to queued - if _, ok := f.announced[hash]; ok { - panic("announced tracker already contains alternate item") - } - if f.alternates[hash] != nil { // nil if tx was broadcast during fetch - f.announced[hash] = f.alternates[hash] - } - delete(f.announced[hash], peer) - if len(f.announced[hash]) == 0 { - delete(f.announced, hash) - } delete(f.announces[peer], hash) - delete(f.alternates, hash) delete(f.fetching, hash) } if len(f.announces[peer]) == 0 { @@ -699,8 +657,6 @@ func (f *TxFetcher) loop() { delete(f.announces, peer) } } - delete(f.announced, hash) - delete(f.alternates, hash) // If a transaction currently being fetched from a different // origin was delivered (delivery stolen), mark it so the @@ -752,20 +708,12 @@ func (f *TxFetcher) loop() { } if _, ok := delivered[hash]; !ok { if i < cutoff { - delete(f.alternates[hash], delivery.origin) delete(f.announces[delivery.origin], hash) if len(f.announces[delivery.origin]) == 0 { delete(f.announces, delivery.origin) } } - if len(f.alternates[hash]) > 0 { - if _, ok := f.announced[hash]; ok { - panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash])) - } - f.announced[hash] = f.alternates[hash] - } } - delete(f.alternates, hash) delete(f.fetching, hash) } // Something was delivered, try to reschedule requests @@ -797,28 +745,13 @@ func (f *TxFetcher) loop() { continue } } - // Undelivered hash, reschedule if there's an alternative origin available - delete(f.alternates[hash], drop.peer) - if len(f.alternates[hash]) == 0 { - delete(f.alternates, hash) - } else { - f.announced[hash] = f.alternates[hash] - delete(f.alternates, hash) - } delete(f.fetching, hash) } delete(f.requests, drop.peer) } // Clean up general announcement tracking - if _, ok := f.announces[drop.peer]; ok { - for hash := range f.announces[drop.peer] { - delete(f.announced[hash], drop.peer) - if len(f.announced[hash]) == 0 { - delete(f.announced, hash) - } - } - delete(f.announces, drop.peer) - } + delete(f.announces, drop.peer) + // If a request was cancelled, check if anything needs to be rescheduled if request != nil { f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) @@ -832,7 +765,6 @@ func (f *TxFetcher) loop() { txFetcherWaitingPeers.Update(int64(len(f.waitslots))) txFetcherWaitingHashes.Update(int64(len(f.waitlist))) txFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests))) - txFetcherQueueingHashes.Update(int64(len(f.announced))) txFetcherFetchingPeers.Update(int64(len(f.requests))) txFetcherFetchingHashes.Update(int64(len(f.fetching))) @@ -942,12 +874,6 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, // Mark the hash as fetching and stash away possible alternates f.fetching[hash] = peer - if _, ok := f.alternates[hash]; ok { - panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash])) - } - f.alternates[hash] = f.announced[hash] - delete(f.announced, hash) - // Accumulate the hash and stop if the limit was reached hashes = append(hashes, hash) if len(hashes) >= maxTxRetrievals { @@ -1034,3 +960,12 @@ func rotateStrings(slice []string, n int) { slice[i] = orig[(i+n)%len(orig)] } } + +func (f *TxFetcher) announced(hash common.Hash) bool { + for _, hashes := range f.announces { + if hashes[hash] != nil { + return true + } + } + return false +} diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 0f05a1c995c..abea46302fd 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -459,6 +459,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { {common.Hash{0x06}, types.LegacyTxType, 666}, }, }), + // Step 14 isScheduled{ tracking: map[string][]announce{ "A": { @@ -2017,6 +2018,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { t.Errorf("step %d: peer %s extra in announces", i, peer) } } + // Check that all announces required to be fetching are in the // appropriate sets for peer, hashes := range step.fetching { @@ -2062,31 +2064,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { t.Errorf("step %d: hash %x extra in fetching", i, hash) } } - for _, hashes := range step.fetching { - for _, hash := range hashes { - alternates := fetcher.alternates[hash] - if alternates == nil { - t.Errorf("step %d: hash %x missing from alternates", i, hash) - continue - } - for peer := range alternates { - if _, ok := fetcher.announces[peer]; !ok { - t.Errorf("step %d: peer %s extra in alternates", i, peer) - continue - } - if _, ok := fetcher.announces[peer][hash]; !ok { - t.Errorf("step %d, peer %s: hash %x extra in alternates", i, hash, peer) - continue - } - } - for p := range fetcher.announced[hash] { - if _, ok := alternates[p]; !ok { - t.Errorf("step %d, hash %x: peer %s missing from alternates", i, hash, p) - continue - } - } - } - } for peer, hashes := range step.dangling { request := fetcher.requests[peer] if request == nil { @@ -2104,34 +2081,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { } } } - // Check that all transaction announces that are scheduled for - // retrieval but not actively being downloaded are tracked only - // in the stage 2 `announced` map. - var queued []common.Hash - for _, announces := range step.tracking { - for _, ann := range announces { - var found bool - for _, hs := range step.fetching { - if slices.Contains(hs, ann.hash) { - found = true - break - } - } - if !found { - queued = append(queued, ann.hash) - } - } - } - for _, hash := range queued { - if _, ok := fetcher.announced[hash]; !ok { - t.Errorf("step %d: hash %x missing from announced", i, hash) - } - } - for hash := range fetcher.announced { - if !slices.Contains(queued, hash) { - t.Errorf("step %d: hash %x extra in announced", i, hash) - } - } case isUnderpriced: if fetcher.underpriced.Len() != int(step) { @@ -2144,7 +2093,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { // After every step, cross validate the internal uniqueness invariants // between stage one and stage two. for hash := range fetcher.waittime { - if _, ok := fetcher.announced[hash]; ok { + if fetcher.announced(hash) { t.Errorf("step %d: hash %s present in both stage 1 and 2", i, hash) } }