Skip to content

Commit 8a0ad02

Browse files
committed
eth/fetcher: separate hasTx from chainTx
expose the nuances of the "has" check to the fetcher. Signed-off-by: Csaba Kiraly <[email protected]>
1 parent 1691d39 commit 8a0ad02

File tree

4 files changed

+44
-15
lines changed

4 files changed

+44
-15
lines changed

eth/fetcher/tx_fetcher.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ type TxFetcher struct {
170170
alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails
171171

172172
// Callbacks
173-
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
173+
hasTx func(common.Hash) bool // Checks for tx in the local txpool
174+
chainTx func(common.Hash) bool // Check for tx on chain
174175
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
175176
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
176177
dropPeer func(string) // Drops a peer in case of announcement violation
@@ -183,14 +184,14 @@ type TxFetcher struct {
183184

184185
// NewTxFetcher creates a transaction fetcher to retrieve transaction
185186
// based on hash announcements.
186-
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
187-
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
187+
func NewTxFetcher(hasTx func(common.Hash) bool, chainTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
188+
return NewTxFetcherForTests(hasTx, chainTx, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
188189
}
189190

190191
// NewTxFetcherForTests is a testing method to mock out the realtime clock with
191192
// a simulated version and the internal randomness with a deterministic one.
192193
func NewTxFetcherForTests(
193-
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
194+
hasTx func(common.Hash) bool, chainTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
194195
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
195196
return &TxFetcher{
196197
notify: make(chan *txAnnounce),
@@ -207,6 +208,7 @@ func NewTxFetcherForTests(
207208
alternates: make(map[common.Hash]map[string]struct{}),
208209
underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize),
209210
hasTx: hasTx,
211+
chainTx: chainTx,
210212
addTxs: addTxs,
211213
fetchTxs: fetchTxs,
212214
dropPeer: dropPeer,
@@ -238,6 +240,9 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
238240
switch {
239241
case f.hasTx(hash):
240242
duplicate++
243+
case f.chainTx(hash):
244+
// we might count these per peer separately to detect spammer/stale peers
245+
duplicate++
241246
case f.isKnownUnderpriced(hash):
242247
underpriced++
243248
default:
@@ -313,7 +318,6 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
313318
otherreject int64
314319
)
315320
batch := txs[i:end]
316-
317321
for j, err := range f.addTxs(batch) {
318322
// Track the transaction hash if the price is too low for us.
319323
// Avoid re-request this transaction when we receive another

eth/fetcher/tx_fetcher_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ func TestTransactionFetcherWaiting(t *testing.T) {
9393
testTransactionFetcherParallel(t, txFetcherTest{
9494
init: func() *TxFetcher {
9595
return NewTxFetcher(
96+
func(common.Hash) bool { return false },
9697
func(common.Hash) bool { return false },
9798
nil,
9899
func(string, []common.Hash) error { return nil },
@@ -295,6 +296,7 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) {
295296
testTransactionFetcherParallel(t, txFetcherTest{
296297
init: func() *TxFetcher {
297298
return NewTxFetcher(
299+
func(common.Hash) bool { return false },
298300
func(common.Hash) bool { return false },
299301
nil,
300302
func(string, []common.Hash) error { return nil },
@@ -385,6 +387,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) {
385387
testTransactionFetcherParallel(t, txFetcherTest{
386388
init: func() *TxFetcher {
387389
return NewTxFetcher(
390+
func(common.Hash) bool { return false },
388391
func(common.Hash) bool { return false },
389392
nil,
390393
func(string, []common.Hash) error { return nil },
@@ -490,6 +493,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) {
490493
testTransactionFetcherParallel(t, txFetcherTest{
491494
init: func() *TxFetcher {
492495
return NewTxFetcher(
496+
func(common.Hash) bool { return false },
493497
func(common.Hash) bool { return false },
494498
nil,
495499
func(origin string, hashes []common.Hash) error {
@@ -574,6 +578,7 @@ func TestTransactionFetcherCleanup(t *testing.T) {
574578
testTransactionFetcherParallel(t, txFetcherTest{
575579
init: func() *TxFetcher {
576580
return NewTxFetcher(
581+
func(common.Hash) bool { return false },
577582
func(common.Hash) bool { return false },
578583
func(txs []*types.Transaction) []error {
579584
return make([]error, len(txs))
@@ -618,6 +623,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) {
618623
testTransactionFetcherParallel(t, txFetcherTest{
619624
init: func() *TxFetcher {
620625
return NewTxFetcher(
626+
func(common.Hash) bool { return false },
621627
func(common.Hash) bool { return false },
622628
func(txs []*types.Transaction) []error {
623629
return make([]error, len(txs))
@@ -661,6 +667,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) {
661667
testTransactionFetcherParallel(t, txFetcherTest{
662668
init: func() *TxFetcher {
663669
return NewTxFetcher(
670+
func(common.Hash) bool { return false },
664671
func(common.Hash) bool { return false },
665672
func(txs []*types.Transaction) []error {
666673
return make([]error, len(txs))
@@ -722,6 +729,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) {
722729
testTransactionFetcherParallel(t, txFetcherTest{
723730
init: func() *TxFetcher {
724731
return NewTxFetcher(
732+
func(common.Hash) bool { return false },
725733
func(common.Hash) bool { return false },
726734
func(txs []*types.Transaction) []error {
727735
return make([]error, len(txs))
@@ -771,6 +779,7 @@ func TestTransactionFetcherBroadcasts(t *testing.T) {
771779
testTransactionFetcherParallel(t, txFetcherTest{
772780
init: func() *TxFetcher {
773781
return NewTxFetcher(
782+
func(common.Hash) bool { return false },
774783
func(common.Hash) bool { return false },
775784
func(txs []*types.Transaction) []error {
776785
return make([]error, len(txs))
@@ -827,6 +836,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) {
827836
testTransactionFetcherParallel(t, txFetcherTest{
828837
init: func() *TxFetcher {
829838
return NewTxFetcher(
839+
func(common.Hash) bool { return false },
830840
func(common.Hash) bool { return false },
831841
nil,
832842
func(string, []common.Hash) error { return nil },
@@ -897,6 +907,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
897907
testTransactionFetcherParallel(t, txFetcherTest{
898908
init: func() *TxFetcher {
899909
return NewTxFetcher(
910+
func(common.Hash) bool { return false },
900911
func(common.Hash) bool { return false },
901912
func(txs []*types.Transaction) []error {
902913
return make([]error, len(txs))
@@ -975,6 +986,7 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
975986
testTransactionFetcherParallel(t, txFetcherTest{
976987
init: func() *TxFetcher {
977988
return NewTxFetcher(
989+
func(common.Hash) bool { return false },
978990
func(common.Hash) bool { return false },
979991
nil,
980992
func(string, []common.Hash) error { return nil },
@@ -1053,6 +1065,7 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
10531065
testTransactionFetcherParallel(t, txFetcherTest{
10541066
init: func() *TxFetcher {
10551067
return NewTxFetcher(
1068+
func(common.Hash) bool { return false },
10561069
func(common.Hash) bool { return false },
10571070
nil,
10581071
func(string, []common.Hash) error { return nil },
@@ -1083,6 +1096,7 @@ func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
10831096
testTransactionFetcherParallel(t, txFetcherTest{
10841097
init: func() *TxFetcher {
10851098
return NewTxFetcher(
1099+
func(common.Hash) bool { return false },
10861100
func(common.Hash) bool { return false },
10871101
nil,
10881102
func(string, []common.Hash) error { return nil },
@@ -1200,6 +1214,7 @@ func TestTransactionFetcherDoSProtection(t *testing.T) {
12001214
testTransactionFetcherParallel(t, txFetcherTest{
12011215
init: func() *TxFetcher {
12021216
return NewTxFetcher(
1217+
func(common.Hash) bool { return false },
12031218
func(common.Hash) bool { return false },
12041219
nil,
12051220
func(string, []common.Hash) error { return nil },
@@ -1267,6 +1282,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
12671282
testTransactionFetcherParallel(t, txFetcherTest{
12681283
init: func() *TxFetcher {
12691284
return NewTxFetcher(
1285+
func(common.Hash) bool { return false },
12701286
func(common.Hash) bool { return false },
12711287
func(txs []*types.Transaction) []error {
12721288
errs := make([]error, len(txs))
@@ -1368,6 +1384,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
13681384
testTransactionFetcher(t, txFetcherTest{
13691385
init: func() *TxFetcher {
13701386
return NewTxFetcher(
1387+
func(common.Hash) bool { return false },
13711388
func(common.Hash) bool { return false },
13721389
func(txs []*types.Transaction) []error {
13731390
errs := make([]error, len(txs))
@@ -1400,6 +1417,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
14001417
testTransactionFetcherParallel(t, txFetcherTest{
14011418
init: func() *TxFetcher {
14021419
return NewTxFetcher(
1420+
func(common.Hash) bool { return false },
14031421
func(common.Hash) bool { return false },
14041422
func(txs []*types.Transaction) []error {
14051423
return make([]error, len(txs))
@@ -1459,6 +1477,7 @@ func TestTransactionFetcherDrop(t *testing.T) {
14591477
testTransactionFetcherParallel(t, txFetcherTest{
14601478
init: func() *TxFetcher {
14611479
return NewTxFetcher(
1480+
func(common.Hash) bool { return false },
14621481
func(common.Hash) bool { return false },
14631482
func(txs []*types.Transaction) []error {
14641483
return make([]error, len(txs))
@@ -1533,6 +1552,7 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) {
15331552
testTransactionFetcherParallel(t, txFetcherTest{
15341553
init: func() *TxFetcher {
15351554
return NewTxFetcher(
1555+
func(common.Hash) bool { return false },
15361556
func(common.Hash) bool { return false },
15371557
func(txs []*types.Transaction) []error {
15381558
return make([]error, len(txs))
@@ -1579,6 +1599,7 @@ func TestInvalidAnnounceMetadata(t *testing.T) {
15791599
testTransactionFetcherParallel(t, txFetcherTest{
15801600
init: func() *TxFetcher {
15811601
return NewTxFetcher(
1602+
func(common.Hash) bool { return false },
15821603
func(common.Hash) bool { return false },
15831604
func(txs []*types.Transaction) []error {
15841605
return make([]error, len(txs))
@@ -1662,6 +1683,7 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) {
16621683
testTransactionFetcherParallel(t, txFetcherTest{
16631684
init: func() *TxFetcher {
16641685
return NewTxFetcher(
1686+
func(common.Hash) bool { return false },
16651687
func(common.Hash) bool { return false },
16661688
func(txs []*types.Transaction) []error {
16671689
return make([]error, len(txs))
@@ -1690,6 +1712,7 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) {
16901712
testTransactionFetcherParallel(t, txFetcherTest{
16911713
init: func() *TxFetcher {
16921714
return NewTxFetcher(
1715+
func(common.Hash) bool { return false },
16931716
func(common.Hash) bool { return false },
16941717
func(txs []*types.Transaction) []error {
16951718
return make([]error, len(txs))
@@ -1720,6 +1743,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) {
17201743
testTransactionFetcherParallel(t, txFetcherTest{
17211744
init: func() *TxFetcher {
17221745
return NewTxFetcher(
1746+
func(common.Hash) bool { return false },
17231747
func(common.Hash) bool { return false },
17241748
func(txs []*types.Transaction) []error {
17251749
return make([]error, len(txs))
@@ -1759,6 +1783,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) {
17591783
testTransactionFetcherParallel(t, txFetcherTest{
17601784
init: func() *TxFetcher {
17611785
return NewTxFetcher(
1786+
func(common.Hash) bool { return false },
17621787
func(common.Hash) bool { return false },
17631788
func(txs []*types.Transaction) []error {
17641789
return make([]error, len(txs))
@@ -1794,6 +1819,7 @@ func TestBlobTransactionAnnounce(t *testing.T) {
17941819
testTransactionFetcherParallel(t, txFetcherTest{
17951820
init: func() *TxFetcher {
17961821
return NewTxFetcher(
1822+
func(common.Hash) bool { return false },
17971823
func(common.Hash) bool { return false },
17981824
nil,
17991825
func(string, []common.Hash) error { return nil },
@@ -1862,6 +1888,7 @@ func TestTransactionFetcherDropAlternates(t *testing.T) {
18621888
testTransactionFetcherParallel(t, txFetcherTest{
18631889
init: func() *TxFetcher {
18641890
return NewTxFetcher(
1891+
func(common.Hash) bool { return false },
18651892
func(common.Hash) bool { return false },
18661893
func(txs []*types.Transaction) []error {
18671894
return make([]error, len(txs))
@@ -2245,6 +2272,7 @@ func TestTransactionForgotten(t *testing.T) {
22452272
}
22462273

22472274
fetcher := NewTxFetcherForTests(
2275+
func(common.Hash) bool { return false },
22482276
func(common.Hash) bool { return false },
22492277
func(txs []*types.Transaction) []error {
22502278
errs := make([]error, len(txs))

eth/handler.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -206,17 +206,13 @@ func newHandler(config *handlerConfig) (*handler, error) {
206206
return h.txpool.Add(txs, false)
207207
}
208208
hasTx := func(hash common.Hash) bool {
209-
if h.txpool.Has(hash) {
210-
return true
211-
}
212-
// check on chain as well (no need to check limbo separately, as chain checks limbo too)
213-
if h.chain.HasCanonicalTransaction(hash, false) {
214-
return true
215-
}
216-
// tx not found
217-
return false
209+
return h.txpool.Has(hash)
210+
}
211+
chainTx := func(hash common.Hash) bool {
212+
// check on chain (no need to check limbo separately, as chain checks limbo too)
213+
return h.chain.HasCanonicalTransaction(hash, false)
218214
}
219-
h.txFetcher = fetcher.NewTxFetcher(hasTx, addTxs, fetchTx, h.removePeer)
215+
h.txFetcher = fetcher.NewTxFetcher(hasTx, chainTx, addTxs, fetchTx, h.removePeer)
220216
return h, nil
221217
}
222218

tests/fuzzers/txfetcher/txfetcher_fuzzer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func fuzz(input []byte) int {
7878
rand := rand.New(rand.NewSource(0x3a29)) // Same used in package tests!!!
7979

8080
f := fetcher.NewTxFetcherForTests(
81+
func(common.Hash) bool { return false },
8182
func(common.Hash) bool { return false },
8283
func(txs []*types.Transaction) []error {
8384
return make([]error, len(txs))

0 commit comments

Comments
 (0)