diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 3e050320e90..c0739789ee7 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -816,6 +816,10 @@ func (f *TxFetcher) loop() { if len(f.announced[hash]) == 0 { delete(f.announced, hash) } + delete(f.alternates[hash], drop.peer) + if len(f.alternates[hash]) == 0 { + delete(f.alternates, hash) + } } delete(f.announces, drop.peer) } @@ -879,7 +883,7 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) { // This method is a bit "flaky" "by design". In theory the timeout timer only ever // should be rescheduled if some request is pending. In practice, a timeout will // cause the timer to be rescheduled every 5 secs (until the peer comes through or -// disconnects). This is a limitation of the fetcher code because we don't trac +// disconnects). This is a limitation of the fetcher code because we don't track // pending requests and timed out requests separately. Without double tracking, if // we simply didn't reschedule the timer on all-timeout then the timer would never // be set again since len(request) > 0 => something's running. diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 0f05a1c995c..bb41f629320 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -1858,6 +1858,56 @@ func TestBlobTransactionAnnounce(t *testing.T) { }) } +func TestTransactionFetcherDropAlternates(t *testing.T) { + testTransactionFetcherParallel(t, txFetcherTest{ + init: func() *TxFetcher { + return NewTxFetcher( + func(common.Hash) bool { return false }, + func(txs []*types.Transaction) []error { + return make([]error, len(txs)) + }, + func(string, []common.Hash) error { return nil }, + nil, + ) + }, + steps: []interface{}{ + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + doWait{time: txArriveTimeout, step: true}, + doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + + isScheduled{ + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, + "B": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, + }, + fetching: map[string][]common.Hash{ + "A": {testTxsHashes[0]}, + }, + }, + doDrop("B"), + + isScheduled{ + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, + }, + fetching: map[string][]common.Hash{ + "A": {testTxsHashes[0]}, + }, + }, + doDrop("A"), + isScheduled{ + tracking: nil, fetching: nil, + }, + }, + }) +} + func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) { t.Parallel() testTransactionFetcher(t, tt)