Skip to content

Commit 9093498

Browse files
committed
ethreceipts: port #188
1 parent d841cfe commit 9093498

File tree

3 files changed

+883
-34
lines changed

3 files changed

+883
-34
lines changed

ethreceipts/ethreceipts.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ var DefaultOptions = Options{
3535
Alerter: util.NoopAlerter(),
3636
}
3737

38+
const (
39+
maxFiltersPerListener = 1000
40+
)
41+
3842
type Options struct {
3943
// ..
4044
MaxConcurrentFetchReceiptWorkers int
@@ -145,7 +149,7 @@ func NewReceiptsListener(log logger.Logger, provider ethrpc.Interface, monitor *
145149
pastReceipts: pastReceipts,
146150
notFoundTxnHashes: notFoundTxnHashes,
147151
subscribers: make([]*subscriber, 0),
148-
registerFiltersCh: make(chan registerFilters, 1000),
152+
registerFiltersCh: make(chan registerFilters, maxFiltersPerListener),
149153
filterSem: make(chan struct{}, opts.MaxConcurrentFilterWorkers),
150154
}, nil
151155
}
@@ -382,11 +386,8 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
382386
func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash common.Hash, forceFetch bool) (*types.Receipt, error) {
383387
l.fetchSem <- struct{}{}
384388

385-
resultCh := make(chan *types.Receipt)
386-
errCh := make(chan error)
387-
388-
defer close(resultCh)
389-
defer close(errCh)
389+
resultCh := make(chan *types.Receipt, 1)
390+
errCh := make(chan error, 1)
390391

391392
go func() {
392393
defer func() {
@@ -632,6 +633,12 @@ func (l *ReceiptsListener) listener() error {
632633
panic("ethreceipts: unexpected")
633634
}
634635

636+
// Do not exhaust a TxnHash filter that has never matched.
637+
// Its historical search may have failed, and it should be allowed to keep waiting.
638+
if f.Cond().TxnHash != nil && f.LastMatchBlockNum() == 0 {
639+
continue
640+
}
641+
635642
if (f.Options().LimitOne && f.LastMatchBlockNum() == 0) || !f.Options().LimitOne {
636643
l.log.Debugf("filter exhausted! last block matched:%d maxWait:%d filterID:%d", filterer.LastMatchBlockNum(), maxWait, filterer.FilterID())
637644

@@ -711,6 +718,11 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
711718
wg.Done()
712719
}()
713720

721+
// retry pending receipts first
722+
retryCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second)
723+
sub.retryPendingReceipts(retryCtx)
724+
cancel()
725+
714726
// filter matcher
715727
matched, err := sub.matchFilters(l.ctx, filterers[i], receipts)
716728
if err != nil {
@@ -822,6 +834,8 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int {
822834

823835
func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
824836
var chainID *big.Int
837+
838+
// provide plenty of time for breaker to succeed
825839
err := breaker.Do(ctx, func() error {
826840
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
827841
defer cancel()
@@ -832,7 +846,7 @@ func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error
832846
}
833847
chainID = id
834848
return nil
835-
}, nil, 1*time.Second, 2, 3)
849+
}, nil, 1*time.Second, 2, 10)
836850

837851
if err != nil {
838852
return nil, err

0 commit comments

Comments
 (0)