Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 21 additions & 86 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package fetcher

import (
"errors"
"fmt"
"math"
mrand "math/rand"
"sort"
Expand Down Expand Up @@ -104,7 +103,6 @@ var (
txFetcherWaitingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/peers", nil)
txFetcherWaitingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/hashes", nil)
txFetcherQueueingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/peers", nil)
txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil)
txFetcherFetchingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil)
txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil)
)
Expand Down Expand Up @@ -187,17 +185,15 @@ type TxFetcher struct {
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
// Stage 2: Transactions that are either waiting to be allocated
// to a peer or are already being fetched.
announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
// previous stage to avoid having to duplicate (need it for DoS checks).
fetching map[common.Hash]string // Transaction set currently being retrieved
requests map[string]*txRequest // In-flight transaction retrievals
alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails
fetching map[common.Hash]string // Transaction set currently being retrieved
requests map[string]*txRequest // In-flight transaction retrievals

// Callbacks
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
Expand Down Expand Up @@ -231,10 +227,8 @@ func NewTxFetcherForTests(
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize),
hasTx: hasTx,
addTxs: addTxs,
Expand Down Expand Up @@ -457,33 +451,9 @@ func (f *TxFetcher) loop() {
}
)
for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and
// also account it for the peer.
if f.alternates[hash] != nil {
f.alternates[hash][ann.origin] = struct{}{}

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
continue
}
// If the transaction is not downloading, but is already queued
// from a different peer, track it for the new peer too.
if f.announced[hash] != nil {
f.announced[hash][ann.origin] = struct{}{}

// If the transaction is already downloading or queued from a different peer,
// track it for the new peer
if f.announced(hash) {
// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &txMetadataWithSeq{
Expand Down Expand Up @@ -564,15 +534,14 @@ func (f *TxFetcher) loop() {

case <-waitTrigger:
// At least one transaction's waiting time ran out, push all expired
// ones into the retrieval queues
// ones into the announces
actives := make(map[string]struct{})
for hash, instance := range f.waittime {
if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
// Transaction expired without propagation, schedule for retrieval
if f.announced[hash] != nil {
panic("announce tracker already contains waitlist item")
if f.announced(hash) {
panic("announced tracker already contains waitlist item")
}
f.announced[hash] = f.waitlist[hash]
for peer := range f.waitlist[hash] {
if announces := f.announces[peer]; announces != nil {
announces[hash] = f.waitslots[peer][hash]
Expand Down Expand Up @@ -616,18 +585,7 @@ func (f *TxFetcher) loop() {
}
}
// Move the delivery back from fetching to queued
if _, ok := f.announced[hash]; ok {
panic("announced tracker already contains alternate item")
}
if f.alternates[hash] != nil { // nil if tx was broadcast during fetch
f.announced[hash] = f.alternates[hash]
}
delete(f.announced[hash], peer)
if len(f.announced[hash]) == 0 {
delete(f.announced, hash)
}
delete(f.announces[peer], hash)
delete(f.alternates, hash)
delete(f.fetching, hash)
}
if len(f.announces[peer]) == 0 {
Expand Down Expand Up @@ -699,8 +657,6 @@ func (f *TxFetcher) loop() {
delete(f.announces, peer)
}
}
delete(f.announced, hash)
delete(f.alternates, hash)

// If a transaction currently being fetched from a different
// origin was delivered (delivery stolen), mark it so the
Expand Down Expand Up @@ -752,20 +708,12 @@ func (f *TxFetcher) loop() {
}
if _, ok := delivered[hash]; !ok {
if i < cutoff {
delete(f.alternates[hash], delivery.origin)
delete(f.announces[delivery.origin], hash)
if len(f.announces[delivery.origin]) == 0 {
delete(f.announces, delivery.origin)
}
}
if len(f.alternates[hash]) > 0 {
if _, ok := f.announced[hash]; ok {
panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash]))
}
f.announced[hash] = f.alternates[hash]
}
}
delete(f.alternates, hash)
delete(f.fetching, hash)
}
// Something was delivered, try to reschedule requests
Expand Down Expand Up @@ -797,28 +745,13 @@ func (f *TxFetcher) loop() {
continue
}
}
// Undelivered hash, reschedule if there's an alternative origin available
delete(f.alternates[hash], drop.peer)
if len(f.alternates[hash]) == 0 {
delete(f.alternates, hash)
} else {
f.announced[hash] = f.alternates[hash]
delete(f.alternates, hash)
}
delete(f.fetching, hash)
}
delete(f.requests, drop.peer)
}
// Clean up general announcement tracking
if _, ok := f.announces[drop.peer]; ok {
for hash := range f.announces[drop.peer] {
delete(f.announced[hash], drop.peer)
if len(f.announced[hash]) == 0 {
delete(f.announced, hash)
}
}
delete(f.announces, drop.peer)
}
delete(f.announces, drop.peer)

// If a request was cancelled, check if anything needs to be rescheduled
if request != nil {
f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)
Expand All @@ -832,7 +765,6 @@ func (f *TxFetcher) loop() {
txFetcherWaitingPeers.Update(int64(len(f.waitslots)))
txFetcherWaitingHashes.Update(int64(len(f.waitlist)))
txFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests)))
txFetcherQueueingHashes.Update(int64(len(f.announced)))
txFetcherFetchingPeers.Update(int64(len(f.requests)))
txFetcherFetchingHashes.Update(int64(len(f.fetching)))

Expand Down Expand Up @@ -942,12 +874,6 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
// Mark the hash as fetching and stash away possible alternates
f.fetching[hash] = peer

if _, ok := f.alternates[hash]; ok {
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
}
f.alternates[hash] = f.announced[hash]
delete(f.announced, hash)

// Accumulate the hash and stop if the limit was reached
hashes = append(hashes, hash)
if len(hashes) >= maxTxRetrievals {
Expand Down Expand Up @@ -1034,3 +960,12 @@ func rotateStrings(slice []string, n int) {
slice[i] = orig[(i+n)%len(orig)]
}
}

func (f *TxFetcher) announced(hash common.Hash) bool {
for _, hashes := range f.announces {
if hashes[hash] != nil {
return true
}
}
return false
}
57 changes: 3 additions & 54 deletions eth/fetcher/tx_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) {
{common.Hash{0x06}, types.LegacyTxType, 666},
},
}),
// Step 14
isScheduled{
tracking: map[string][]announce{
"A": {
Expand Down Expand Up @@ -2017,6 +2018,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
t.Errorf("step %d: peer %s extra in announces", i, peer)
}
}

// Check that all announces required to be fetching are in the
// appropriate sets
for peer, hashes := range step.fetching {
Expand Down Expand Up @@ -2062,31 +2064,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
t.Errorf("step %d: hash %x extra in fetching", i, hash)
}
}
for _, hashes := range step.fetching {
for _, hash := range hashes {
alternates := fetcher.alternates[hash]
if alternates == nil {
t.Errorf("step %d: hash %x missing from alternates", i, hash)
continue
}
for peer := range alternates {
if _, ok := fetcher.announces[peer]; !ok {
t.Errorf("step %d: peer %s extra in alternates", i, peer)
continue
}
if _, ok := fetcher.announces[peer][hash]; !ok {
t.Errorf("step %d, peer %s: hash %x extra in alternates", i, hash, peer)
continue
}
}
for p := range fetcher.announced[hash] {
if _, ok := alternates[p]; !ok {
t.Errorf("step %d, hash %x: peer %s missing from alternates", i, hash, p)
continue
}
}
}
}
for peer, hashes := range step.dangling {
request := fetcher.requests[peer]
if request == nil {
Expand All @@ -2104,34 +2081,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
}
}
}
// Check that all transaction announces that are scheduled for
// retrieval but not actively being downloaded are tracked only
// in the stage 2 `announced` map.
var queued []common.Hash
for _, announces := range step.tracking {
for _, ann := range announces {
var found bool
for _, hs := range step.fetching {
if slices.Contains(hs, ann.hash) {
found = true
break
}
}
if !found {
queued = append(queued, ann.hash)
}
}
}
for _, hash := range queued {
if _, ok := fetcher.announced[hash]; !ok {
t.Errorf("step %d: hash %x missing from announced", i, hash)
}
}
for hash := range fetcher.announced {
if !slices.Contains(queued, hash) {
t.Errorf("step %d: hash %x extra in announced", i, hash)
}
}

case isUnderpriced:
if fetcher.underpriced.Len() != int(step) {
Expand All @@ -2144,7 +2093,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
// After every step, cross validate the internal uniqueness invariants
// between stage one and stage two.
for hash := range fetcher.waittime {
if _, ok := fetcher.announced[hash]; ok {
if fetcher.announced(hash) {
t.Errorf("step %d: hash %s present in both stage 1 and 2", i, hash)
}
}
Expand Down