Skip to content

Commit 4ba6a34

Browse files
committed
ethreceipts: improve search and receipt fetching with flaky providers
simplify, clean-up
1 parent 3d076ae commit 4ba6a34

File tree

6 files changed

+1083
-111
lines changed

6 files changed

+1083
-111
lines changed

ethreceipts/ethreceipts.go

Lines changed: 160 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,35 @@ import (
2828
)
2929

3030
var DefaultOptions = Options{
31-
MaxConcurrentFetchReceiptWorkers: 100,
32-
MaxConcurrentFilterWorkers: 50,
33-
PastReceiptsCacheSize: 5_000,
34-
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
35-
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
36-
Alerter: util.NoopAlerter(),
31+
MaxConcurrentFetchReceiptWorkers: 100,
32+
MaxConcurrentFilterWorkers: 200,
33+
MaxConcurrentSearchOnChainWorkers: 5,
34+
LatestBlockNumCacheTTL: 2 * time.Second,
35+
PastReceiptsCacheSize: 5_000,
36+
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
37+
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
38+
Alerter: util.NoopAlerter(),
3739
}
3840

41+
const (
42+
maxFiltersPerListener = 1000
43+
)
44+
3945
type Options struct {
4046
// ..
4147
MaxConcurrentFetchReceiptWorkers int
4248

4349
// ..
4450
MaxConcurrentFilterWorkers int
4551

52+
// MaxConcurrentSearchOnChainWorkers is the maximum amount of concurrent
53+
// on-chain searches (this is per subscriber)
54+
MaxConcurrentSearchOnChainWorkers int
55+
56+
// LatestBlockNumCacheTTL is the duration to cache the latest block number,
57+
// this prevents frequent calls to the monitor for latest block number
58+
LatestBlockNumCacheTTL time.Duration
59+
4660
// ..
4761
PastReceiptsCacheSize int
4862

@@ -95,6 +109,9 @@ type ReceiptsListener struct {
95109
ctxStop context.CancelFunc
96110
running int32
97111
mu sync.RWMutex
112+
113+
latestBlockNumCache atomic.Uint64
114+
latestBlockNumTime atomic.Int64
98115
}
99116

100117
var (
@@ -137,18 +154,21 @@ func NewReceiptsListener(log *slog.Logger, provider ethrpc.Interface, monitor *e
137154
return nil, err
138155
}
139156

157+
// max ~12s total wait time before giving up
158+
br := breaker.New(log, 200*time.Millisecond, 1.2, 20)
159+
140160
return &ReceiptsListener{
141161
options: opts,
142162
log: log,
143163
alert: opts.Alerter,
144164
provider: provider,
145165
monitor: monitor,
146-
br: breaker.New(log, 1*time.Second, 2, 4), // max 4 retries
166+
br: br,
147167
fetchSem: make(chan struct{}, opts.MaxConcurrentFetchReceiptWorkers),
148168
pastReceipts: pastReceipts,
149169
notFoundTxnHashes: notFoundTxnHashes,
150170
subscribers: make([]*subscriber, 0),
151-
registerFiltersCh: make(chan registerFilters, 1000),
171+
registerFiltersCh: make(chan registerFilters, maxFiltersPerListener),
152172
filterSem: make(chan struct{}, opts.MaxConcurrentFilterWorkers),
153173
}, nil
154174
}
@@ -160,7 +180,7 @@ func (l *ReceiptsListener) lazyInit(ctx context.Context) error {
160180
var err error
161181
l.chainID, err = getChainID(ctx, l.provider)
162182
if err != nil {
163-
l.chainID = big.NewInt(1) // assume mainnet in case of unlikely error
183+
return fmt.Errorf("ethreceipts: failed to get chainID from provider: %w", err)
164184
}
165185

166186
if l.options.NumBlocksToFinality <= 0 {
@@ -190,6 +210,7 @@ func (l *ReceiptsListener) Run(ctx context.Context) error {
190210
defer atomic.StoreInt32(&l.running, 0)
191211

192212
if err := l.lazyInit(ctx); err != nil {
213+
slog.Error("ethreceipts: lazyInit failed", slog.String("error", err.Error()))
193214
return err
194215
}
195216

@@ -435,13 +456,17 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
435456
// it indicates that we have high conviction that the receipt should be available, as the monitor has found
436457
// this transaction hash.
437458
func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash common.Hash, forceFetch bool) (*types.Receipt, error) {
438-
l.fetchSem <- struct{}{}
439459

440-
resultCh := make(chan *types.Receipt)
441-
errCh := make(chan error)
460+
// TODO: this might block forever if all workers are busy, should we have a
461+
// timeout?
462+
l.fetchSem <- struct{}{}
442463

443-
defer close(resultCh)
444-
defer close(errCh)
464+
// channels to receive result or error: it could be difficult to coordinate
465+
// closing them manually because we're also selecting on ctx.Done(), so we
466+
// use buffered channels of size 1 and let them be garbage collected after
467+
// this function returns.
468+
resultCh := make(chan *types.Receipt, 1)
469+
errCh := make(chan error, 1)
445470

446471
go func() {
447472
defer func() {
@@ -484,22 +509,19 @@ func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash
484509
defer clearTimeout()
485510

486511
receipt, err := l.provider.TransactionReceipt(tctx, txnHash)
487-
488-
if !forceFetch && errors.Is(err, ethereum.NotFound) {
489-
// record the blockNum, maybe this receipt is just too new and nodes are telling
490-
// us they can't find it yet, in which case we will rely on the monitor to
491-
// clear this flag for us.
492-
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
493-
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
494-
errCh <- err
495-
return nil
496-
} else if forceFetch && receipt == nil {
497-
// force fetch, lets retry a number of times as the node may end up finding the receipt.
498-
// txn has been found in the monitor with event added, but still haven't retrived the receipt.
499-
// this could be that we're too fast and node isn't returning the receipt yet.
500-
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s", txnHash)
501-
}
502512
if err != nil {
513+
if !forceFetch && errors.Is(err, ethereum.NotFound) {
514+
// record the blockNum, maybe this receipt is just too new and nodes are telling
515+
// us they can't find it yet, in which case we will rely on the monitor to
516+
// clear this flag for us.
517+
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
518+
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
519+
errCh <- err
520+
return nil
521+
}
522+
if forceFetch {
523+
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s: %w", txnHash, err)
524+
}
503525
return superr.Wrap(fmt.Errorf("failed to fetch receipt %s", txnHash), err)
504526
}
505527

@@ -647,9 +669,9 @@ func (l *ReceiptsListener) listener() error {
647669
if reorg {
648670
for _, list := range filters {
649671
for _, filterer := range list {
650-
if f, _ := filterer.(*filter); f != nil {
651-
f.startBlockNum = latestBlockNum
652-
f.lastMatchBlockNum = 0
672+
if f, ok := filterer.(*filter); ok {
673+
f.setStartBlockNum(latestBlockNum)
674+
f.setLastMatchBlockNum(0)
653675
}
654676
}
655677
}
@@ -666,12 +688,12 @@ func (l *ReceiptsListener) listener() error {
666688
for y, matched := range list {
667689
filterer := filters[x][y]
668690
if matched || filterer.StartBlockNum() == 0 {
669-
if f, _ := filterer.(*filter); f != nil {
670-
if f.startBlockNum == 0 {
671-
f.startBlockNum = latestBlockNum
691+
if f, ok := filterer.(*filter); ok {
692+
if f.StartBlockNum() == 0 {
693+
f.setStartBlockNum(latestBlockNum)
672694
}
673695
if matched {
674-
f.lastMatchBlockNum = latestBlockNum
696+
f.setLastMatchBlockNum(latestBlockNum)
675697
}
676698
}
677699
} else {
@@ -693,10 +715,8 @@ func (l *ReceiptsListener) listener() error {
693715
subscriber := subscribers[x]
694716
subscriber.RemoveFilter(filterer)
695717

696-
select {
697-
case <-f.Exhausted():
698-
default:
699-
close(f.exhausted)
718+
if f, ok := filterer.(*filter); ok {
719+
f.closeExhausted()
700720
}
701721
}
702722
}
@@ -772,14 +792,20 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
772792
// match the receipts against the filters
773793
var wg sync.WaitGroup
774794
for i, sub := range subscribers {
775-
wg.Add(1)
776795
l.filterSem <- struct{}{}
796+
797+
wg.Add(1)
777798
go func(i int, sub *subscriber) {
778799
defer func() {
779800
<-l.filterSem
780801
wg.Done()
781802
}()
782803

804+
// retry pending receipts first
805+
retryCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
806+
sub.retryPendingReceipts(retryCtx)
807+
cancel()
808+
783809
// filter matcher
784810
matched, err := sub.matchFilters(l.ctx, filterers[i], receipts)
785811
if err != nil {
@@ -801,6 +827,14 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
801827
}
802828

803829
func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *subscriber, filterers []Filterer) error {
830+
// Collect eligible filters first
831+
type filterWithHash struct {
832+
filterer Filterer
833+
txnHash common.Hash
834+
}
835+
836+
eligible := make([]filterWithHash, 0, len(filterers))
837+
804838
for _, filterer := range filterers {
805839
if !filterer.Options().SearchOnChain {
806840
// skip filters which do not ask to search on chain
@@ -813,34 +847,67 @@ func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *
813847
continue
814848
}
815849

816-
r, err := l.fetchTransactionReceipt(ctx, *txnHashCond, false)
817-
if !errors.Is(err, ethereum.NotFound) && err != nil {
818-
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
819-
}
820-
if r == nil {
821-
// unable to find the receipt on-chain, lets continue
822-
continue
823-
}
850+
eligible = append(eligible, filterWithHash{filterer, *txnHashCond})
851+
}
824852

825-
if f, ok := filterer.(*filter); ok {
826-
f.lastMatchBlockNum = r.BlockNumber.Uint64()
827-
}
853+
if len(eligible) == 0 {
854+
return nil
855+
}
828856

829-
receipt := Receipt{
830-
receipt: r,
831-
// NOTE: we do not include the transaction at this point, as we don't have it.
832-
// transaction: txn,
833-
Final: l.isBlockFinal(r.BlockNumber),
834-
}
857+
// Process in batches with bounded concurrency using errgroup
858+
sem := make(chan struct{}, l.options.MaxConcurrentSearchOnChainWorkers)
859+
g, gctx := errgroup.WithContext(ctx)
835860

836-
// will always find the receipt, as it will be in our case previously found above.
837-
// this is called so we can broadcast the match to the filterer's subscriber.
838-
_, err = subscriber.matchFilters(ctx, []Filterer{filterer}, []Receipt{receipt})
839-
if err != nil {
840-
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
841-
}
861+
for _, item := range eligible {
862+
item := item // capture loop variable
863+
864+
g.Go(func() error {
865+
select {
866+
case sem <- struct{}{}:
867+
defer func() {
868+
<-sem
869+
}()
870+
case <-gctx.Done():
871+
return gctx.Err()
872+
}
873+
874+
r, err := l.fetchTransactionReceipt(gctx, item.txnHash, false)
875+
if !errors.Is(err, ethereum.NotFound) && err != nil {
876+
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
877+
// Don't return error, just log and continue with other filters
878+
return nil
879+
}
880+
if r == nil {
881+
// unable to find the receipt on-chain, lets continue
882+
return nil
883+
}
884+
885+
if f, ok := item.filterer.(*filter); ok {
886+
f.setLastMatchBlockNum(r.BlockNumber.Uint64())
887+
}
888+
889+
receipt := Receipt{
890+
receipt: r,
891+
// NOTE: we do not include the transaction at this point, as we don't have it.
892+
// transaction: txn,
893+
Final: l.isBlockFinal(r.BlockNumber),
894+
}
895+
896+
// will always find the receipt, as it will be in our case previously found above.
897+
// this is called so we can broadcast the match to the filterer's subscriber.
898+
_, err = subscriber.matchFilters(gctx, []Filterer{item.filterer}, []Receipt{receipt})
899+
if err != nil {
900+
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
901+
// Don't return error, just log and continue
902+
return nil
903+
}
904+
905+
return nil
906+
})
842907
}
843908

909+
// Wait for all goroutines, but we're not propagating errors since we just log them
910+
_ = g.Wait()
844911
return nil
845912
}
846913

@@ -871,10 +938,34 @@ func (l *ReceiptsListener) isBlockFinal(blockNum *big.Int) bool {
871938
}
872939

873940
func (l *ReceiptsListener) latestBlockNum() *big.Int {
941+
// Cache latest block number to avoid hammering monitor.LatestBlockNum()
942+
cachedTime := time.Unix(l.latestBlockNumTime.Load(), 0)
943+
if time.Since(cachedTime) < l.options.LatestBlockNumCacheTTL {
944+
cachedNum := l.latestBlockNumCache.Load()
945+
if cachedNum > 0 {
946+
return big.NewInt(int64(cachedNum))
947+
}
948+
}
949+
950+
l.latestBlockNumTime.Store(time.Now().Unix())
951+
952+
latestBlockNum := l.fetchLatestBlockNum()
953+
if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 {
954+
l.latestBlockNumCache.Store(latestBlockNum.Uint64())
955+
}
956+
957+
return latestBlockNum
958+
}
959+
960+
func (l *ReceiptsListener) fetchLatestBlockNum() *big.Int {
961+
timeoutCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second)
962+
defer cancel()
963+
874964
latestBlockNum := l.monitor.LatestBlockNum()
965+
875966
if latestBlockNum == nil || latestBlockNum.Cmp(big.NewInt(0)) == 0 {
876967
err := l.br.Do(l.ctx, func() error {
877-
block, err := l.provider.BlockByNumber(context.Background(), nil)
968+
block, err := l.provider.BlockByNumber(timeoutCtx, nil)
878969
if err != nil {
879970
return err
880971
}
@@ -886,11 +977,14 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int {
886977
}
887978
return latestBlockNum
888979
}
980+
889981
return latestBlockNum
890982
}
891983

892984
func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
893985
var chainID *big.Int
986+
987+
// provide plenty of time for breaker to succeed
894988
err := breaker.Do(ctx, func() error {
895989
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
896990
defer cancel()
@@ -901,7 +995,7 @@ func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error
901995
}
902996
chainID = id
903997
return nil
904-
}, nil, 1*time.Second, 2, 3)
998+
}, nil, 1*time.Second, 2, 10)
905999

9061000
if err != nil {
9071001
return nil, err

0 commit comments

Comments
 (0)