Skip to content

Commit 075a475

Browse files
authored
ethreceipts: improve interactions with flaky providers (#188)
1 parent c200f94 commit 075a475

File tree

6 files changed

+1083
-119
lines changed

6 files changed

+1083
-119
lines changed

ethreceipts/ethreceipts.go

Lines changed: 154 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,30 @@ import (
2828
)
2929

3030
var DefaultOptions = Options{
31-
MaxConcurrentFetchReceiptWorkers: 100,
32-
MaxConcurrentFilterWorkers: 50,
33-
PastReceiptsCacheSize: 5_000,
34-
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
35-
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
36-
Alerter: util.NoopAlerter(),
31+
MaxConcurrentFetchReceiptWorkers: 100,
32+
MaxConcurrentFilterWorkers: 200,
33+
MaxConcurrentSearchOnChainWorkers: 15,
34+
PastReceiptsCacheSize: 5_000,
35+
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
36+
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
37+
Alerter: util.NoopAlerter(),
3738
}
3839

40+
const (
41+
maxFiltersPerListener = 1000
42+
)
43+
3944
type Options struct {
4045
// ..
4146
MaxConcurrentFetchReceiptWorkers int
4247

4348
// ..
4449
MaxConcurrentFilterWorkers int
4550

51+
// MaxConcurrentSearchOnChainWorkers is the maximum amount of concurrent
52+
// on-chain searches (this is per subscriber)
53+
MaxConcurrentSearchOnChainWorkers int
54+
4655
// ..
4756
PastReceiptsCacheSize int
4857

@@ -137,18 +146,21 @@ func NewReceiptsListener(log *slog.Logger, provider ethrpc.Interface, monitor *e
137146
return nil, err
138147
}
139148

149+
// max ~12s total wait time before giving up
150+
br := breaker.New(log, 200*time.Millisecond, 1.2, 20)
151+
140152
return &ReceiptsListener{
141153
options: opts,
142154
log: log,
143155
alert: opts.Alerter,
144156
provider: provider,
145157
monitor: monitor,
146-
br: breaker.New(log, 1*time.Second, 2, 4), // max 4 retries
158+
br: br,
147159
fetchSem: make(chan struct{}, opts.MaxConcurrentFetchReceiptWorkers),
148160
pastReceipts: pastReceipts,
149161
notFoundTxnHashes: notFoundTxnHashes,
150162
subscribers: make([]*subscriber, 0),
151-
registerFiltersCh: make(chan registerFilters, 1000),
163+
registerFiltersCh: make(chan registerFilters, maxFiltersPerListener),
152164
filterSem: make(chan struct{}, opts.MaxConcurrentFilterWorkers),
153165
}, nil
154166
}
@@ -160,7 +172,7 @@ func (l *ReceiptsListener) lazyInit(ctx context.Context) error {
160172
var err error
161173
l.chainID, err = getChainID(ctx, l.provider)
162174
if err != nil {
163-
l.chainID = big.NewInt(1) // assume mainnet in case of unlikely error
175+
return fmt.Errorf("ethreceipts: failed to get chainID from provider: %w", err)
164176
}
165177

166178
if l.options.NumBlocksToFinality <= 0 {
@@ -190,6 +202,7 @@ func (l *ReceiptsListener) Run(ctx context.Context) error {
190202
defer atomic.StoreInt32(&l.running, 0)
191203

192204
if err := l.lazyInit(ctx); err != nil {
205+
slog.Error("ethreceipts: lazyInit failed", slog.String("error", err.Error()))
193206
return err
194207
}
195208

@@ -435,13 +448,26 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
435448
// it indicates that we have high conviction that the receipt should be available, as the monitor has found
436449
// this transaction hash.
437450
func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash common.Hash, forceFetch bool) (*types.Receipt, error) {
438-
l.fetchSem <- struct{}{}
439451

440-
resultCh := make(chan *types.Receipt)
441-
errCh := make(chan error)
452+
timeStart := time.Now()
453+
for {
454+
select {
455+
case l.fetchSem <- struct{}{}:
456+
goto start
457+
case <-time.After(1 * time.Minute):
458+
elapsed := time.Since(timeStart)
459+
l.log.Warn(fmt.Sprintf("fetchTransactionReceipt(%s) waiting for fetch semaphore for %s", txnHash.String(), elapsed))
460+
}
461+
}
462+
463+
start:
442464

443-
defer close(resultCh)
444-
defer close(errCh)
465+
// channels to receive result or error: it could be difficult to coordinate
466+
// closing them manually because we're also selecting on ctx.Done(), so we
467+
// use buffered channels of size 1 and let them be garbage collected after
468+
// this function returns.
469+
resultCh := make(chan *types.Receipt, 1)
470+
errCh := make(chan error, 1)
445471

446472
go func() {
447473
defer func() {
@@ -484,22 +510,19 @@ func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash
484510
defer clearTimeout()
485511

486512
receipt, err := l.provider.TransactionReceipt(tctx, txnHash)
487-
488-
if !forceFetch && errors.Is(err, ethereum.NotFound) {
489-
// record the blockNum, maybe this receipt is just too new and nodes are telling
490-
// us they can't find it yet, in which case we will rely on the monitor to
491-
// clear this flag for us.
492-
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
493-
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
494-
errCh <- err
495-
return nil
496-
} else if forceFetch && receipt == nil {
497-
// force fetch, lets retry a number of times as the node may end up finding the receipt.
498-
// txn has been found in the monitor with event added, but still haven't retrived the receipt.
499-
// this could be that we're too fast and node isn't returning the receipt yet.
500-
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s", txnHash)
501-
}
502513
if err != nil {
514+
if !forceFetch && errors.Is(err, ethereum.NotFound) {
515+
// record the blockNum, maybe this receipt is just too new and nodes are telling
516+
// us they can't find it yet, in which case we will rely on the monitor to
517+
// clear this flag for us.
518+
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
519+
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
520+
errCh <- err
521+
return nil
522+
}
523+
if forceFetch {
524+
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s: %w", txnHash, err)
525+
}
503526
return superr.Wrap(fmt.Errorf("failed to fetch receipt %s", txnHash), err)
504527
}
505528

@@ -647,9 +670,9 @@ func (l *ReceiptsListener) listener() error {
647670
if reorg {
648671
for _, list := range filters {
649672
for _, filterer := range list {
650-
if f, _ := filterer.(*filter); f != nil {
651-
f.startBlockNum = latestBlockNum
652-
f.lastMatchBlockNum = 0
673+
if f, ok := filterer.(*filter); ok {
674+
f.setStartBlockNum(latestBlockNum)
675+
f.setLastMatchBlockNum(0)
653676
}
654677
}
655678
}
@@ -666,12 +689,12 @@ func (l *ReceiptsListener) listener() error {
666689
for y, matched := range list {
667690
filterer := filters[x][y]
668691
if matched || filterer.StartBlockNum() == 0 {
669-
if f, _ := filterer.(*filter); f != nil {
670-
if f.startBlockNum == 0 {
671-
f.startBlockNum = latestBlockNum
692+
if f, ok := filterer.(*filter); ok {
693+
if f.StartBlockNum() == 0 {
694+
f.setStartBlockNum(latestBlockNum)
672695
}
673696
if matched {
674-
f.lastMatchBlockNum = latestBlockNum
697+
f.setLastMatchBlockNum(latestBlockNum)
675698
}
676699
}
677700
} else {
@@ -693,10 +716,8 @@ func (l *ReceiptsListener) listener() error {
693716
subscriber := subscribers[x]
694717
subscriber.RemoveFilter(filterer)
695718

696-
select {
697-
case <-f.Exhausted():
698-
default:
699-
close(f.exhausted)
719+
if f, ok := filterer.(*filter); ok {
720+
f.closeExhausted()
700721
}
701722
}
702723
}
@@ -772,14 +793,20 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
772793
// match the receipts against the filters
773794
var wg sync.WaitGroup
774795
for i, sub := range subscribers {
775-
wg.Add(1)
776796
l.filterSem <- struct{}{}
797+
798+
wg.Add(1)
777799
go func(i int, sub *subscriber) {
778800
defer func() {
779801
<-l.filterSem
780802
wg.Done()
781803
}()
782804

805+
// retry pending receipts first
806+
retryCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second)
807+
sub.retryPendingReceipts(retryCtx)
808+
cancel()
809+
783810
// filter matcher
784811
matched, err := sub.matchFilters(l.ctx, filterers[i], receipts)
785812
if err != nil {
@@ -801,6 +828,14 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
801828
}
802829

803830
func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *subscriber, filterers []Filterer) error {
831+
// Collect eligible filters first
832+
type filterWithHash struct {
833+
filterer Filterer
834+
txnHash common.Hash
835+
}
836+
837+
eligible := make([]filterWithHash, 0, len(filterers))
838+
804839
for _, filterer := range filterers {
805840
if !filterer.Options().SearchOnChain {
806841
// skip filters which do not ask to search on chain
@@ -813,34 +848,67 @@ func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *
813848
continue
814849
}
815850

816-
r, err := l.fetchTransactionReceipt(ctx, *txnHashCond, false)
817-
if !errors.Is(err, ethereum.NotFound) && err != nil {
818-
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
819-
}
820-
if r == nil {
821-
// unable to find the receipt on-chain, lets continue
822-
continue
823-
}
851+
eligible = append(eligible, filterWithHash{filterer, *txnHashCond})
852+
}
824853

825-
if f, ok := filterer.(*filter); ok {
826-
f.lastMatchBlockNum = r.BlockNumber.Uint64()
827-
}
854+
if len(eligible) == 0 {
855+
return nil
856+
}
828857

829-
receipt := Receipt{
830-
receipt: r,
831-
// NOTE: we do not include the transaction at this point, as we don't have it.
832-
// transaction: txn,
833-
Final: l.isBlockFinal(r.BlockNumber),
834-
}
858+
// Process in batches with bounded concurrency using errgroup
859+
sem := make(chan struct{}, l.options.MaxConcurrentSearchOnChainWorkers)
860+
g, gctx := errgroup.WithContext(ctx)
835861

836-
// will always find the receipt, as it will be in our case previously found above.
837-
// this is called so we can broadcast the match to the filterer's subscriber.
838-
_, err = subscriber.matchFilters(ctx, []Filterer{filterer}, []Receipt{receipt})
839-
if err != nil {
840-
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
841-
}
862+
for _, item := range eligible {
863+
item := item // capture loop variable
864+
865+
g.Go(func() error {
866+
select {
867+
case sem <- struct{}{}:
868+
defer func() {
869+
<-sem
870+
}()
871+
case <-gctx.Done():
872+
return gctx.Err()
873+
}
874+
875+
r, err := l.fetchTransactionReceipt(gctx, item.txnHash, false)
876+
if !errors.Is(err, ethereum.NotFound) && err != nil {
877+
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
878+
// Don't return error, just log and continue with other filters
879+
return nil
880+
}
881+
if r == nil {
882+
// unable to find the receipt on-chain, lets continue
883+
return nil
884+
}
885+
886+
if f, ok := item.filterer.(*filter); ok {
887+
f.setLastMatchBlockNum(r.BlockNumber.Uint64())
888+
}
889+
890+
receipt := Receipt{
891+
receipt: r,
892+
// NOTE: we do not include the transaction at this point, as we don't have it.
893+
// transaction: txn,
894+
Final: l.isBlockFinal(r.BlockNumber),
895+
}
896+
897+
// will always find the receipt, as it will be in our case previously found above.
898+
// this is called so we can broadcast the match to the filterer's subscriber.
899+
_, err = subscriber.matchFilters(gctx, []Filterer{item.filterer}, []Receipt{receipt})
900+
if err != nil {
901+
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
902+
// Don't return error, just log and continue
903+
return nil
904+
}
905+
906+
return nil
907+
})
842908
}
843909

910+
// Wait for all goroutines, but we're not propagating errors since we just log them
911+
_ = g.Wait()
844912
return nil
845913
}
846914

@@ -871,26 +939,35 @@ func (l *ReceiptsListener) isBlockFinal(blockNum *big.Int) bool {
871939
}
872940

873941
func (l *ReceiptsListener) latestBlockNum() *big.Int {
942+
// return immediately if the monitor has a latest block number
874943
latestBlockNum := l.monitor.LatestBlockNum()
875-
if latestBlockNum == nil || latestBlockNum.Cmp(big.NewInt(0)) == 0 {
876-
err := l.br.Do(l.ctx, func() error {
877-
block, err := l.provider.BlockByNumber(context.Background(), nil)
878-
if err != nil {
879-
return err
880-
}
881-
latestBlockNum = block.Number()
882-
return nil
883-
})
884-
if err != nil || latestBlockNum == nil {
944+
if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 {
945+
return latestBlockNum
946+
}
947+
948+
// wait until monitor has a block num for us, up to a certain amount of time
949+
maxWaitTime := 30 * time.Second
950+
period := 250 * time.Millisecond
951+
for {
952+
time.Sleep(period)
953+
954+
latestBlockNum := l.monitor.LatestBlockNum()
955+
if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 {
956+
return latestBlockNum
957+
}
958+
959+
maxWaitTime -= period
960+
if maxWaitTime <= 0 {
961+
l.log.Error("ethreceipts: latestBlockNum: monitor has no latest block number after waiting, returning 0")
885962
return big.NewInt(0)
886963
}
887-
return latestBlockNum
888964
}
889-
return latestBlockNum
890965
}
891966

892967
func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
893968
var chainID *big.Int
969+
970+
// provide plenty of time for breaker to succeed
894971
err := breaker.Do(ctx, func() error {
895972
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
896973
defer cancel()
@@ -901,7 +978,7 @@ func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error
901978
}
902979
chainID = id
903980
return nil
904-
}, nil, 1*time.Second, 2, 3)
981+
}, nil, 1*time.Second, 2, 10)
905982

906983
if err != nil {
907984
return nil, err

0 commit comments

Comments
 (0)