Skip to content

Commit c3dbf08

Browse files
committed
ethreceipts: improve interactions with flaky providers
1 parent 3d076ae commit c3dbf08

File tree

3 files changed

+677
-82
lines changed

3 files changed

+677
-82
lines changed

ethreceipts/ethreceipts.go

Lines changed: 151 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
stdlog "log"
78
"log/slog"
89
"math/big"
910
"runtime/debug"
@@ -28,21 +29,35 @@ import (
2829
)
2930

3031
var DefaultOptions = Options{
31-
MaxConcurrentFetchReceiptWorkers: 100,
32-
MaxConcurrentFilterWorkers: 50,
33-
PastReceiptsCacheSize: 5_000,
34-
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
35-
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
36-
Alerter: util.NoopAlerter(),
32+
MaxConcurrentFetchReceiptWorkers: 100,
33+
MaxConcurrentFilterWorkers: 200,
34+
MaxConcurrentSearchOnChainWorkers: 5,
35+
LatestBlockNumCacheTTL: 2 * time.Second,
36+
PastReceiptsCacheSize: 5_000,
37+
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
38+
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
39+
Alerter: util.NoopAlerter(),
3740
}
3841

42+
const (
43+
maxFiltersPerListener = 1000
44+
)
45+
3946
type Options struct {
4047
// ..
4148
MaxConcurrentFetchReceiptWorkers int
4249

4350
// ..
4451
MaxConcurrentFilterWorkers int
4552

53+
// MaxConcurrentSearchOnChainWorkers is the maximum amount of concurrent
54+
// on-chain searches (this is per subscriber)
55+
MaxConcurrentSearchOnChainWorkers int
56+
57+
// LatestBlockNumCacheTTL is the duration to cache the latest block number,
58+
// this prevents frequent calls to the monitor for latest block number
59+
LatestBlockNumCacheTTL time.Duration
60+
4661
// ..
4762
PastReceiptsCacheSize int
4863

@@ -95,6 +110,9 @@ type ReceiptsListener struct {
95110
ctxStop context.CancelFunc
96111
running int32
97112
mu sync.RWMutex
113+
114+
latestBlockNumCache atomic.Uint64
115+
latestBlockNumTime atomic.Int64
98116
}
99117

100118
var (
@@ -137,18 +155,21 @@ func NewReceiptsListener(log *slog.Logger, provider ethrpc.Interface, monitor *e
137155
return nil, err
138156
}
139157

158+
// max ~12s total wait time before giving up
159+
br := breaker.New(log, 200*time.Millisecond, 1.2, 20)
160+
140161
return &ReceiptsListener{
141162
options: opts,
142163
log: log,
143164
alert: opts.Alerter,
144165
provider: provider,
145166
monitor: monitor,
146-
br: breaker.New(log, 1*time.Second, 2, 4), // max 4 retries
167+
br: br,
147168
fetchSem: make(chan struct{}, opts.MaxConcurrentFetchReceiptWorkers),
148169
pastReceipts: pastReceipts,
149170
notFoundTxnHashes: notFoundTxnHashes,
150171
subscribers: make([]*subscriber, 0),
151-
registerFiltersCh: make(chan registerFilters, 1000),
172+
registerFiltersCh: make(chan registerFilters, maxFiltersPerListener),
152173
filterSem: make(chan struct{}, opts.MaxConcurrentFilterWorkers),
153174
}, nil
154175
}
@@ -160,7 +181,7 @@ func (l *ReceiptsListener) lazyInit(ctx context.Context) error {
160181
var err error
161182
l.chainID, err = getChainID(ctx, l.provider)
162183
if err != nil {
163-
l.chainID = big.NewInt(1) // assume mainnet in case of unlikely error
184+
return fmt.Errorf("ethreceipts: failed to get chainID from provider: %w", err)
164185
}
165186

166187
if l.options.NumBlocksToFinality <= 0 {
@@ -190,6 +211,7 @@ func (l *ReceiptsListener) Run(ctx context.Context) error {
190211
defer atomic.StoreInt32(&l.running, 0)
191212

192213
if err := l.lazyInit(ctx); err != nil {
214+
slog.Error("ethreceipts: lazyInit failed", slog.String("error", err.Error()))
193215
return err
194216
}
195217

@@ -435,16 +457,19 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
435457
// it indicates that we have high conviction that the receipt should be available, as the monitor has found
436458
// this transaction hash.
437459
func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash common.Hash, forceFetch bool) (*types.Receipt, error) {
460+
stdlog.Printf("ethreceipts: fetchTransactionReceipt %s (forceFetch=%v)", txnHash.String(), forceFetch)
461+
438462
l.fetchSem <- struct{}{}
439463

440464
resultCh := make(chan *types.Receipt)
441465
errCh := make(chan error)
442466

443-
defer close(resultCh)
444-
defer close(errCh)
445-
446467
go func() {
468+
447469
defer func() {
470+
close(resultCh)
471+
close(errCh)
472+
448473
<-l.fetchSem
449474
}()
450475

@@ -484,22 +509,19 @@ func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash
484509
defer clearTimeout()
485510

486511
receipt, err := l.provider.TransactionReceipt(tctx, txnHash)
487-
488-
if !forceFetch && errors.Is(err, ethereum.NotFound) {
489-
// record the blockNum, maybe this receipt is just too new and nodes are telling
490-
// us they can't find it yet, in which case we will rely on the monitor to
491-
// clear this flag for us.
492-
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
493-
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
494-
errCh <- err
495-
return nil
496-
} else if forceFetch && receipt == nil {
497-
// force fetch, lets retry a number of times as the node may end up finding the receipt.
498-
// txn has been found in the monitor with event added, but still haven't retrived the receipt.
499-
// this could be that we're too fast and node isn't returning the receipt yet.
500-
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s", txnHash)
501-
}
502512
if err != nil {
513+
if !forceFetch && errors.Is(err, ethereum.NotFound) {
514+
// record the blockNum, maybe this receipt is just too new and nodes are telling
515+
// us they can't find it yet, in which case we will rely on the monitor to
516+
// clear this flag for us.
517+
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
518+
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
519+
errCh <- err
520+
return nil
521+
}
522+
if forceFetch {
523+
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s: %w", txnHash, err)
524+
}
503525
return superr.Wrap(fmt.Errorf("failed to fetch receipt %s", txnHash), err)
504526
}
505527

@@ -733,6 +755,8 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
733755

734756
// check each block against each subscriber X filter
735757
for _, block := range blocks {
758+
stdlog.Printf("ethreceipts: block[%d] -> filterers %d subscribers %d", block.Block.Number(), len(filterers), len(subscribers))
759+
736760
// report if the txn was removed
737761
reorged := block.Event == ethmonitor.Removed
738762

@@ -772,14 +796,20 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
772796
// match the receipts against the filters
773797
var wg sync.WaitGroup
774798
for i, sub := range subscribers {
775-
wg.Add(1)
776799
l.filterSem <- struct{}{}
800+
801+
wg.Add(1)
777802
go func(i int, sub *subscriber) {
778803
defer func() {
779804
<-l.filterSem
780805
wg.Done()
781806
}()
782807

808+
// retry pending receipts first
809+
retryCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
810+
sub.retryPendingReceipts(retryCtx)
811+
cancel()
812+
783813
// filter matcher
784814
matched, err := sub.matchFilters(l.ctx, filterers[i], receipts)
785815
if err != nil {
@@ -801,6 +831,14 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
801831
}
802832

803833
func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *subscriber, filterers []Filterer) error {
834+
// Collect eligible filters first
835+
type filterWithHash struct {
836+
filterer Filterer
837+
txnHash common.Hash
838+
}
839+
840+
eligible := make([]filterWithHash, 0, len(filterers))
841+
804842
for _, filterer := range filterers {
805843
if !filterer.Options().SearchOnChain {
806844
// skip filters which do not ask to search on chain
@@ -813,34 +851,68 @@ func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *
813851
continue
814852
}
815853

816-
r, err := l.fetchTransactionReceipt(ctx, *txnHashCond, false)
817-
if !errors.Is(err, ethereum.NotFound) && err != nil {
818-
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
819-
}
820-
if r == nil {
821-
// unable to find the receipt on-chain, lets continue
822-
continue
823-
}
854+
eligible = append(eligible, filterWithHash{filterer, *txnHashCond})
855+
}
824856

825-
if f, ok := filterer.(*filter); ok {
826-
f.lastMatchBlockNum = r.BlockNumber.Uint64()
827-
}
857+
if len(eligible) == 0 {
858+
return nil
859+
}
828860

829-
receipt := Receipt{
830-
receipt: r,
831-
// NOTE: we do not include the transaction at this point, as we don't have it.
832-
// transaction: txn,
833-
Final: l.isBlockFinal(r.BlockNumber),
834-
}
861+
// Process in batches with bounded concurrency using errgroup
862+
sem := make(chan struct{}, l.options.MaxConcurrentSearchOnChainWorkers)
863+
g, gctx := errgroup.WithContext(ctx)
835864

836-
// will always find the receipt, as it will be in our case previously found above.
837-
// this is called so we can broadcast the match to the filterer's subscriber.
838-
_, err = subscriber.matchFilters(ctx, []Filterer{filterer}, []Receipt{receipt})
839-
if err != nil {
840-
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
841-
}
865+
for _, item := range eligible {
866+
item := item // capture loop variable
867+
868+
g.Go(func() error {
869+
select {
870+
case sem <- struct{}{}:
871+
defer func() {
872+
<-sem
873+
}()
874+
case <-gctx.Done():
875+
return gctx.Err()
876+
}
877+
878+
r, err := l.fetchTransactionReceipt(gctx, item.txnHash, false)
879+
if !errors.Is(err, ethereum.NotFound) && err != nil {
880+
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
881+
// Don't return error, just log and continue with other filters
882+
return nil
883+
}
884+
if r == nil {
885+
panic("ethreceipts: unexpected nil receipt with no error")
886+
// unable to find the receipt on-chain, lets continue
887+
return nil
888+
}
889+
890+
if f, ok := item.filterer.(*filter); ok {
891+
f.lastMatchBlockNum = r.BlockNumber.Uint64()
892+
}
893+
894+
receipt := Receipt{
895+
receipt: r,
896+
// NOTE: we do not include the transaction at this point, as we don't have it.
897+
// transaction: txn,
898+
Final: l.isBlockFinal(r.BlockNumber),
899+
}
900+
901+
// will always find the receipt, as it will be in our case previously found above.
902+
// this is called so we can broadcast the match to the filterer's subscriber.
903+
_, err = subscriber.matchFilters(gctx, []Filterer{item.filterer}, []Receipt{receipt})
904+
if err != nil {
905+
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
906+
// Don't return error, just log and continue
907+
return nil
908+
}
909+
910+
return nil
911+
})
842912
}
843913

914+
// Wait for all goroutines, but we're not propagating errors since we just log them
915+
_ = g.Wait()
844916
return nil
845917
}
846918

@@ -871,10 +943,33 @@ func (l *ReceiptsListener) isBlockFinal(blockNum *big.Int) bool {
871943
}
872944

873945
func (l *ReceiptsListener) latestBlockNum() *big.Int {
946+
// Cache latest block number for up to 3 seconds to avoid hammering monitor.LatestBlockNum()
947+
cachedTime := time.Unix(l.latestBlockNumTime.Load(), 0)
948+
if time.Since(cachedTime) < l.options.LatestBlockNumCacheTTL {
949+
cachedNum := l.latestBlockNumCache.Load()
950+
if cachedNum > 0 {
951+
return big.NewInt(int64(cachedNum))
952+
}
953+
}
954+
955+
l.latestBlockNumTime.Store(time.Now().Unix())
956+
latestBlockNum := l.fetchLatestBlockNum()
957+
if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 {
958+
l.latestBlockNumCache.Store(latestBlockNum.Uint64())
959+
}
960+
961+
return latestBlockNum
962+
}
963+
964+
func (l *ReceiptsListener) fetchLatestBlockNum() *big.Int {
965+
timeoutCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second)
966+
defer cancel()
967+
874968
latestBlockNum := l.monitor.LatestBlockNum()
969+
875970
if latestBlockNum == nil || latestBlockNum.Cmp(big.NewInt(0)) == 0 {
876971
err := l.br.Do(l.ctx, func() error {
877-
block, err := l.provider.BlockByNumber(context.Background(), nil)
972+
block, err := l.provider.BlockByNumber(timeoutCtx, nil)
878973
if err != nil {
879974
return err
880975
}
@@ -886,11 +981,14 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int {
886981
}
887982
return latestBlockNum
888983
}
984+
889985
return latestBlockNum
890986
}
891987

892988
func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
893989
var chainID *big.Int
990+
991+
// provide plenty of time for breaker to succeed
894992
err := breaker.Do(ctx, func() error {
895993
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
896994
defer cancel()
@@ -901,7 +999,7 @@ func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error
901999
}
9021000
chainID = id
9031001
return nil
904-
}, nil, 1*time.Second, 2, 3)
1002+
}, nil, 1*time.Second, 2, 10)
9051003

9061004
if err != nil {
9071005
return nil, err

0 commit comments

Comments
 (0)