From 4ba6a343f0ecc0f535682dbf33f495ec4d94db95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Nieto?= Date: Fri, 10 Oct 2025 11:38:55 -0600 Subject: [PATCH 01/11] ethreceipts: improve search and receipt fetching with flaky providers simplify, clean-up --- ethreceipts/ethreceipts.go | 226 ++++++++---- ethreceipts/ethreceipts_test.go | 599 +++++++++++++++++++++++++++++++- ethreceipts/filterer.go | 27 +- ethreceipts/subscription.go | 313 +++++++++++++++-- ethrpc/ethrpc.go | 27 +- ethrpc/option.go | 2 +- 6 files changed, 1083 insertions(+), 111 deletions(-) diff --git a/ethreceipts/ethreceipts.go b/ethreceipts/ethreceipts.go index 5ed1202c..5ba0594b 100644 --- a/ethreceipts/ethreceipts.go +++ b/ethreceipts/ethreceipts.go @@ -28,14 +28,20 @@ import ( ) var DefaultOptions = Options{ - MaxConcurrentFetchReceiptWorkers: 100, - MaxConcurrentFilterWorkers: 50, - PastReceiptsCacheSize: 5_000, - NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality - FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed - Alerter: util.NoopAlerter(), + MaxConcurrentFetchReceiptWorkers: 100, + MaxConcurrentFilterWorkers: 200, + MaxConcurrentSearchOnChainWorkers: 5, + LatestBlockNumCacheTTL: 2 * time.Second, + PastReceiptsCacheSize: 5_000, + NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality + FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed + Alerter: util.NoopAlerter(), } +const ( + maxFiltersPerListener = 1000 +) + type Options struct { // .. MaxConcurrentFetchReceiptWorkers int @@ -43,6 +49,14 @@ type Options struct { // .. MaxConcurrentFilterWorkers int + // MaxConcurrentSearchOnChainWorkers is the maximum amount of concurrent + // on-chain searches (this is per subscriber) + MaxConcurrentSearchOnChainWorkers int + + // LatestBlockNumCacheTTL is the duration to cache the latest block number, + // this prevents frequent calls to the monitor for latest block number + LatestBlockNumCacheTTL time.Duration + // .. PastReceiptsCacheSize int @@ -95,6 +109,9 @@ type ReceiptsListener struct { ctxStop context.CancelFunc running int32 mu sync.RWMutex + + latestBlockNumCache atomic.Uint64 + latestBlockNumTime atomic.Int64 } var ( @@ -137,18 +154,21 @@ func NewReceiptsListener(log *slog.Logger, provider ethrpc.Interface, monitor *e return nil, err } + // max ~12s total wait time before giving up + br := breaker.New(log, 200*time.Millisecond, 1.2, 20) + return &ReceiptsListener{ options: opts, log: log, alert: opts.Alerter, provider: provider, monitor: monitor, - br: breaker.New(log, 1*time.Second, 2, 4), // max 4 retries + br: br, fetchSem: make(chan struct{}, opts.MaxConcurrentFetchReceiptWorkers), pastReceipts: pastReceipts, notFoundTxnHashes: notFoundTxnHashes, subscribers: make([]*subscriber, 0), - registerFiltersCh: make(chan registerFilters, 1000), + registerFiltersCh: make(chan registerFilters, maxFiltersPerListener), filterSem: make(chan struct{}, opts.MaxConcurrentFilterWorkers), }, nil } @@ -160,7 +180,7 @@ func (l *ReceiptsListener) lazyInit(ctx context.Context) error { var err error l.chainID, err = getChainID(ctx, l.provider) if err != nil { - l.chainID = big.NewInt(1) // assume mainnet in case of unlikely error + return fmt.Errorf("ethreceipts: failed to get chainID from provider: %w", err) } if l.options.NumBlocksToFinality <= 0 { @@ -190,6 +210,7 @@ func (l *ReceiptsListener) Run(ctx context.Context) error { defer atomic.StoreInt32(&l.running, 0) if err := l.lazyInit(ctx); err != nil { + slog.Error("ethreceipts: lazyInit failed", slog.String("error", err.Error())) return err } @@ -435,13 +456,17 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context // it indicates that we have high conviction that the receipt should be available, as the monitor has found // this transaction hash. func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash common.Hash, forceFetch bool) (*types.Receipt, error) { - l.fetchSem <- struct{}{} - resultCh := make(chan *types.Receipt) - errCh := make(chan error) + // TODO: this might block forever if all workers are busy, should we have a + // timeout? + l.fetchSem <- struct{}{} - defer close(resultCh) - defer close(errCh) + // channels to receive result or error: it could be difficult to coordinate + // closing them manually because we're also selecting on ctx.Done(), so we + // use buffered channels of size 1 and let them be garbage collected after + // this function returns. + resultCh := make(chan *types.Receipt, 1) + errCh := make(chan error, 1) go func() { defer func() { @@ -484,22 +509,19 @@ func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash defer clearTimeout() receipt, err := l.provider.TransactionReceipt(tctx, txnHash) - - if !forceFetch && errors.Is(err, ethereum.NotFound) { - // record the blockNum, maybe this receipt is just too new and nodes are telling - // us they can't find it yet, in which case we will rely on the monitor to - // clear this flag for us. - l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex)) - l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum) - errCh <- err - return nil - } else if forceFetch && receipt == nil { - // force fetch, lets retry a number of times as the node may end up finding the receipt. - // txn has been found in the monitor with event added, but still haven't retrived the receipt. - // this could be that we're too fast and node isn't returning the receipt yet. - return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s", txnHash) - } if err != nil { + if !forceFetch && errors.Is(err, ethereum.NotFound) { + // record the blockNum, maybe this receipt is just too new and nodes are telling + // us they can't find it yet, in which case we will rely on the monitor to + // clear this flag for us. + l.log.Debug(fmt.Sprintf("fetchTransactionReceipt(%s) receipt not found -- flagging in notFoundTxnHashes cache", txnHashHex)) + l.notFoundTxnHashes.Set(ctx, txnHashHex, latestBlockNum) + errCh <- err + return nil + } + if forceFetch { + return fmt.Errorf("forceFetch enabled, but failed to fetch receipt %s: %w", txnHash, err) + } return superr.Wrap(fmt.Errorf("failed to fetch receipt %s", txnHash), err) } @@ -647,9 +669,9 @@ func (l *ReceiptsListener) listener() error { if reorg { for _, list := range filters { for _, filterer := range list { - if f, _ := filterer.(*filter); f != nil { - f.startBlockNum = latestBlockNum - f.lastMatchBlockNum = 0 + if f, ok := filterer.(*filter); ok { + f.setStartBlockNum(latestBlockNum) + f.setLastMatchBlockNum(0) } } } @@ -666,12 +688,12 @@ func (l *ReceiptsListener) listener() error { for y, matched := range list { filterer := filters[x][y] if matched || filterer.StartBlockNum() == 0 { - if f, _ := filterer.(*filter); f != nil { - if f.startBlockNum == 0 { - f.startBlockNum = latestBlockNum + if f, ok := filterer.(*filter); ok { + if f.StartBlockNum() == 0 { + f.setStartBlockNum(latestBlockNum) } if matched { - f.lastMatchBlockNum = latestBlockNum + f.setLastMatchBlockNum(latestBlockNum) } } } else { @@ -693,10 +715,8 @@ func (l *ReceiptsListener) listener() error { subscriber := subscribers[x] subscriber.RemoveFilter(filterer) - select { - case <-f.Exhausted(): - default: - close(f.exhausted) + if f, ok := filterer.(*filter); ok { + f.closeExhausted() } } } @@ -772,14 +792,20 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [ // match the receipts against the filters var wg sync.WaitGroup for i, sub := range subscribers { - wg.Add(1) l.filterSem <- struct{}{} + + wg.Add(1) go func(i int, sub *subscriber) { defer func() { <-l.filterSem wg.Done() }() + // retry pending receipts first + retryCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + sub.retryPendingReceipts(retryCtx) + cancel() + // filter matcher matched, err := sub.matchFilters(l.ctx, filterers[i], receipts) if err != nil { @@ -801,6 +827,14 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [ } func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber *subscriber, filterers []Filterer) error { + // Collect eligible filters first + type filterWithHash struct { + filterer Filterer + txnHash common.Hash + } + + eligible := make([]filterWithHash, 0, len(filterers)) + for _, filterer := range filterers { if !filterer.Options().SearchOnChain { // skip filters which do not ask to search on chain @@ -813,34 +847,67 @@ func (l *ReceiptsListener) searchFilterOnChain(ctx context.Context, subscriber * continue } - r, err := l.fetchTransactionReceipt(ctx, *txnHashCond, false) - if !errors.Is(err, ethereum.NotFound) && err != nil { - l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err)) - } - if r == nil { - // unable to find the receipt on-chain, lets continue - continue - } + eligible = append(eligible, filterWithHash{filterer, *txnHashCond}) + } - if f, ok := filterer.(*filter); ok { - f.lastMatchBlockNum = r.BlockNumber.Uint64() - } + if len(eligible) == 0 { + return nil + } - receipt := Receipt{ - receipt: r, - // NOTE: we do not include the transaction at this point, as we don't have it. - // transaction: txn, - Final: l.isBlockFinal(r.BlockNumber), - } + // Process in batches with bounded concurrency using errgroup + sem := make(chan struct{}, l.options.MaxConcurrentSearchOnChainWorkers) + g, gctx := errgroup.WithContext(ctx) - // will always find the receipt, as it will be in our case previously found above. - // this is called so we can broadcast the match to the filterer's subscriber. - _, err = subscriber.matchFilters(ctx, []Filterer{filterer}, []Receipt{receipt}) - if err != nil { - l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err)) - } + for _, item := range eligible { + item := item // capture loop variable + + g.Go(func() error { + select { + case sem <- struct{}{}: + defer func() { + <-sem + }() + case <-gctx.Done(): + return gctx.Err() + } + + r, err := l.fetchTransactionReceipt(gctx, item.txnHash, false) + if !errors.Is(err, ethereum.NotFound) && err != nil { + l.log.Error(fmt.Sprintf("searchFilterOnChain fetchTransactionReceipt failed: %v", err)) + // Don't return error, just log and continue with other filters + return nil + } + if r == nil { + // unable to find the receipt on-chain, lets continue + return nil + } + + if f, ok := item.filterer.(*filter); ok { + f.setLastMatchBlockNum(r.BlockNumber.Uint64()) + } + + receipt := Receipt{ + receipt: r, + // NOTE: we do not include the transaction at this point, as we don't have it. + // transaction: txn, + Final: l.isBlockFinal(r.BlockNumber), + } + + // will always find the receipt, as it will be in our case previously found above. + // this is called so we can broadcast the match to the filterer's subscriber. + _, err = subscriber.matchFilters(gctx, []Filterer{item.filterer}, []Receipt{receipt}) + if err != nil { + l.log.Error(fmt.Sprintf("searchFilterOnChain matchFilters failed: %v", err)) + // Don't return error, just log and continue + return nil + } + + return nil + }) } + // Wait for all goroutines, but we're not propagating errors since we just log them + _ = g.Wait() return nil } @@ -871,10 +938,34 @@ func (l *ReceiptsListener) isBlockFinal(blockNum *big.Int) bool { } func (l *ReceiptsListener) latestBlockNum() *big.Int { + // Cache latest block number to avoid hammering monitor.LatestBlockNum() + cachedTime := time.Unix(l.latestBlockNumTime.Load(), 0) + if time.Since(cachedTime) < l.options.LatestBlockNumCacheTTL { + cachedNum := l.latestBlockNumCache.Load() + if cachedNum > 0 { + return big.NewInt(int64(cachedNum)) + } + } + + l.latestBlockNumTime.Store(time.Now().Unix()) + + latestBlockNum := l.fetchLatestBlockNum() + if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 { + l.latestBlockNumCache.Store(latestBlockNum.Uint64()) + } + + return latestBlockNum +} + +func (l *ReceiptsListener) fetchLatestBlockNum() *big.Int { + timeoutCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second) + defer cancel() + latestBlockNum := l.monitor.LatestBlockNum() + if latestBlockNum == nil || latestBlockNum.Cmp(big.NewInt(0)) == 0 { err := l.br.Do(l.ctx, func() error { - block, err := l.provider.BlockByNumber(context.Background(), nil) + block, err := l.provider.BlockByNumber(timeoutCtx, nil) if err != nil { return err } @@ -886,11 +977,14 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int { } return latestBlockNum } + return latestBlockNum } func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) { var chainID *big.Int + + // provide plenty of time for breaker to succeed err := breaker.Do(ctx, func() error { ctx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() @@ -901,7 +995,7 @@ func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error } chainID = id return nil - }, nil, 1*time.Second, 2, 3) + }, nil, 1*time.Second, 2, 10) if err != nil { return nil, err diff --git a/ethreceipts/ethreceipts_test.go b/ethreceipts/ethreceipts_test.go index dd6a8091..1952d12e 100644 --- a/ethreceipts/ethreceipts_test.go +++ b/ethreceipts/ethreceipts_test.go @@ -6,6 +6,8 @@ import ( "fmt" "log/slog" "math/big" + "math/rand" + "net/http" "os" "sync" "sync/atomic" @@ -15,6 +17,7 @@ import ( "github.com/0xsequence/ethkit" "github.com/0xsequence/ethkit/ethmonitor" "github.com/0xsequence/ethkit/ethreceipts" + "github.com/0xsequence/ethkit/ethrpc" "github.com/0xsequence/ethkit/ethtest" "github.com/0xsequence/ethkit/ethtxn" "github.com/0xsequence/ethkit/go-ethereum/common" @@ -24,13 +27,63 @@ import ( ) var ( - testchain *ethtest.Testchain - log *slog.Logger + testchain *ethtest.Testchain + testchainOptions ethtest.TestchainOptions + + log *slog.Logger ) +type flakyRoundTripper struct { + started time.Time + rt http.RoundTripper + failureRate float32 + failures uint64 + times uint64 +} + +func newFlakyRoundTripper(rt http.RoundTripper, failureRate float32) *flakyRoundTripper { + return &flakyRoundTripper{ + rt: rt, + started: time.Now(), + failureRate: failureRate, + } +} + +func (f *flakyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + times := atomic.AddUint64(&f.times, 1) + + // Introduce forced delay + delay := time.Duration(rand.Intn(200)) * time.Millisecond + time.Sleep(delay) + + if rand.Float32() < f.failureRate { + failures := atomic.AddUint64(&f.failures, 1) + return nil, fmt.Errorf("round trip network error. failed %d times out of %d", failures, times) + } + + // Proceed with the actual request + return f.rt.RoundTrip(req) +} + +func newProvider(t *testing.T) *ethrpc.Provider { + provider, err := ethrpc.NewProvider(testchainOptions.NodeURL) + require.NoError(t, err) + + return provider +} + +func newFlakyHTTPClient(failureRate float32) *http.Client { + return &http.Client{ + Timeout: 10 * time.Second, + Transport: newFlakyRoundTripper(http.DefaultTransport, failureRate), + } +} + func init() { + testchainOptions = ethtest.DefaultTestchainOptions + var err error - testchain, err = ethtest.NewTestchain() + testchain, err = ethtest.NewTestchain(testchainOptions) if err != nil { panic(err) } @@ -58,7 +111,7 @@ func TestFetchTransactionReceiptBasic(t *testing.T) { provider := testchain.Provider monitorOptions := ethmonitor.DefaultOptions - // monitorOptions.Logger = log + //monitorOptions.Logger = log monitorOptions.WithLogs = true monitorOptions.BlockRetentionLimit = 1000 @@ -623,3 +676,541 @@ loop: require.Equal(t, matchedCount, len(erc20Receipts)*2) } + +func TestFiltersAddDeadlock(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + provider := testchain.Provider + + monitorOptions := ethmonitor.DefaultOptions + monitorOptions.WithLogs = true + monitorOptions.BlockRetentionLimit = 100 + + monitor, err := ethmonitor.NewMonitor(provider, monitorOptions) + assert.NoError(t, err) + + go func() { + err := monitor.Run(ctx) + if err != nil { + t.Error(err) + } + }() + + listenerOptions := ethreceipts.DefaultOptions + listenerOptions.NumBlocksToFinality = 10 + + receiptsListener, err := ethreceipts.NewReceiptsListener(log, provider, monitor, listenerOptions) + assert.NoError(t, err) + + // Don't start the listener's Run() to make registerFiltersCh not be consumed + // This simulates a slow consumer scenario + + deadlockDetected := make(chan bool, 1) + + go func() { + // Wait 5 minutes before assuming deadlock + time.Sleep(300 * time.Second) + + select { + case deadlockDetected <- true: + default: + } + }() + + // Create many subscribers that will all try to add filters + var wg sync.WaitGroup + + // First, fill up the registerFiltersCh buffer (capacity 1000) + sub := receiptsListener.Subscribe() + for i := 0; i < 1001; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + // This should block on the 1001st call while holding s.mu + hash := ethkit.Hash{byte(i / 256), byte(i % 256)} + sub.AddFilter(ethreceipts.FilterTxnHash(hash)) + }(i) + } + + // Now try to access the subscriber's filters from another goroutine + // This should deadlock if AddFilter is stuck holding the lock + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(100 * time.Millisecond) + + // This will try to acquire s.mu.Lock() + filters := sub.Filters() + t.Logf("Got %d filters", len(filters)) + }() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + t.Log("Test completed without deadlock") + case <-deadlockDetected: + t.Fatal("Deadlock detected - AddFilter blocked while holding lock") + } +} + +func TestFlakyProvider(t *testing.T) { + t.Run("Wait for txn receipts with a healthy monitor and a healthy provider", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + goodProvider := newProvider(t) + + monitorOptions := ethmonitor.DefaultOptions + monitorOptions.WithLogs = true + + // monitor running with a good provider + monitor, err := ethmonitor.NewMonitor(goodProvider, monitorOptions) + require.NoError(t, err) + + go func() { + err := monitor.Run(ctx) + require.NoError(t, err) + }() + + listenerOptions := ethreceipts.DefaultOptions + listenerOptions.FilterMaxWaitNumBlocks = 1 + + // receipts listener running with a healthy provider initially + receiptsListener, err := ethreceipts.NewReceiptsListener(log, goodProvider, monitor, listenerOptions) + require.NoError(t, err) + + go func() { + err := receiptsListener.Run(ctx) + require.NoError(t, err) + }() + + // Wait for services to be ready + time.Sleep(2 * time.Second) + + walletA, _ := testchain.DummyWallet(1) + testchain.MustFundAddress(walletA.Address()) + + walletB, _ := testchain.DummyWallet(uint64(rand.Int63n(1000))) + walletBAddress := walletB.Address() + + nonce, err := walletA.GetNonce(ctx) + require.NoError(t, err) + + var wg sync.WaitGroup + + for i := 0; i < 50; i++ { + txr := ðtxn.TransactionRequest{ + To: &walletBAddress, + ETHValue: ethtest.ETHValue(0.01), + GasLimit: 120_000, + Nonce: big.NewInt(int64(nonce + uint64(i))), + } + + signedTxn, err := walletA.NewTransaction(ctx, txr) + require.NoError(t, err) + + wg.Add(1) + go func(signedTxn *types.Transaction) { + defer wg.Done() + + // Delay sending the transaction to ensure the subscriber is ready + time.Sleep(2 * time.Second) + + // Using the trusted provider here to ensure txn is sent + ethtxn.SendTransaction(ctx, goodProvider, signedTxn) + require.NoError(t, err) + }(signedTxn) + + wg.Add(1) + go func(signedTxn *types.Transaction) { + defer wg.Done() + + receiptsFilter := ethreceipts.FilterTxnHash( + signedTxn.Hash(), + ) + + sub := receiptsListener.Subscribe( + receiptsFilter, + ) + defer sub.Unsubscribe() + + start := time.Now() + select { + case <-ctx.Done(): + t.Fatalf("Context done: %v", ctx.Err()) + case <-sub.Done(): + t.Fatal("Subscription closed unexpectedly") + case receipt := <-sub.TransactionReceipt(): + activeSubs := receiptsListener.NumSubscribers() + t.Logf("Filter matched txn %s after %s, active subs: %d", signedTxn.Hash().String(), time.Since(start), activeSubs) + require.Equal(t, signedTxn.Hash(), receipt.TransactionHash()) + require.Equal(t, uint64(1), receipt.Status()) + case <-time.After(300 * time.Second): + t.Fatal("Timeout waiting for filter to match txn") + } + }(signedTxn) + + } + + t.Logf("Waiting for all goroutines to complete...") + wg.Wait() + }) + + t.Run("Wait for txn receipts with a flaky monitor and a healthy provider", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + goodProvider = newProvider(t) + flakyProvider = newProvider(t) + ) + + monitorOptions := ethmonitor.DefaultOptions + monitorOptions.WithLogs = true + + // monitor running with a flaky provider + monitor, err := ethmonitor.NewMonitor(flakyProvider, monitorOptions) + require.NoError(t, err) + + go func() { + err := monitor.Run(ctx) + require.NoError(t, err) + }() + + listenerOptions := ethreceipts.DefaultOptions + listenerOptions.FilterMaxWaitNumBlocks = 1 + + // receipts listener running with a healthy provider initially + receiptsListener, err := ethreceipts.NewReceiptsListener(log, goodProvider, monitor, listenerOptions) + require.NoError(t, err) + + go func() { + err := receiptsListener.Run(ctx) + require.NoError(t, err) + }() + + // Wait for services to be ready + time.Sleep(2 * time.Second) + + // Replace provider's HTTP client with a flaky one + t.Logf("Setting provider to flaky state") + flakyProvider.SetHTTPClient(newFlakyHTTPClient(1.0)) + + go func() { + // After 20 seconds, restore the provider to more reliable state + time.Sleep(20 * time.Second) + t.Logf("Restoring provider to healthy state") + flakyProvider.SetHTTPClient(newFlakyHTTPClient(0.0)) + }() + + walletA, _ := testchain.DummyWallet(1) + testchain.MustFundAddress(walletA.Address()) + + walletB, _ := testchain.DummyWallet(uint64(rand.Int63n(1000))) + walletBAddress := walletB.Address() + + nonce, err := walletA.GetNonce(ctx) + require.NoError(t, err) + + var wg sync.WaitGroup + + // Add a bunch of subscribers that will each send a txn and wait for it + for i := 0; i < 50; i++ { + txr := ðtxn.TransactionRequest{ + To: &walletBAddress, + ETHValue: ethtest.ETHValue(0.01), + GasLimit: 120_000, + Nonce: big.NewInt(int64(nonce + uint64(i))), + } + + signedTxn, err := walletA.NewTransaction(ctx, txr) + require.NoError(t, err) + + wg.Add(1) + go func(signedTxn *types.Transaction) { + defer wg.Done() + + // Delay sending the transaction to ensure the subscriber is ready + time.Sleep(2 * time.Second) + + // Using the trusted provider here to ensure txn is sent + ethtxn.SendTransaction(ctx, goodProvider, signedTxn) + require.NoError(t, err) + }(signedTxn) + + wg.Add(1) + go func(signedTxn *types.Transaction) { + defer wg.Done() + + receiptsFilter := ethreceipts.FilterTxnHash( + signedTxn.Hash(), + ) + + sub := receiptsListener.Subscribe( + receiptsFilter, + ) + defer sub.Unsubscribe() + + start := time.Now() + select { + case <-ctx.Done(): + t.Fatalf("Context done: %v", ctx.Err()) + case <-sub.Done(): + t.Fatal("Subscription closed unexpectedly") + case receipt := <-sub.TransactionReceipt(): + activeSubs := receiptsListener.NumSubscribers() + t.Logf("Filter matched txn %s after %s, active subs: %d", signedTxn.Hash().String(), time.Since(start), activeSubs) + require.Equal(t, signedTxn.Hash(), receipt.TransactionHash()) + require.Equal(t, uint64(1), receipt.Status()) + case <-time.After(300 * time.Second): + t.Fatal("Timeout waiting for filter to match txn") + } + }(signedTxn) + + } + + t.Logf("Waiting for all goroutines to complete...") + wg.Wait() + }) + + t.Run("Wait for txn receipts with a healthy monitor and a flaky provider", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + goodProvider = newProvider(t) + flakyProvider = newProvider(t) + ) + + monitorOptions := ethmonitor.DefaultOptions + monitorOptions.WithLogs = true + + // monitor running with a good provider + monitor, err := ethmonitor.NewMonitor(goodProvider, monitorOptions) + require.NoError(t, err) + + go func() { + err := monitor.Run(ctx) + require.NoError(t, err) + }() + + listenerOptions := ethreceipts.DefaultOptions + listenerOptions.FilterMaxWaitNumBlocks = 1 + + // receipts listener running with a healthy provider initially + receiptsListener, err := ethreceipts.NewReceiptsListener(log, flakyProvider, monitor, listenerOptions) + require.NoError(t, err) + + go func() { + err := receiptsListener.Run(ctx) + require.NoError(t, err) + }() + + // Wait for services to be ready + time.Sleep(2 * time.Second) + + // Replace provider's HTTP client with a flaky one + t.Logf("Setting provider to flaky state") + flakyProvider.SetHTTPClient(newFlakyHTTPClient(1.0)) + + go func() { + // After 20 seconds, restore the provider to more reliable state + time.Sleep(20 * time.Second) + t.Logf("Restoring provider to healthy state") + flakyProvider.SetHTTPClient(newFlakyHTTPClient(0.0)) + }() + + walletA, _ := testchain.DummyWallet(1) + testchain.MustFundAddress(walletA.Address()) + + walletB, _ := testchain.DummyWallet(uint64(rand.Int63n(1000))) + walletBAddress := walletB.Address() + + nonce, err := walletA.GetNonce(ctx) + require.NoError(t, err) + + var wg sync.WaitGroup + + // Add a bunch of subscribers that will each send a txn and wait for it + for i := 0; i < 50; i++ { + txr := ðtxn.TransactionRequest{ + To: &walletBAddress, + ETHValue: ethtest.ETHValue(0.01), + GasLimit: 120_000, + Nonce: big.NewInt(int64(nonce + uint64(i))), + } + + signedTxn, err := walletA.NewTransaction(ctx, txr) + require.NoError(t, err) + + wg.Add(1) + go func(signedTxn *types.Transaction) { + defer wg.Done() + + // Delay sending the transaction to ensure the subscriber is ready + time.Sleep(2 * time.Second) + + // Using the trusted provider here to ensure txn is sent + ethtxn.SendTransaction(ctx, goodProvider, signedTxn) + require.NoError(t, err) + }(signedTxn) + + wg.Add(1) + go func(signedTxn *types.Transaction) { + defer wg.Done() + + receiptsFilter := ethreceipts.FilterTxnHash( + signedTxn.Hash(), + ) + + sub := receiptsListener.Subscribe( + receiptsFilter, + ) + defer sub.Unsubscribe() + + start := time.Now() + select { + case <-ctx.Done(): + t.Fatalf("Context done: %v", ctx.Err()) + case <-sub.Done(): + t.Fatal("Subscription closed unexpectedly") + case receipt := <-sub.TransactionReceipt(): + activeSubs := receiptsListener.NumSubscribers() + t.Logf("Filter matched txn %s after %s, active subs: %d", signedTxn.Hash().String(), time.Since(start), activeSubs) + require.Equal(t, signedTxn.Hash(), receipt.TransactionHash()) + require.Equal(t, uint64(1), receipt.Status()) + case <-time.After(300 * time.Second): + t.Fatal("Timeout waiting for filter to match txn") + } + }(signedTxn) + + } + + t.Logf("Waiting for all goroutines to complete...") + wg.Wait() + }) + + t.Run("Wait for txn receipts with a flaky monitor and a flaky provider", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + flakyProvider = newProvider(t) + goodProvider = newProvider(t) // for sending transactions only + ) + + monitorOptions := ethmonitor.DefaultOptions + monitorOptions.WithLogs = true + + // monitor running with a good provider + monitor, err := ethmonitor.NewMonitor(flakyProvider, monitorOptions) + require.NoError(t, err) + + go func() { + err := monitor.Run(ctx) + require.NoError(t, err) + }() + + listenerOptions := ethreceipts.DefaultOptions + listenerOptions.FilterMaxWaitNumBlocks = 1 + + // receipts listener running with a healthy provider initially + receiptsListener, err := ethreceipts.NewReceiptsListener(log, flakyProvider, monitor, listenerOptions) + require.NoError(t, err) + + go func() { + err := receiptsListener.Run(ctx) + require.NoError(t, err) + }() + + // Wait for services to be ready + time.Sleep(2 * time.Second) + + // Replace provider's HTTP client with a flaky one + t.Logf("Setting provider to flaky state") + flakyProvider.SetHTTPClient(newFlakyHTTPClient(1.0)) + + go func() { + // After 20 seconds, restore the provider to more reliable state + time.Sleep(20 * time.Second) + t.Logf("Restoring provider to healthy state") + flakyProvider.SetHTTPClient(newFlakyHTTPClient(0.0)) + }() + + walletA, _ := testchain.DummyWallet(1) + testchain.MustFundAddress(walletA.Address()) + + walletB, _ := testchain.DummyWallet(uint64(rand.Int63n(1000))) + walletBAddress := walletB.Address() + + nonce, err := walletA.GetNonce(ctx) + require.NoError(t, err) + + var wg sync.WaitGroup + + // Add a bunch of subscribers that will each send a txn and wait for it + for i := 0; i < 50; i++ { + txr := ðtxn.TransactionRequest{ + To: &walletBAddress, + ETHValue: ethtest.ETHValue(0.01), + GasLimit: 120_000, + Nonce: big.NewInt(int64(nonce + uint64(i))), + } + + signedTxn, err := walletA.NewTransaction(ctx, txr) + require.NoError(t, err) + + wg.Add(1) + go func(signedTxn *types.Transaction) { + defer wg.Done() + + // Delay sending the transaction to ensure the subscriber is ready + time.Sleep(2 * time.Second) + + // Using the trusted provider here to ensure txn is sent + ethtxn.SendTransaction(ctx, goodProvider, signedTxn) + require.NoError(t, err) + }(signedTxn) + + wg.Add(1) + go func(signedTxn *types.Transaction) { + defer wg.Done() + + receiptsFilter := ethreceipts.FilterTxnHash( + signedTxn.Hash(), + ) + + sub := receiptsListener.Subscribe( + receiptsFilter, + ) + defer sub.Unsubscribe() + + start := time.Now() + select { + case <-ctx.Done(): + t.Fatalf("Context done: %v", ctx.Err()) + case <-sub.Done(): + t.Fatal("Subscription closed unexpectedly") + case receipt := <-sub.TransactionReceipt(): + activeSubs := receiptsListener.NumSubscribers() + t.Logf("Filter matched txn %s after %s, active subs: %d", signedTxn.Hash().String(), time.Since(start), activeSubs) + require.Equal(t, signedTxn.Hash(), receipt.TransactionHash()) + require.Equal(t, uint64(1), receipt.Status()) + case <-time.After(300 * time.Second): + t.Fatal("Timeout waiting for filter to match txn") + } + }(signedTxn) + + } + + t.Logf("Waiting for all goroutines to complete...") + wg.Wait() + }) +} diff --git a/ethreceipts/filterer.go b/ethreceipts/filterer.go index efe487e3..959c6a27 100644 --- a/ethreceipts/filterer.go +++ b/ethreceipts/filterer.go @@ -2,6 +2,8 @@ package ethreceipts import ( "context" + "sync" + "sync/atomic" "github.com/0xsequence/ethkit" "github.com/0xsequence/ethkit/go-ethereum/core/types" @@ -159,13 +161,14 @@ type filter struct { cond FilterCond // startBlockNum is the first block number observed once filter is active - startBlockNum uint64 + startBlockNum atomic.Uint64 // lastMatchBlockNum is the block number where a last match occured - lastMatchBlockNum uint64 + lastMatchBlockNum atomic.Uint64 // exhausted signals if the filter hit MaxWait - exhausted chan struct{} + exhausted chan struct{} + exhaustedOnce sync.Once } var ( @@ -254,11 +257,25 @@ func (f *filter) Match(ctx context.Context, receipt Receipt) (bool, error) { } func (f *filter) StartBlockNum() uint64 { - return f.startBlockNum + return f.startBlockNum.Load() } func (f *filter) LastMatchBlockNum() uint64 { - return f.lastMatchBlockNum + return f.lastMatchBlockNum.Load() +} + +func (f *filter) setStartBlockNum(num uint64) { + f.startBlockNum.Store(num) +} + +func (f *filter) setLastMatchBlockNum(num uint64) { + f.lastMatchBlockNum.Store(num) +} + +func (f *filter) closeExhausted() { + f.exhaustedOnce.Do(func() { + close(f.exhausted) + }) } func (f *filter) Exhausted() <-chan struct{} { diff --git a/ethreceipts/subscription.go b/ethreceipts/subscription.go index ef22ef36..0b6e3bde 100644 --- a/ethreceipts/subscription.go +++ b/ethreceipts/subscription.go @@ -2,12 +2,32 @@ package ethreceipts import ( "context" + "errors" "fmt" "math/big" "sync" + "time" + "github.com/0xsequence/ethkit/go-ethereum" + "github.com/0xsequence/ethkit/go-ethereum/common" "github.com/goware/channel" "github.com/goware/superr" + "golang.org/x/sync/errgroup" +) + +const ( + maxConcurrentReceiptFetches = 10 + maxConcurrentReceiptRetries = 10 + + // After this many attempts, we give up retrying a receipt fetch. + maxReceiptRetryAttempts = 20 + + // Maximum number of pending receipts to track for retries. + maxPendingReceipts = 5000 +) + +var ( + maxWaitBetweenRetries = 5 * time.Minute ) type Subscription interface { @@ -23,6 +43,13 @@ type Subscription interface { var _ Subscription = &subscriber{} +type pendingReceipt struct { + receipt Receipt + filterer Filterer + attempts int + nextRetryAt time.Time +} + type subscriber struct { listener *ReceiptsListener ch channel.Channel[Receipt] @@ -31,6 +58,9 @@ type subscriber struct { filters []Filterer finalizer *finalizer mu sync.Mutex + + pendingReceipts map[common.Hash]*pendingReceipt + retryMu sync.Mutex } type registerFilters struct { @@ -63,9 +93,6 @@ func (s *subscriber) AddFilter(filterQueries ...FilterQuery) { return } - s.mu.Lock() - defer s.mu.Unlock() - filters := make([]Filterer, len(filterQueries)) for i, query := range filterQueries { filterer, ok := query.(Filterer) @@ -75,10 +102,27 @@ func (s *subscriber) AddFilter(filterQueries ...FilterQuery) { filters[i] = filterer } + s.mu.Lock() + + if len(s.filters)+len(filters) > maxFiltersPerListener { + // too many filters, ignore the extra filter. not ideal, but better than + // deadlocking + s.listener.log.Warn(fmt.Sprintf("ethreceipts: subscriber has too many filters (%d), ignoring extra", len(s.filters)+len(filters))) + // TODO: maybe return an error or force-unsubscribe instead? + s.mu.Unlock() + return + } + s.filters = append(s.filters, filters...) + s.mu.Unlock() // TODO: maybe add non-blocking push structure like in relayer queue - s.listener.registerFiltersCh <- registerFilters{subscriber: s, filters: filters} + select { + case s.listener.registerFiltersCh <- registerFilters{subscriber: s, filters: filters}: + // ok + default: + s.listener.log.Warn("ethreceipts: listener registerFiltersCh full, dropping filter register") + } } func (s *subscriber) RemoveFilter(filter Filterer) { @@ -102,6 +146,15 @@ func (s *subscriber) ClearFilters() { func (s *subscriber) matchFilters(ctx context.Context, filterers []Filterer, receipts []Receipt) ([]bool, error) { oks := make([]bool, len(filterers)) + // Collect matches that need receipt fetching + type matchedReceipt struct { + receipt Receipt + filtererIdx int + filterer Filterer + } + var toFetch []matchedReceipt + + // First pass: find all matches for _, receipt := range receipts { for i, filterer := range filterers { matched, err := filterer.Match(ctx, receipt) @@ -116,25 +169,56 @@ func (s *subscriber) matchFilters(ctx context.Context, filterers []Filterer, rec // its a match oks[i] = true - receipt := receipt // copy - receipt.Filter = filterer - // fetch transaction receipt if its not been marked as reorged if !receipt.Reorged { - r, err := s.listener.fetchTransactionReceipt(ctx, receipt.TransactionHash(), true) - if err != nil { - // TODO: is this fine to return error..? its a bit abrupt. - // Options are to set FailedFetch bool on the Receipt, and still send to s.ch, - // or just log the error and continue to the next receipt - return oks, superr.Wrap(fmt.Errorf("failed to fetch txn %s receipt", receipt.TransactionHash()), err) + toFetch = append(toFetch, matchedReceipt{ + receipt: receipt, + filtererIdx: i, + filterer: filterer, + }) + } + } + } + + if len(toFetch) == 0 { + return oks, nil + } + + // Fetch receipts concurrently + sem := make(chan struct{}, maxConcurrentReceiptFetches) + g, gctx := errgroup.WithContext(ctx) + + for _, item := range toFetch { + item := item // capture loop variable + g.Go(func() error { + select { + case sem <- struct{}{}: + defer func() { <-sem }() + case <-gctx.Done(): + return gctx.Err() + } + + // Fetch transaction receipt + r, err := s.listener.fetchTransactionReceipt(gctx, item.receipt.TransactionHash(), true) + if err != nil { + if errors.Is(err, ethereum.NotFound) { + // not found, don't retry + return superr.Wrap(fmt.Errorf("txn %s not found", item.receipt.TransactionHash()), err) } - receipt.receipt = r - receipt.logs = r.Logs + + // might be a provider issue, add to pending receipts for retry + s.addPendingReceipt(item.receipt, item.filterer) + return superr.Wrap(fmt.Errorf("failed to fetch txn %s receipt", item.receipt.TransactionHash()), err) } + // Update receipt with fetched data + item.receipt.receipt = r + item.receipt.logs = r.Logs + item.receipt.Filter = item.filterer + // Finality enqueue if filter asked to Finalize, and receipt isn't already final - if !receipt.Reorged && !receipt.Final && filterer.Options().Finalize { - s.finalizer.enqueue(filterer.FilterID(), receipt, receipt.BlockNumber()) + if !item.receipt.Final && item.filterer.Options().Finalize { + s.finalizer.enqueue(item.filterer.FilterID(), item.receipt, item.receipt.BlockNumber()) } // LimitOne will auto unsubscribe now if were not also waiting for finalizer, @@ -144,20 +228,27 @@ func (s *subscriber) matchFilters(ctx context.Context, filterers []Filterer, rec // because its possible that it can reorg and we have to fetch it again after being re-mined. // So we only remove the filter now if the filter finalizer isn't used, otherwise the // finalizer will remove the LimitOne filter - toFinalize := filterer.Options().Finalize && !receipt.Final - if !receipt.Reorged && filterer.Options().LimitOne && !toFinalize { - s.RemoveFilter(receipt.Filter) + toFinalize := item.filterer.Options().Finalize && !item.receipt.Final + if item.filterer.Options().LimitOne && !toFinalize { + s.RemoveFilter(item.receipt.Filter) } // Check if receipt is already final, in case comes from cache when // previously final was not toggled. - if s.listener.isBlockFinal(receipt.BlockNumber()) { - receipt.Final = true + if s.listener.isBlockFinal(item.receipt.BlockNumber()) { + item.receipt.Final = true } - // Broadcast to subscribers - s.ch.Send(receipt) - } + // Broadcast to subscribers (needs mutex as multiple goroutines may send) + s.ch.Send(item.receipt) + + return nil + }) + } + + // Wait for all fetches to complete + if err := g.Wait(); err != nil { + return oks, err } return oks, nil @@ -198,3 +289,175 @@ func (s *subscriber) finalizeReceipts(blockNum *big.Int) error { return nil } + +func (s *subscriber) addPendingReceipt(receipt Receipt, filterer Filterer) { + s.retryMu.Lock() + defer s.retryMu.Unlock() + + txHash := receipt.TransactionHash() + + if s.pendingReceipts == nil { + // lazy init + s.pendingReceipts = make(map[common.Hash]*pendingReceipt) + } + + if len(s.pendingReceipts) >= maxPendingReceipts { + s.listener.log.Error( + "Pending receipts queue is full, dropping new receipt", + "txHash", txHash.String(), + "queueSize", len(s.pendingReceipts), + ) + return + } + + if _, exists := s.pendingReceipts[txHash]; exists { + // already pending, skip + return + } + + s.pendingReceipts[txHash] = &pendingReceipt{ + receipt: receipt, + filterer: filterer, + attempts: 1, + nextRetryAt: time.Now().Add(1 * time.Second), // first retry after 1s + } + + s.listener.log.Info(fmt.Sprintf("ethreceipts: added pending receipt for txn %s", txHash.Hex())) +} + +func (s *subscriber) retryPendingReceipts(ctx context.Context) { + s.retryMu.Lock() + + // Create a snapshot of receipts that are due for retry + var toRetry []*pendingReceipt + now := time.Now() + + for _, pending := range s.pendingReceipts { + if now.After(pending.nextRetryAt) { + // Claim this item for retry by pushing the nextRetryAt into the future, + // this prevents other concurrent retryPendingReceipts calls from picking + // it up. + pending.nextRetryAt = time.Now().Add(10 * time.Minute) + toRetry = append(toRetry, pending) + } + } + s.retryMu.Unlock() + + if len(toRetry) == 0 { + return + } + + s.listener.log.Info(fmt.Sprintf("ethreceipts: retrying %d pending receipts", len(toRetry))) + + // Collect receipts that are due for retry + sem := make(chan struct{}, maxConcurrentReceiptRetries) + var wg sync.WaitGroup + + for _, pending := range toRetry { + wg.Add(1) + go func(p *pendingReceipt) { + defer wg.Done() + + select { + case sem <- struct{}{}: + defer func() { <-sem }() + case <-ctx.Done(): + // If context is cancelled, release the claim so the item can be retried later. + s.retryMu.Lock() + if current, ok := s.pendingReceipts[p.receipt.TransactionHash()]; ok && current == p { + current.nextRetryAt = time.Now() // Reschedule immediately. + } + s.retryMu.Unlock() + return + } + + // Attempt to fetch the receipt + txHash := p.receipt.TransactionHash() + r, err := s.listener.fetchTransactionReceipt(ctx, txHash, true) + + s.retryMu.Lock() + defer s.retryMu.Unlock() + + // Check if the item still exists and is the same one we claimed. + currentPending, exists := s.pendingReceipts[txHash] + if !exists || currentPending != p { + s.listener.log.Debug("Pending receipt is stale or already processed, skipping retry", "txHash", txHash.String()) + return + } + + if err != nil { + if errors.Is(err, ethereum.NotFound) { + // Transaction genuinely doesn't exist - remove from queue + delete(s.pendingReceipts, txHash) + s.listener.log.Debug("Receipt not found after retry, removing from queue", "txHash", txHash.String()) + return + } + + // Provider error - update retry state directly on the pointer. + currentPending.attempts++ + if currentPending.attempts >= maxReceiptRetryAttempts { + delete(s.pendingReceipts, txHash) + s.listener.log.Error( + "Failed to fetch receipt after max retries", + "txHash", txHash.String(), + "attempts", currentPending.attempts, + "error", err, + ) + // TODO: perhaps we should close the subscription here as we failed + // to deliver a receipt after many attempts? + return + } + + // Exponential backoff for next retry + backoff := time.Duration(1< maxWaitBetweenRetries { + backoff = maxWaitBetweenRetries + } + currentPending.nextRetryAt = time.Now().Add(backoff) + + s.listener.log.Debug( + "Receipt fetch failed, will retry", + "txHash", txHash.String(), + "attempt", currentPending.attempts, + "nextRetryIn", backoff, + ) + return + } + + // Remove from pending list + delete(s.pendingReceipts, txHash) + + // Update receipt with fetched data + p.receipt.receipt = r + p.receipt.logs = r.Logs + p.receipt.Filter = p.filterer + + // Check finality + if s.listener.isBlockFinal(r.BlockNumber) { + p.receipt.Final = true + } + + // Handle finalization queue if needed + if !p.receipt.Final && p.filterer.Options().Finalize { + s.finalizer.enqueue(p.filterer.FilterID(), p.receipt, r.BlockNumber) + } + + // Handle LimitOne filter removal + toFinalize := p.filterer.Options().Finalize && !p.receipt.Final + if p.filterer.Options().LimitOne && !toFinalize { + s.RemoveFilter(p.filterer) + } + + // Send to subscriber + s.ch.Send(p.receipt) + + s.listener.log.Info( + "Successfully fetched receipt after retry", + "txHash", txHash.String(), + "attempts", currentPending.attempts, + ) + }(pending) + } + + wg.Wait() +} diff --git a/ethrpc/ethrpc.go b/ethrpc/ethrpc.go index 0865374f..233391ec 100644 --- a/ethrpc/ethrpc.go +++ b/ethrpc/ethrpc.go @@ -26,10 +26,12 @@ import ( ) type Provider struct { - log *slog.Logger // TODO: not used.. - nodeURL string - nodeWSURL string - httpClient httpClient + log *slog.Logger // TODO: not used.. + nodeURL string + nodeWSURL string + + httpClient atomic.Value // stores an object that satisfies the httpClient interface + br breaker.Breaker jwtToken string // optional streamClosers []StreamCloser @@ -56,7 +58,7 @@ func NewProvider(nodeURL string, options ...Option) (*Provider, error) { opt(p) } - if p.httpClient == nil { + if p.httpClient.Load() == nil { httpTransport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ @@ -72,11 +74,11 @@ func NewProvider(nodeURL string, options ...Option) (*Provider, error) { ExpectContinueTimeout: 1 * time.Second, ResponseHeaderTimeout: 30 * time.Second, } - httpClient := &http.Client{ + client := &http.Client{ Transport: httpTransport, Timeout: 35 * time.Second, } - p.httpClient = httpClient + p.httpClient.Store(client) } return p, nil @@ -106,8 +108,12 @@ type StreamUnsubscriber interface { Unsubscribe() } -func (s *Provider) SetHTTPClient(httpClient *http.Client) { - s.httpClient = httpClient +func (s *Provider) SetHTTPClient(client httpClient) { + s.httpClient.Store(client) +} + +func (p *Provider) getHTTPClient() httpClient { + return p.httpClient.Load().(httpClient) } func (p *Provider) StrictnessLevel() StrictnessLevel { @@ -147,7 +153,8 @@ func (p *Provider) Do(ctx context.Context, calls ...Call) ([]byte, error) { req.Header.Set("Authorization", fmt.Sprintf("BEARER %s", p.jwtToken)) } - res, err := p.httpClient.Do(req) + httpClient := p.getHTTPClient() + res, err := httpClient.Do(req) if err != nil { return nil, superr.Wrap(ErrRequestFail, fmt.Errorf("failed to send request: %w", err)) } diff --git a/ethrpc/option.go b/ethrpc/option.go index 5ee0617a..0697e49d 100644 --- a/ethrpc/option.go +++ b/ethrpc/option.go @@ -25,7 +25,7 @@ func WithStreaming(nodeWebsocketURL string) Option { func WithHTTPClient(c httpClient) Option { return func(p *Provider) { - p.httpClient = c + p.SetHTTPClient(c) } } From 4c076e5b63bc1a835c9852916e1f3ee1edc84258 Mon Sep 17 00:00:00 2001 From: Peter Kieltyka Date: Mon, 27 Oct 2025 14:21:06 -0400 Subject: [PATCH 02/11] little updates --- ethreceipts/filterer.go | 12 ++++++------ ethreceipts/subscription.go | 16 ++++++++-------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/ethreceipts/filterer.go b/ethreceipts/filterer.go index 959c6a27..a63ece89 100644 --- a/ethreceipts/filterer.go +++ b/ethreceipts/filterer.go @@ -161,10 +161,10 @@ type filter struct { cond FilterCond // startBlockNum is the first block number observed once filter is active - startBlockNum atomic.Uint64 + startBlockNum uint64 // lastMatchBlockNum is the block number where a last match occured - lastMatchBlockNum atomic.Uint64 + lastMatchBlockNum uint64 // exhausted signals if the filter hit MaxWait exhausted chan struct{} @@ -257,19 +257,19 @@ func (f *filter) Match(ctx context.Context, receipt Receipt) (bool, error) { } func (f *filter) StartBlockNum() uint64 { - return f.startBlockNum.Load() + return atomic.LoadUint64(&f.startBlockNum) } func (f *filter) LastMatchBlockNum() uint64 { - return f.lastMatchBlockNum.Load() + return atomic.LoadUint64(&f.lastMatchBlockNum) } func (f *filter) setStartBlockNum(num uint64) { - f.startBlockNum.Store(num) + atomic.StoreUint64(&f.startBlockNum, num) } func (f *filter) setLastMatchBlockNum(num uint64) { - f.lastMatchBlockNum.Store(num) + atomic.StoreUint64(&f.lastMatchBlockNum, num) } func (f *filter) closeExhausted() { diff --git a/ethreceipts/subscription.go b/ethreceipts/subscription.go index 0b6e3bde..2dc26a59 100644 --- a/ethreceipts/subscription.go +++ b/ethreceipts/subscription.go @@ -43,13 +43,6 @@ type Subscription interface { var _ Subscription = &subscriber{} -type pendingReceipt struct { - receipt Receipt - filterer Filterer - attempts int - nextRetryAt time.Time -} - type subscriber struct { listener *ReceiptsListener ch channel.Channel[Receipt] @@ -63,6 +56,13 @@ type subscriber struct { retryMu sync.Mutex } +type pendingReceipt struct { + receipt Receipt + filterer Filterer + attempts int + nextRetryAt time.Time +} + type registerFilters struct { subscriber *subscriber filters []Filterer @@ -208,7 +208,7 @@ func (s *subscriber) matchFilters(ctx context.Context, filterers []Filterer, rec // might be a provider issue, add to pending receipts for retry s.addPendingReceipt(item.receipt, item.filterer) - return superr.Wrap(fmt.Errorf("failed to fetch txn %s receipt", item.receipt.TransactionHash()), err) + return superr.Wrap(fmt.Errorf("failed to fetch txn %s receipt due to node issue", item.receipt.TransactionHash()), err) } // Update receipt with fetched data From c0dde44b695eaadfea0da4ed6a5af38fb925dd50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Nieto?= Date: Mon, 27 Oct 2025 13:00:59 -0600 Subject: [PATCH 03/11] changes after code review --- ethreceipts/ethreceipts.go | 15 +++++++++++---- ethreceipts/subscription.go | 32 ++++++++++++++++---------------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/ethreceipts/ethreceipts.go b/ethreceipts/ethreceipts.go index 5ba0594b..c56a2261 100644 --- a/ethreceipts/ethreceipts.go +++ b/ethreceipts/ethreceipts.go @@ -30,7 +30,7 @@ import ( var DefaultOptions = Options{ MaxConcurrentFetchReceiptWorkers: 100, MaxConcurrentFilterWorkers: 200, - MaxConcurrentSearchOnChainWorkers: 5, + MaxConcurrentSearchOnChainWorkers: 15, LatestBlockNumCacheTTL: 2 * time.Second, PastReceiptsCacheSize: 5_000, NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality @@ -457,9 +457,16 @@ func (l *ReceiptsListener) FetchTransactionReceiptWithFilter(ctx context.Context // this transaction hash. func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash common.Hash, forceFetch bool) (*types.Receipt, error) { - // TODO: this might block forever if all workers are busy, should we have a - // timeout? - l.fetchSem <- struct{}{} + timeStart := time.Now() + for { + select { + case l.fetchSem <- struct{}{}: + break + case <-time.After(1 * time.Minute): + elapsed := time.Since(timeStart) + l.log.Warn(fmt.Sprintf("fetchTransactionReceipt(%s) waiting for fetch semaphore for %s", txnHash.String(), elapsed)) + } + } // channels to receive result or error: it could be difficult to coordinate // closing them manually because we're also selecting on ctx.Done(), so we diff --git a/ethreceipts/subscription.go b/ethreceipts/subscription.go index 2dc26a59..b1fa5634 100644 --- a/ethreceipts/subscription.go +++ b/ethreceipts/subscription.go @@ -294,7 +294,7 @@ func (s *subscriber) addPendingReceipt(receipt Receipt, filterer Filterer) { s.retryMu.Lock() defer s.retryMu.Unlock() - txHash := receipt.TransactionHash() + txnHash := receipt.TransactionHash() if s.pendingReceipts == nil { // lazy init @@ -304,25 +304,25 @@ func (s *subscriber) addPendingReceipt(receipt Receipt, filterer Filterer) { if len(s.pendingReceipts) >= maxPendingReceipts { s.listener.log.Error( "Pending receipts queue is full, dropping new receipt", - "txHash", txHash.String(), + "txnHash", txnHash.String(), "queueSize", len(s.pendingReceipts), ) return } - if _, exists := s.pendingReceipts[txHash]; exists { + if _, exists := s.pendingReceipts[txnHash]; exists { // already pending, skip return } - s.pendingReceipts[txHash] = &pendingReceipt{ + s.pendingReceipts[txnHash] = &pendingReceipt{ receipt: receipt, filterer: filterer, attempts: 1, nextRetryAt: time.Now().Add(1 * time.Second), // first retry after 1s } - s.listener.log.Info(fmt.Sprintf("ethreceipts: added pending receipt for txn %s", txHash.Hex())) + s.listener.log.Info(fmt.Sprintf("ethreceipts: added pending receipt for txn %s", txnHash.Hex())) } func (s *subscriber) retryPendingReceipts(ctx context.Context) { @@ -372,34 +372,34 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) { } // Attempt to fetch the receipt - txHash := p.receipt.TransactionHash() - r, err := s.listener.fetchTransactionReceipt(ctx, txHash, true) + txnHash := p.receipt.TransactionHash() + r, err := s.listener.fetchTransactionReceipt(ctx, txnHash, true) s.retryMu.Lock() defer s.retryMu.Unlock() // Check if the item still exists and is the same one we claimed. - currentPending, exists := s.pendingReceipts[txHash] + currentPending, exists := s.pendingReceipts[txnHash] if !exists || currentPending != p { - s.listener.log.Debug("Pending receipt is stale or already processed, skipping retry", "txHash", txHash.String()) + s.listener.log.Debug("Pending receipt is stale or already processed, skipping retry", "txnHash", txnHash.String()) return } if err != nil { if errors.Is(err, ethereum.NotFound) { // Transaction genuinely doesn't exist - remove from queue - delete(s.pendingReceipts, txHash) - s.listener.log.Debug("Receipt not found after retry, removing from queue", "txHash", txHash.String()) + delete(s.pendingReceipts, txnHash) + s.listener.log.Debug("Receipt not found after retry, removing from queue", "txnHash", txnHash.String()) return } // Provider error - update retry state directly on the pointer. currentPending.attempts++ if currentPending.attempts >= maxReceiptRetryAttempts { - delete(s.pendingReceipts, txHash) + delete(s.pendingReceipts, txnHash) s.listener.log.Error( "Failed to fetch receipt after max retries", - "txHash", txHash.String(), + "txnHash", txnHash.String(), "attempts", currentPending.attempts, "error", err, ) @@ -417,7 +417,7 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) { s.listener.log.Debug( "Receipt fetch failed, will retry", - "txHash", txHash.String(), + "txnHash", txnHash.String(), "attempt", currentPending.attempts, "nextRetryIn", backoff, ) @@ -425,7 +425,7 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) { } // Remove from pending list - delete(s.pendingReceipts, txHash) + delete(s.pendingReceipts, txnHash) // Update receipt with fetched data p.receipt.receipt = r @@ -453,7 +453,7 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) { s.listener.log.Info( "Successfully fetched receipt after retry", - "txHash", txHash.String(), + "txnHash", txnHash.String(), "attempts", currentPending.attempts, ) }(pending) From 39d43c9e83cf823a89173552cee1f07c3f1ee931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Nieto?= Date: Mon, 27 Oct 2025 13:05:37 -0600 Subject: [PATCH 04/11] changes after code review --- ethreceipts/ethreceipts.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ethreceipts/ethreceipts.go b/ethreceipts/ethreceipts.go index c56a2261..bd033186 100644 --- a/ethreceipts/ethreceipts.go +++ b/ethreceipts/ethreceipts.go @@ -461,13 +461,15 @@ func (l *ReceiptsListener) fetchTransactionReceipt(ctx context.Context, txnHash for { select { case l.fetchSem <- struct{}{}: - break + goto start case <-time.After(1 * time.Minute): elapsed := time.Since(timeStart) l.log.Warn(fmt.Sprintf("fetchTransactionReceipt(%s) waiting for fetch semaphore for %s", txnHash.String(), elapsed)) } } +start: + // channels to receive result or error: it could be difficult to coordinate // closing them manually because we're also selecting on ctx.Done(), so we // use buffered channels of size 1 and let them be garbage collected after @@ -809,7 +811,7 @@ func (l *ReceiptsListener) processBlocks(blocks ethmonitor.Blocks, subscribers [ }() // retry pending receipts first - retryCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + retryCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second) sub.retryPendingReceipts(retryCtx) cancel() From 73fe43510a74d7080c11f4182714180eb4308a69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Nieto?= Date: Mon, 27 Oct 2025 13:23:49 -0600 Subject: [PATCH 05/11] change Info to Warn --- ethreceipts/subscription.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethreceipts/subscription.go b/ethreceipts/subscription.go index b1fa5634..6225970c 100644 --- a/ethreceipts/subscription.go +++ b/ethreceipts/subscription.go @@ -347,7 +347,7 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) { return } - s.listener.log.Info(fmt.Sprintf("ethreceipts: retrying %d pending receipts", len(toRetry))) + s.listener.log.Warn(fmt.Sprintf("ethreceipts: retrying %d pending receipts", len(toRetry))) // Collect receipts that are due for retry sem := make(chan struct{}, maxConcurrentReceiptRetries) From 06a73d28026f35628129c016c17f16334d47dfb3 Mon Sep 17 00:00:00 2001 From: Peter Kieltyka Date: Mon, 27 Oct 2025 15:38:59 -0400 Subject: [PATCH 06/11] improve latestBlockNum --- ethreceipts/ethreceipts.go | 56 ++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 32 deletions(-) diff --git a/ethreceipts/ethreceipts.go b/ethreceipts/ethreceipts.go index bd033186..1a8f0636 100644 --- a/ethreceipts/ethreceipts.go +++ b/ethreceipts/ethreceipts.go @@ -31,7 +31,6 @@ var DefaultOptions = Options{ MaxConcurrentFetchReceiptWorkers: 100, MaxConcurrentFilterWorkers: 200, MaxConcurrentSearchOnChainWorkers: 15, - LatestBlockNumCacheTTL: 2 * time.Second, PastReceiptsCacheSize: 5_000, NumBlocksToFinality: 0, // value of <=0 here will select from ethrpc.Networks[chainID].NumBlocksToFinality FilterMaxWaitNumBlocks: 0, // value of 0 here means no limit, and will listen until manually unsubscribed @@ -53,10 +52,6 @@ type Options struct { // on-chain searches (this is per subscriber) MaxConcurrentSearchOnChainWorkers int - // LatestBlockNumCacheTTL is the duration to cache the latest block number, - // this prevents frequent calls to the monitor for latest block number - LatestBlockNumCacheTTL time.Duration - // .. PastReceiptsCacheSize int @@ -110,6 +105,8 @@ type ReceiptsListener struct { running int32 mu sync.RWMutex + // latestBlockNumCache caches the latest block number only for the instance + // when we can't get the latest block number from the monitor (ie. monitor is just starting) latestBlockNumCache atomic.Uint64 latestBlockNumTime atomic.Int64 } @@ -947,46 +944,41 @@ func (l *ReceiptsListener) isBlockFinal(blockNum *big.Int) bool { } func (l *ReceiptsListener) latestBlockNum() *big.Int { - // Cache latest block number to avoid hammering monitor.LatestBlockNum() + // return immediately if the monitor has a latest block number + latestBlockNum := l.monitor.LatestBlockNum() + if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 { + return latestBlockNum + } + + // fetch it from the node directly, and cache for very short period + const latestBlockNumCacheTTL = 500 * time.Millisecond + cachedTime := time.Unix(l.latestBlockNumTime.Load(), 0) - if time.Since(cachedTime) < l.options.LatestBlockNumCacheTTL { + if time.Since(cachedTime) < latestBlockNumCacheTTL { cachedNum := l.latestBlockNumCache.Load() if cachedNum > 0 { return big.NewInt(int64(cachedNum)) } } - l.latestBlockNumTime.Store(time.Now().Unix()) - - latestBlockNum := l.fetchLatestBlockNum() - if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 { - l.latestBlockNumCache.Store(latestBlockNum.Uint64()) - } - - return latestBlockNum -} - -func (l *ReceiptsListener) fetchLatestBlockNum() *big.Int { timeoutCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second) defer cancel() - latestBlockNum := l.monitor.LatestBlockNum() - - if latestBlockNum == nil || latestBlockNum.Cmp(big.NewInt(0)) == 0 { - err := l.br.Do(l.ctx, func() error { - block, err := l.provider.BlockByNumber(timeoutCtx, nil) - if err != nil { - return err - } - latestBlockNum = block.Number() - return nil - }) - if err != nil || latestBlockNum == nil { - return big.NewInt(0) + err := l.br.Do(l.ctx, func() error { + block, err := l.provider.BlockByNumber(timeoutCtx, nil) + if err != nil { + return err } - return latestBlockNum + latestBlockNum = block.Number() + return nil + }) + if err != nil || latestBlockNum == nil { + latestBlockNum = big.NewInt(0) } + l.latestBlockNumTime.Store(time.Now().Unix()) + l.latestBlockNumCache.Store(latestBlockNum.Uint64()) + return latestBlockNum } From 2034c33185ebf542df00208aefe54222ee07407b Mon Sep 17 00:00:00 2001 From: Peter Kieltyka Date: Mon, 27 Oct 2025 15:46:45 -0400 Subject: [PATCH 07/11] comment --- ethreceipts/subscription.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethreceipts/subscription.go b/ethreceipts/subscription.go index 6225970c..1266703f 100644 --- a/ethreceipts/subscription.go +++ b/ethreceipts/subscription.go @@ -347,6 +347,8 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) { return } + // Log warning here, as we treat any need for retrying to fetch a receipt as a warning, + // and indicates some kind of node/provider issue. s.listener.log.Warn(fmt.Sprintf("ethreceipts: retrying %d pending receipts", len(toRetry))) // Collect receipts that are due for retry From 08dd402ecfa9b63b08e504ca823f44250147dd46 Mon Sep 17 00:00:00 2001 From: Peter Kieltyka Date: Mon, 27 Oct 2025 15:54:37 -0400 Subject: [PATCH 08/11] simplify latestBlockNum, and use a single source of truth --- ethreceipts/ethreceipts.go | 42 +++++++++++--------------------------- 1 file changed, 12 insertions(+), 30 deletions(-) diff --git a/ethreceipts/ethreceipts.go b/ethreceipts/ethreceipts.go index 1a8f0636..5b3b9cb2 100644 --- a/ethreceipts/ethreceipts.go +++ b/ethreceipts/ethreceipts.go @@ -104,11 +104,6 @@ type ReceiptsListener struct { ctxStop context.CancelFunc running int32 mu sync.RWMutex - - // latestBlockNumCache caches the latest block number only for the instance - // when we can't get the latest block number from the monitor (ie. monitor is just starting) - latestBlockNumCache atomic.Uint64 - latestBlockNumTime atomic.Int64 } var ( @@ -950,36 +945,23 @@ func (l *ReceiptsListener) latestBlockNum() *big.Int { return latestBlockNum } - // fetch it from the node directly, and cache for very short period - const latestBlockNumCacheTTL = 500 * time.Millisecond + // wait until monitor has a block num for us, up to a certain amount of time + maxWaitTime := 30 * time.Second + period := 250 * time.Millisecond + for { + time.Sleep(period) - cachedTime := time.Unix(l.latestBlockNumTime.Load(), 0) - if time.Since(cachedTime) < latestBlockNumCacheTTL { - cachedNum := l.latestBlockNumCache.Load() - if cachedNum > 0 { - return big.NewInt(int64(cachedNum)) + latestBlockNum := l.monitor.LatestBlockNum() + if latestBlockNum != nil && latestBlockNum.Cmp(big.NewInt(0)) > 0 { + return latestBlockNum } - } - timeoutCtx, cancel := context.WithTimeout(l.ctx, 5*time.Second) - defer cancel() - - err := l.br.Do(l.ctx, func() error { - block, err := l.provider.BlockByNumber(timeoutCtx, nil) - if err != nil { - return err + maxWaitTime -= period + if maxWaitTime <= 0 { + l.log.Error("ethreceipts: latestBlockNum: monitor has no latest block number after waiting, returning 0") + return big.NewInt(0) } - latestBlockNum = block.Number() - return nil - }) - if err != nil || latestBlockNum == nil { - latestBlockNum = big.NewInt(0) } - - l.latestBlockNumTime.Store(time.Now().Unix()) - l.latestBlockNumCache.Store(latestBlockNum.Uint64()) - - return latestBlockNum } func getChainID(ctx context.Context, provider ethrpc.Interface) (*big.Int, error) { From 1590ac309c90d27b920e2dc3f6ab196ee595e569 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Nieto?= Date: Mon, 27 Oct 2025 14:02:51 -0600 Subject: [PATCH 09/11] ethrpc: use mutex instead of atomic value --- ethrpc/ethrpc.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/ethrpc/ethrpc.go b/ethrpc/ethrpc.go index 233391ec..5acc6fd1 100644 --- a/ethrpc/ethrpc.go +++ b/ethrpc/ethrpc.go @@ -30,7 +30,8 @@ type Provider struct { nodeURL string nodeWSURL string - httpClient atomic.Value // stores an object that satisfies the httpClient interface + httpClient httpClient + httpClientMu sync.RWMutex br breaker.Breaker jwtToken string // optional @@ -58,7 +59,7 @@ func NewProvider(nodeURL string, options ...Option) (*Provider, error) { opt(p) } - if p.httpClient.Load() == nil { + if p.httpClient == nil { httpTransport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ @@ -78,7 +79,7 @@ func NewProvider(nodeURL string, options ...Option) (*Provider, error) { Transport: httpTransport, Timeout: 35 * time.Second, } - p.httpClient.Store(client) + p.httpClient = client } return p, nil @@ -109,11 +110,15 @@ type StreamUnsubscriber interface { } func (s *Provider) SetHTTPClient(client httpClient) { - s.httpClient.Store(client) + s.httpClientMu.Lock() + defer s.httpClientMu.Unlock() + s.httpClient = client } func (p *Provider) getHTTPClient() httpClient { - return p.httpClient.Load().(httpClient) + p.httpClientMu.RLock() + defer p.httpClientMu.RUnlock() + return p.httpClient } func (p *Provider) StrictnessLevel() StrictnessLevel { From 869642d74579313bd645421eec161918361b7b10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Nieto?= Date: Mon, 27 Oct 2025 14:05:35 -0600 Subject: [PATCH 10/11] ethrpc: use mutex instead of atomic value --- ethrpc/ethrpc.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ethrpc/ethrpc.go b/ethrpc/ethrpc.go index 5acc6fd1..2e000027 100644 --- a/ethrpc/ethrpc.go +++ b/ethrpc/ethrpc.go @@ -111,14 +111,16 @@ type StreamUnsubscriber interface { func (s *Provider) SetHTTPClient(client httpClient) { s.httpClientMu.Lock() - defer s.httpClientMu.Unlock() s.httpClient = client + s.httpClientMu.Unlock() } func (p *Provider) getHTTPClient() httpClient { p.httpClientMu.RLock() - defer p.httpClientMu.RUnlock() - return p.httpClient + httpClient := p.httpClient + p.httpClientMu.RUnlock() + + return httpClient } func (p *Provider) StrictnessLevel() StrictnessLevel { From 0a3b6cd4efb6ac2df65b84fb3e2c7a64d7b83e11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Nieto?= Date: Mon, 27 Oct 2025 14:23:13 -0600 Subject: [PATCH 11/11] ethreceipts: add small delay for retrial --- ethreceipts/subscription.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethreceipts/subscription.go b/ethreceipts/subscription.go index 1266703f..221d1a0c 100644 --- a/ethreceipts/subscription.go +++ b/ethreceipts/subscription.go @@ -367,7 +367,7 @@ func (s *subscriber) retryPendingReceipts(ctx context.Context) { // If context is cancelled, release the claim so the item can be retried later. s.retryMu.Lock() if current, ok := s.pendingReceipts[p.receipt.TransactionHash()]; ok && current == p { - current.nextRetryAt = time.Now() // Reschedule immediately. + current.nextRetryAt = time.Now().Add(100 * time.Millisecond) // small delay to avoid immediate retry } s.retryMu.Unlock() return