231 changes: 154 additions & 77 deletions ethreceipts/ethreceipts.go
@@ -28,21 +28,30 @@ import (
)

var DefaultOptions = Options{
MaxConcurrentFetchReceiptWorkers: 100,
MaxConcurrentFilterWorkers: 50,
PastReceiptsCacheSize: 5_000,
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
Alerter: util.NoopAlerter(),
MaxConcurrentFetchReceiptWorkers: 100,
MaxConcurrentFilterWorkers: 200,
MaxConcurrentSearchOnChainWorkers: 15,
PastReceiptsCacheSize: 5_000,
NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality
FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed
Alerter: util.NoopAlerter(),
}

const (
maxFiltersPerListener = 1000
)

type Options struct {
// ..
MaxConcurrentFetchReceiptWorkers int

// ..
MaxConcurrentFilterWorkers int

// MaxConcurrentSearchOnChainWorkers is the maximum number of concurrent
// on-chain searches (this is per subscriber)
MaxConcurrentSearchOnChainWorkers int

// ..
PastReceiptsCacheSize int

@@ -137,18 +146,21 @@ func NewReceiptsListener(log *slog.Logger, provider ethrpc.Interface, monitor *e
return nil, err
}

// max ~12s total wait time before giving up
br := breaker.New(log, 200*time.Millisecond, 1.2, 20)
Contributor Author: The original breaker retried at most 4 times, starting at a 1-second wait. This configuration has a similar total waiting time, but the individual waits are shorter; the objective is to maximize retries. Note that each retry has a 4-second hard timeout, so the worst-case total time could be considerably longer (if we hit the 4-second timeout on attempts), but also note that we now fetch transaction receipts concurrently, so we won't block subscriptions that don't need to be blocked.
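
To make the worst-case bound concrete, here is a minimal, hypothetical sketch of the per-attempt hard timeout the comment refers to. The real code delegates the backoff schedule to the breaker package; `fetchWithPerAttemptTimeout` and its fixed 200ms pause are illustrative stand-ins only:

```go
package main

import (
	"context"
	"time"
)

// fetchWithPerAttemptTimeout gives each attempt its own hard 4-second deadline,
// so a hung RPC call cannot stall the retry loop, while the parent context can
// still cancel everything early.
func fetchWithPerAttemptTimeout(ctx context.Context, attempts int, fetch func(context.Context) error) error {
	var err error
	for i := 0; i < attempts; i++ {
		tctx, cancel := context.WithTimeout(ctx, 4*time.Second)
		err = fetch(tctx)
		cancel()
		if err == nil {
			return nil
		}
		// Simplified stand-in for the breaker's growing backoff schedule.
		select {
		case <-time.After(200 * time.Millisecond):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}
```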


return &ReceiptsListener{
options: opts,
log: log,
alert: opts.Alerter,
provider: provider,
monitor: monitor,
br: breaker.New(log, 1*time.Second, 2, 4), // max 4 retries
br: br,
fetchSem: make(chan struct{}, opts.MaxConcurrentFetchReceiptWorkers),
pastReceipts: pastReceipts,
notFoundTxnHashes: notFoundTxnHashes,
subscribers: make([]*subscriber, 0),
registerFiltersCh: make(chan registerFilters, 1000),
registerFiltersCh: make(chan registerFilters, maxFiltersPerListener),
filterSem: make(chan struct{}, opts.MaxConcurrentFilterWorkers),
}, nil
}
@@ -160,7 +172,7 @@ func (l *ReceiptsListener) lazyInit(ctx context.Context) error {
var err error
l.chainID, err = getChainID(ctx, l.provider)
if err != nil {
l.chainID = big.NewInt(1) // assume mainnet in case of unlikely error
return fmt.Errorf("ethreceipts: failed to get chainID from provider: %w", err)
Contributor Author: This assumption won't make things work (AsMessage will fail, etc.). We must fail here until AsMessage is removed or replaced.

}

if l.options.NumBlocksToFinality <= 0 {
@@ -190,6 +202,7 @@ func (l *ReceiptsListener) Run(ctx context.Context) error {
defer atomic.StoreInt32(&l.running, 0)

if err := l.lazyInit(ctx); err != nil {
slog.Error("ethreceipts: lazyInit failed", slog.String("error", err.Error()))
return err
}

@@ -435,13 +448,26 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context
// it indicates that we have high conviction that the receipt should be available, as the monitor has found
// this transaction hash.
func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash common.Hash, forceFetch bool) (*types.Receipt, error) {
l.fetchSem <- struct{}{}

resultCh := make(chan *types.Receipt)
errCh := make(chan error)
timeStart := time.Now()
for {
select {
case l.fetchSem <- struct{}{}:
goto start
case <-time.After(1 * time.Minute):
elapsed := time.Since(timeStart)
l.log.Warn(fmt.Sprintf("fetchTransactionReceipt(%s) waiting for fetch semaphore for %s", txnHash.String(), elapsed))
}
}

start:

defer close(resultCh)
defer close(errCh)
// channels to receive the result or error: closing them manually is hard to
// coordinate because we also select on ctx.Done(), so we use buffered
// channels of size 1 and let them be garbage collected after this function
// returns.
resultCh := make(chan *types.Receipt, 1)
errCh := make(chan error, 1)

go func() {
defer func() {
@@ -484,22 +510,19 @@ func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash
defer clearTimeout()

receipt, err := l.provider.TransactionReceipt(tctx, txnHash)

if !forceFetch && errors.Is(err, ethereum.NotFound) {
// record the blockNum, maybe this receipt is just too new and nodes are telling
// us they can't find it yet, in which case we will rely on the monitor to
// clear this flag for us.
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
errCh <- err
return nil
} else if forceFetch && receipt == nil {
// force fetch, let's retry a number of times as the node may end up finding the receipt.
// txn has been found in the monitor with an event added, but we still haven't retrieved the receipt.
// this could be because we're too fast and the node isn't returning the receipt yet.
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s", txnHash)
}
if err != nil {
if !forceFetch && errors.Is(err, ethereum.NotFound) {
// record the blockNum, maybe this receipt is just too new and nodes are telling
// us they can't find it yet, in which case we will rely on the monitor to
// clear this flag for us.
l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex))
l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum)
errCh <- err
return nil
}
if forceFetch {
return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s: %w", txnHash, err)
}
return superr.Wrap(fmt.Errorf("failed to fetch receipt %s", txnHash), err)
}

@@ -647,9 +670,9 @@ func (l *ReceiptsListener) listener() error {
if reorg {
for _, list := range filters {
for _, filterer := range list {
if f, _ := filterer.(*filter); f != nil {
f.startBlockNum = latestBlockNum
f.lastMatchBlockNum = 0
if f, ok := filterer.(*filter); ok {
f.setStartBlockNum(latestBlockNum)
f.setLastMatchBlockNum(0)
}
}
}
@@ -666,12 +689,12 @@
for y, matched := range list {
filterer := filters[x][y]
if matched || filterer.StartBlockNum() == 0 {
if f, _ := filterer.(*filter); f != nil {
if f.startBlockNum == 0 {
f.startBlockNum = latestBlockNum
if f, ok := filterer.(*filter); ok {
if f.StartBlockNum() == 0 {
f.setStartBlockNum(latestBlockNum)
}
if matched {
f.lastMatchBlockNum = latestBlockNum
f.setLastMatchBlockNum(latestBlockNum)
}
}
} else {
@@ -693,10 +716,8 @@
subscriber := subscribers[x]
subscriber.RemoveFilter(filterer)

select {
case <-f.Exhausted():
default:
close(f.exhausted)
if f, ok := filterer.(*filter); ok {
f.closeExhausted()
}
}
}
@@ -772,14 +793,20 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [
// match the receipts against the filters
var wg sync.WaitGroup
for i, sub := range subscribers {
wg.Add(1)
l.filterSem <- struct{}{}

wg.Add(1)
go func(i int, sub *subscriber) {
defer func() {
<-l.filterSem
wg.Done()
}()

// retry pending receipts first
retryCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second)
sub.retryPendingReceipts(retryCtx)
cancel()

// filter matcher
matched, err := sub.matchFilters(l.ctx, filterers[i], receipts)
if err != nil {
@@ -801,6 +828,14 @@
}

func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *subscriber, filterers []Filterer) error {
Contributor Author: From sequential search to concurrent search. This is bounded by MaxConcurrentSearchOnChainWorkers.
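
For illustration, here is a minimal, self-contained sketch of the bounded-concurrency shape used below (errgroup plus a counting semaphore); the function and variable names are illustrative, not the actual implementation:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// forEachBounded runs work for every item, but allows at most maxWorkers
// invocations to run concurrently.
func forEachBounded(ctx context.Context, items []string, maxWorkers int, work func(context.Context, string) error) error {
	sem := make(chan struct{}, maxWorkers) // counting semaphore
	g, gctx := errgroup.WithContext(ctx)

	for _, item := range items {
		item := item // capture loop variable (pre-Go 1.22 semantics)
		g.Go(func() error {
			select {
			case sem <- struct{}{}: // acquire a worker slot
			case <-gctx.Done():
				return gctx.Err()
			}
			defer func() { <-sem }() // release the slot

			return work(gctx, item)
		})
	}
	return g.Wait()
}

func main() {
	_ = forEachBounded(context.Background(), []string{"a", "b", "c"}, 2,
		func(ctx context.Context, s string) error {
			fmt.Println("searching", s)
			return nil
		})
}
```

Newer versions of golang.org/x/sync/errgroup also provide (*Group).SetLimit, which could replace the manual semaphore; the PR keeps an explicit semaphore, matching the style of fetchSem and filterSem elsewhere in the listener.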

// Collect eligible filters first
type filterWithHash struct {
filterer Filterer
txnHash common.Hash
}

eligible := make([]filterWithHash, 0, len(filterers))

for _, filterer := range filterers {
if !filterer.Options().SearchOnChain {
// skip filters which do not ask to search on chain
@@ -813,34 +848,67 @@ func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *
continue
}

r, err := l.fetchTransactionReceipt(ctx, *txnHashCond, false)
if !errors.Is(err, ethereum.NotFound) && err != nil {
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
}
if r == nil {
// unable to find the receipt on-chain, let's continue
continue
}
eligible = append(eligible, filterWithHash{filterer, *txnHashCond})
}

if f, ok := filterer.(*filter); ok {
f.lastMatchBlockNum = r.BlockNumber.Uint64()
}
if len(eligible) == 0 {
return nil
}

receipt := Receipt{
receipt: r,
// NOTE: we do not include the transaction at this point, as we don't have it.
// transaction: txn,
Final: l.isBlockFinal(r.BlockNumber),
}
// Process with bounded concurrency using errgroup and a semaphore
sem := make(chan struct{}, l.options.MaxConcurrentSearchOnChainWorkers)
g, gctx := errgroup.WithContext(ctx)

// will always find the receipt, as it will be in our case previously found above.
// this is called so we can broadcast the match to the filterer's subscriber.
_, err = subscriber.matchFilters(ctx, []Filterer{filterer}, []Receipt{receipt})
if err != nil {
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
}
for _, item := range eligible {
item := item // capture loop variable

g.Go(func() error {
select {
case sem <- struct{}{}:
defer func() {
<-sem
}()
case <-gctx.Done():
return gctx.Err()
}

r, err := l.fetchTransactionReceipt(gctx, item.txnHash, false)
if !errors.Is(err, ethereum.NotFound) && err != nil {
l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err))
// Don't return error, just log and continue with other filters
return nil
}
if r == nil {
// unable to find the receipt on-chain, let's continue
return nil
}

if f, ok := item.filterer.(*filter); ok {
f.setLastMatchBlockNum(r.BlockNumber.Uint64())
}

receipt := Receipt{
receipt: r,
// NOTE: we do not include the transaction at this point, as we don't have it.
// transaction: txn,
Final: l.isBlockFinal(r.BlockNumber),
}

// the receipt will always be found here, since we already fetched it above.
// matchFilters is called so we can broadcast the match to the filterer's subscriber.
_, err = subscriber.matchFilters(gctx, []Filterer{item.filterer}, []Receipt{receipt})
if err != nil {
l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err))
// Don't return error, just log and continue
return nil
}

return nil
})
}

// Wait for all goroutines; errors are logged above rather than propagated.
_ = g.Wait()
return nil
}

@@ -871,26 +939,35 @@ func (l *ReceiptsListener) isBlockFinal(blockNum *big.Int) bool {
}

func (l *ReceiptsListener) latestBlockNum() *big.Int {
// return immediately if the monitor has a latest block number
latestBlockNum := l.monitor.LatestBlockNum()
if latestBlockNum == nil || latestBlockNum.Cmp(big.NewInt(0)) == 0 {
err := l.br.Do(l.ctx, func() error {
block, err := l.provider.BlockByNumber(context.Background(), nil)
if err != nil {
return err
}
latestBlockNum = block.Number()
return nil
})
if err != nil || latestBlockNum == nil {
if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 {
return latestBlockNum
}

// wait until monitor has a block num for us, up to a certain amount of time
maxWaitTime := 30 * time.Second
period := 250 * time.Millisecond
for {
time.Sleep(period)

latestBlockNum := l.monitor.LatestBlockNum()
if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 {
return latestBlockNum
}

maxWaitTime -= period
if maxWaitTime <= 0 {
l.log.Error("ethreceipts: latestBlockNum: monitor has no latest block number after waiting, returning 0")
return big.NewInt(0)
}
return latestBlockNum
}
return latestBlockNum
}

func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) {
var chainID *big.Int

// provide plenty of time for breaker to succeed
err := breaker.Do(ctx, func() error {
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()
@@ -901,7 +978,7 @@ func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error
}
chainID = id
return nil
}, nil, 1*time.Second, 2, 3)
}, nil, 1*time.Second, 2, 10)
Contributor Author: A failure here now returns an error, so make sure we allow plenty of time for retries.


if err != nil {
return nil, err