Skip to content

Commit a704a49

Browse files
committed
Add retry logic for header polling
1 parent c47e5c8 commit a704a49

File tree

1 file changed

+26
-34
lines changed

1 file changed

+26
-34
lines changed

lib/blockchain/ethereum.go

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"sync"
2020
"time"
2121

22+
"github.com/avast/retry-go"
2223
"github.com/ethereum/go-ethereum"
2324
"github.com/ethereum/go-ethereum/accounts/abi"
2425
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -79,18 +80,6 @@ type SharedHeaderPoller struct {
7980
done chan struct{}
8081
}
8182

82-
var sharedSubscriptionRegistry = struct {
83-
sync.Mutex
84-
m map[int64]*HeaderSubscription
85-
}{
86-
m: make(map[int64]*HeaderSubscription),
87-
}
88-
89-
type HeaderSubscription struct {
90-
mu sync.Mutex
91-
headerSubscriptions map[string]HeaderEventSubscription
92-
}
93-
9483
// newEVMClient creates an EVM client for a single node/URL
9584
func newEVMClient(networkSettings EVMNetwork, logger zerolog.Logger) (EVMClient, error) {
9685
logger.Info().
@@ -134,21 +123,7 @@ func newEVMClient(networkSettings EVMNetwork, logger zerolog.Logger) (EVMClient,
134123
return nil, err
135124
}
136125
ec.gasStats = NewGasStats(ec.ID)
137-
chainID := ec.GetChainID().Int64()
138-
// Acquire or create the shared data
139-
sharedSubscriptionRegistry.Lock()
140-
sharedSubs, exists := sharedSubscriptionRegistry.m[chainID]
141-
if !exists {
142-
sharedSubs = &HeaderSubscription{
143-
headerSubscriptions: make(map[string]HeaderEventSubscription),
144-
}
145-
sharedSubscriptionRegistry.m[chainID] = sharedSubs
146-
}
147-
sharedSubscriptionRegistry.Unlock()
148126

149-
// Now point the client’s fields to the shared map & mutex
150-
ec.headerSubscriptions = sharedSubs.headerSubscriptions
151-
ec.subscriptionMutex = &sharedSubs.mu
152127
// Initialize header subscription or polling
153128
if err := ec.InitializeHeaderSubscription(); err != nil {
154129
return nil, err
@@ -1348,7 +1323,7 @@ func (e *EthereumClient) startHeaderPolling() error {
13481323

13491324
go func() {
13501325
defer e.subscriptionWg.Done()
1351-
e.l.Info().Int64("Chain Id", e.GetChainID().Int64()).Msg("Polling for headers goroutine started")
1326+
e.l.Info().Int64("Chain Id", e.GetChainID().Int64()).Msg("Polling for headers started")
13521327
// Initialize lastHeaderNumber dynamically
13531328
lastHeaderNumber := latestHeader.Number.Uint64() - 1
13541329
for {
@@ -1379,20 +1354,37 @@ func (e *EthereumClient) startHeaderPolling() error {
13791354
}
13801355

13811356
if latestHeader.Number.Uint64() > lastHeaderNumber {
1382-
e.l.Info().Int64("Chain Id", e.GetChainID().Int64()).Uint64("LatestHeaderNumber", latestHeader.Number.Uint64()).Msg("New headers detected")
1357+
e.l.Debug().Int64("Chain Id", e.GetChainID().Int64()).Uint64("LatestHeaderNumber", latestHeader.Number.Uint64()).Msg("New headers detected")
13831358
// Process headers from (lastHeaderNumber + 1) to latestHeader.Number
13841359
// We may need to add a rate limiter, if we run into issues.
13851360
for blockNum := lastHeaderNumber + 1; blockNum <= latestHeader.Number.Uint64(); blockNum++ {
1386-
// Create a new context with timeout for each HeaderByNumber call
1387-
blockCtx, blockCancel := context.WithTimeout(context.Background(), e.NetworkConfig.Timeout.Duration)
13881361
if blockNum > math.MaxInt64 {
13891362
e.l.Error().Int64("Chain Id", e.GetChainID().Int64()).Uint64("BlockNumber", blockNum).Msg("blockNum exceeds the maximum value for int64")
13901363
continue
13911364
}
1392-
header, err := e.HeaderByNumber(blockCtx, big.NewInt(int64(blockNum)))
1393-
blockCancel()
1394-
if err != nil {
1395-
e.l.Error().Int64("Chain Id", e.GetChainID().Int64()).Err(err).Uint64("BlockNumber", blockNum).Msg("Error fetching header during range processing")
1365+
var header *SafeEVMHeader
1366+
var fetchErr error
1367+
1368+
// Retry logic for fetching the block header
1369+
retryErr := retry.Do(
1370+
func() error {
1371+
blockCtx, blockCancel := context.WithTimeout(context.Background(), e.NetworkConfig.Timeout.Duration)
1372+
defer blockCancel()
1373+
1374+
header, fetchErr = e.HeaderByNumber(blockCtx, big.NewInt(int64(blockNum)))
1375+
if fetchErr != nil {
1376+
e.l.Warn().Int64("Chain Id", e.GetChainID().Int64()).Err(fetchErr).Uint64("BlockNumber", blockNum).Msg("Retry fetching header")
1377+
}
1378+
return fetchErr
1379+
},
1380+
retry.Attempts(5),
1381+
retry.Delay(2*time.Second),
1382+
retry.MaxDelay(10*time.Second),
1383+
retry.DelayType(retry.BackOffDelay),
1384+
)
1385+
1386+
if retryErr != nil {
1387+
e.l.Error().Int64("Chain Id", e.GetChainID().Int64()).Err(fetchErr).Uint64("BlockNumber", blockNum).Msg("Failed to fetch header after retries. Skipping header.")
13961388
continue
13971389
}
13981390

0 commit comments

Comments
 (0)