Skip to content

Commit 3cb414a

Browse files
committed
ethreceipts: improve search and receipt fetching with flaky providers
1 parent 3d076ae commit 3cb414a

File tree

6 files changed

+1087
-108
lines changed

6 files changed

+1087
-108
lines changed

ethreceipts/ethreceipts.go

Lines changed: 160 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,35 @@ import (
2828
)
2929

3030
var DefaultOptions = Options{
31-
MaxConcurrentFetchReceiptWorkers: 100,
32-
MaxConcurrentFilterWorkers: 50,
33-
PastReceiptsCacheSize: 5_000,
34-
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
35-
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
36-
Alerter: util.NoopAlerter(),
31+
MaxConcurrentFetchReceiptWorkers: 100,
32+
MaxConcurrentFilterWorkers: 200,
33+
MaxConcurrentSearchOnChainWorkers: 5,
34+
LatestBlockNumCacheTTL: 2 * time.Second,
35+
PastReceiptsCacheSize: 5_000,
36+
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
37+
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
38+
Alerter: util.NoopAlerter(),
3739
}
3840

41+
const (
42+
maxFiltersPerListener = 1000
43+
)
44+
3945
type Options struct {
4046
// ..
4147
MaxConcurrentFetchReceiptWorkers int
4248

4349
// ..
4450
MaxConcurrentFilterWorkers int
4551

52+
// MaxConcurrentSearchOnChainWorkers is the maximum amount of concurrent
53+
// on-chain searches (this is per subscriber)
54+
MaxConcurrentSearchOnChainWorkers int
55+
56+
// LatestBlockNumCacheTTL is the duration to cache the latest block number,
57+
// this prevents frequent calls to the monitor for latest block number
58+
LatestBlockNumCacheTTL time.Duration
59+
4660
// ..
4761
PastReceiptsCacheSize int
4862

@@ -95,6 +109,9 @@ type ReceiptsListener struct {
95109
ctxStop context.CancelFunc
96110
running int32
97111
mu sync.RWMutex
112+
113+
latestBlockNumCache atomic.Uint64
114+
latestBlockNumTime atomic.Int64
98115
}
99116

100117
var (
@@ -137,18 +154,21 @@ func NewReceiptsListener(log *slog.Logger, provider ethrpc.Interface, monitor *e
137154
return nil, err
138155
}
139156

157+
// max ~12s total wait time before giving up
158+
br := breaker.New(log, 200*time.Millisecond, 1.2, 20)
159+
140160
return &ReceiptsListener{
141161
options: opts,
142162
log: log,
143163
alert: opts.Alerter,
144164
provider: provider,
145165
monitor: monitor,
146-
br: breaker.New(log, 1*time.Second, 2, 4), // max 4 retries
166+
br: br,
147167
fetchSem: make(chan struct{}, opts.MaxConcurrentFetchReceiptWorkers),
148168
pastReceipts: pastReceipts,
149169
notFoundTxnHashes: notFoundTxnHashes,
150170
subscribers: make([]*subscriber, 0),
151-
registerFiltersCh: make(chan registerFilters, 1000),
171+
registerFiltersCh: make(chan registerFilters, maxFiltersPerListener),
152172
filterSem: make(chan struct{}, opts.MaxConcurrentFilterWorkers),
153173
}, nil
154174
}
@@ -160,7 +180,7 @@ func (l *ReceiptsListener) lazyInit(ctx context.Context) error {
160180
var err error
161181
l.chainID, err = getChainID(ctx, l.provider)
162182
if err != nil {
163-
l.chainID = big.NewInt(1) // assume mainnet in case of unlikely error
183+
return fmt.Errorf("ethreceipts: failed to get chainID from provider: %w", err)
164184
}
165185

166186
if l.options.NumBlocksToFinality <= 0 {
@@ -190,6 +210,7 @@ func (l *ReceiptsListener) Run(ctx context.Context) error {
190210
defer atomic.StoreInt32(&l.running, 0)
191211

192212
if err := l.lazyInit(ctx); err != nil {
213+
slog.Error("ethreceipts: lazyInit failed", slog.String("error", err.Error()))
193214
return err
194215
}
195216

@@ -295,7 +316,6 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
295316

296317
sub := l.Subscribe(query)
297318

298-
// Use a WaitGroup to ensure the goroutine cleans up before the function returns
299319
var wg sync.WaitGroup
300320

301321
exhausted := make(chan struct{})
@@ -435,13 +455,17 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
435455
// it indicates that we have high conviction that the receipt should be available, as the monitor has found
436456
// this transaction hash.
437457
func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash common.Hash, forceFetch bool) (*types.Receipt, error) {
438-
l.fetchSem <- struct{}{}
439458

440-
resultCh := make(chan *types.Receipt)
441-
errCh := make(chan error)
459+
// TODO: this might block forever if all workers are busy, should we have a
460+
// timeout?
461+
l.fetchSem <- struct{}{}
442462

443-
defer close(resultCh)
444-
defer close(errCh)
463+
// channels to receive result or error: it could be difficult to coordinate
464+
// closing them manually because we're also selecting on ctx.Done(), so we
465+
// use buffered channels of size 1 and let them be garbage collected after
466+
// this function returns.
467+
resultCh := make(chan *types.Receipt, 1)
468+
errCh := make(chan error, 1)
445469

446470
go func() {
447471
defer func() {
@@ -484,22 +508,19 @@ func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash
484508
defer clearTimeout()
485509

486510
receipt, err := l.provider.TransactionReceipt(tctx, txnHash)
487-
488-
if !forceFetch && errors.Is(err, ethereum.NotFound) {
489-
// record the blockNum, maybe this receipt is just too new and nodes are telling
490-
// us they can't find it yet, in which case we will rely on the monitor to
491-
// clear this flag for us.
492-
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
493-
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
494-
errCh <- err
495-
return nil
496-
} else if forceFetch && receipt == nil {
497-
// force fetch, lets retry a number of times as the node may end up finding the receipt.
498-
// txn has been found in the monitor with event added, but still haven't retrived the receipt.
499-
// this could be that we're too fast and node isn't returning the receipt yet.
500-
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s", txnHash)
501-
}
502511
if err != nil {
512+
if !forceFetch && errors.Is(err, ethereum.NotFound) {
513+
// record the blockNum, maybe this receipt is just too new and nodes are telling
514+
// us they can't find it yet, in which case we will rely on the monitor to
515+
// clear this flag for us.
516+
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
517+
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
518+
errCh <- err
519+
return nil
520+
}
521+
if forceFetch {
522+
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s: %w", txnHash, err)
523+
}
503524
return superr.Wrap(fmt.Errorf("failed to fetch receipt %s", txnHash), err)
504525
}
505526

@@ -647,9 +668,9 @@ func (l *ReceiptsListener) listener() error {
647668
if reorg {
648669
for _, list := range filters {
649670
for _, filterer := range list {
650-
if f, _ := filterer.(*filter); f != nil {
651-
f.startBlockNum = latestBlockNum
652-
f.lastMatchBlockNum = 0
671+
if f, ok := filterer.(*filter); ok {
672+
f.setStartBlockNum(latestBlockNum)
673+
f.setLastMatchBlockNum(0)
653674
}
654675
}
655676
}
@@ -666,12 +687,12 @@ func (l *ReceiptsListener) listener() error {
666687
for y, matched := range list {
667688
filterer := filters[x][y]
668689
if matched || filterer.StartBlockNum() == 0 {
669-
if f, _ := filterer.(*filter); f != nil {
670-
if f.startBlockNum == 0 {
671-
f.startBlockNum = latestBlockNum
690+
if f, ok := filterer.(*filter); ok {
691+
if f.StartBlockNum() == 0 {
692+
f.setStartBlockNum(latestBlockNum)
672693
}
673694
if matched {
674-
f.lastMatchBlockNum = latestBlockNum
695+
f.setLastMatchBlockNum(latestBlockNum)
675696
}
676697
}
677698
} else {
@@ -693,10 +714,8 @@ func (l *ReceiptsListener) listener() error {
693714
subscriber := subscribers[x]
694715
subscriber.RemoveFilter(filterer)
695716

696-
select {
697-
case <-f.Exhausted():
698-
default:
699-
close(f.exhausted)
717+
if f, ok := filterer.(*filter); ok {
718+
f.closeExhausted()
700719
}
701720
}
702721
}
@@ -772,14 +791,20 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
772791
// match the receipts against the filters
773792
var wg sync.WaitGroup
774793
for i, sub := range subscribers {
775-
wg.Add(1)
776794
l.filterSem <- struct{}{}
795+
796+
wg.Add(1)
777797
go func(i int, sub *subscriber) {
778798
defer func() {
779799
<-l.filterSem
780800
wg.Done()
781801
}()
782802

803+
// retry pending receipts first
804+
retryCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
805+
sub.retryPendingReceipts(retryCtx)
806+
cancel()
807+
783808
// filter matcher
784809
matched, err := sub.matchFilters(l.ctx, filterers[i], receipts)
785810
if err != nil {
@@ -801,6 +826,14 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
801826
}
802827

803828
func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *subscriber, filterers []Filterer) error {
829+
// Collect eligible filters first
830+
type filterWithHash struct {
831+
filterer Filterer
832+
txnHash common.Hash
833+
}
834+
835+
eligible := make([]filterWithHash, 0, len(filterers))
836+
804837
for _, filterer := range filterers {
805838
if !filterer.Options().SearchOnChain {
806839
// skip filters which do not ask to search on chain
@@ -813,34 +846,67 @@ func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *
813846
continue
814847
}
815848

816-
r, err := l.fetchTransactionReceipt(ctx, *txnHashCond, false)
817-
if !errors.Is(err, ethereum.NotFound) && err != nil {
818-
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
819-
}
820-
if r == nil {
821-
// unable to find the receipt on-chain, lets continue
822-
continue
823-
}
849+
eligible = append(eligible, filterWithHash{filterer, *txnHashCond})
850+
}
824851

825-
if f, ok := filterer.(*filter); ok {
826-
f.lastMatchBlockNum = r.BlockNumber.Uint64()
827-
}
852+
if len(eligible) == 0 {
853+
return nil
854+
}
828855

829-
receipt := Receipt{
830-
receipt: r,
831-
// NOTE: we do not include the transaction at this point, as we don't have it.
832-
// transaction: txn,
833-
Final: l.isBlockFinal(r.BlockNumber),
834-
}
856+
// Process in batches with bounded concurrency using errgroup
857+
sem := make(chan struct{}, l.options.MaxConcurrentSearchOnChainWorkers)
858+
g, gctx := errgroup.WithContext(ctx)
835859

836-
// will always find the receipt, as it will be in our case previously found above.
837-
// this is called so we can broadcast the match to the filterer's subscriber.
838-
_, err = subscriber.matchFilters(ctx, []Filterer{filterer}, []Receipt{receipt})
839-
if err != nil {
840-
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
841-
}
860+
for _, item := range eligible {
861+
item := item // capture loop variable
862+
863+
g.Go(func() error {
864+
select {
865+
case sem <- struct{}{}:
866+
defer func() {
867+
<-sem
868+
}()
869+
case <-gctx.Done():
870+
return gctx.Err()
871+
}
872+
873+
r, err := l.fetchTransactionReceipt(gctx, item.txnHash, false)
874+
if !errors.Is(err, ethereum.NotFound) && err != nil {
875+
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
876+
// Don't return error, just log and continue with other filters
877+
return nil
878+
}
879+
if r == nil {
880+
// unable to find the receipt on-chain, lets continue
881+
return nil
882+
}
883+
884+
if f, ok := item.filterer.(*filter); ok {
885+
f.setLastMatchBlockNum(r.BlockNumber.Uint64())
886+
}
887+
888+
receipt := Receipt{
889+
receipt: r,
890+
// NOTE: we do not include the transaction at this point, as we don't have it.
891+
// transaction: txn,
892+
Final: l.isBlockFinal(r.BlockNumber),
893+
}
894+
895+
// will always find the receipt, as it will be in our case previously found above.
896+
// this is called so we can broadcast the match to the filterer's subscriber.
897+
_, err = subscriber.matchFilters(gctx, []Filterer{item.filterer}, []Receipt{receipt})
898+
if err != nil {
899+
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
900+
// Don't return error, just log and continue
901+
return nil
902+
}
903+
904+
return nil
905+
})
842906
}
843907

908+
// Wait for all goroutines, but we're not propagating errors since we just log them
909+
_ = g.Wait()
844910
return nil
845911
}
846912

@@ -871,10 +937,34 @@ func (l *ReceiptsListener) isBlockFinal(blockNum *big.Int) bool {
871937
}
872938

873939
func (l *ReceiptsListener) latestBlockNum() *big.Int {
940+
// Cache latest block number to avoid hammering monitor.LatestBlockNum()
941+
cachedTime := time.Unix(l.latestBlockNumTime.Load(), 0)
942+
if time.Since(cachedTime) < l.options.LatestBlockNumCacheTTL {
943+
cachedNum := l.latestBlockNumCache.Load()
944+
if cachedNum > 0 {
945+
return big.NewInt(int64(cachedNum))
946+
}
947+
}
948+
949+
l.latestBlockNumTime.Store(time.Now().Unix())
950+
951+
latestBlockNum := l.fetchLatestBlockNum()
952+
if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 {
953+
l.latestBlockNumCache.Store(latestBlockNum.Uint64())
954+
}
955+
956+
return latestBlockNum
957+
}
958+
959+
func (l *ReceiptsListener) fetchLatestBlockNum() *big.Int {
960+
timeoutCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second)
961+
defer cancel()
962+
874963
latestBlockNum := l.monitor.LatestBlockNum()
964+
875965
if latestBlockNum == nil || latestBlockNum.Cmp(big.NewInt(0)) == 0 {
876966
err := l.br.Do(l.ctx, func() error {
877-
block, err := l.provider.BlockByNumber(context.Background(), nil)
967+
block, err := l.provider.BlockByNumber(timeoutCtx, nil)
878968
if err != nil {
879969
return err
880970
}
@@ -886,11 +976,14 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int {
886976
}
887977
return latestBlockNum
888978
}
979+
889980
return latestBlockNum
890981
}
891982

892983
func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
893984
var chainID *big.Int
985+
986+
// provide plenty of time for breaker to succeed
894987
err := breaker.Do(ctx, func() error {
895988
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
896989
defer cancel()
@@ -901,7 +994,7 @@ func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error
901994
}
902995
chainID = id
903996
return nil
904-
}, nil, 1*time.Second, 2, 3)
997+
}, nil, 1*time.Second, 2, 10)
905998

906999
if err != nil {
9071000
return nil, err

0 commit comments

Comments
 (0)