@@ -19,6 +19,7 @@ import (
1919 "sync"
2020 "time"
2121
22+ "github.com/avast/retry-go"
2223 "github.com/ethereum/go-ethereum"
2324 "github.com/ethereum/go-ethereum/accounts/abi"
2425 "github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -79,18 +80,6 @@ type SharedHeaderPoller struct {
7980 done chan struct {}
8081}
8182
82- var sharedSubscriptionRegistry = struct {
83- sync.Mutex
84- m map [int64 ]* HeaderSubscription
85- }{
86- m : make (map [int64 ]* HeaderSubscription ),
87- }
88-
89- type HeaderSubscription struct {
90- mu sync.Mutex
91- headerSubscriptions map [string ]HeaderEventSubscription
92- }
93-
9483// newEVMClient creates an EVM client for a single node/URL
9584func newEVMClient (networkSettings EVMNetwork , logger zerolog.Logger ) (EVMClient , error ) {
9685 logger .Info ().
@@ -134,21 +123,7 @@ func newEVMClient(networkSettings EVMNetwork, logger zerolog.Logger) (EVMClient,
134123 return nil , err
135124 }
136125 ec .gasStats = NewGasStats (ec .ID )
137- chainID := ec .GetChainID ().Int64 ()
138- // Acquire or create the shared data
139- sharedSubscriptionRegistry .Lock ()
140- sharedSubs , exists := sharedSubscriptionRegistry .m [chainID ]
141- if ! exists {
142- sharedSubs = & HeaderSubscription {
143- headerSubscriptions : make (map [string ]HeaderEventSubscription ),
144- }
145- sharedSubscriptionRegistry .m [chainID ] = sharedSubs
146- }
147- sharedSubscriptionRegistry .Unlock ()
148126
149- // Now point the client’s fields to the shared map & mutex
150- ec .headerSubscriptions = sharedSubs .headerSubscriptions
151- ec .subscriptionMutex = & sharedSubs .mu
152127 // Initialize header subscription or polling
153128 if err := ec .InitializeHeaderSubscription (); err != nil {
154129 return nil , err
@@ -1348,7 +1323,7 @@ func (e *EthereumClient) startHeaderPolling() error {
13481323
13491324 go func () {
13501325 defer e .subscriptionWg .Done ()
1351- e .l .Info ().Int64 ("Chain Id" , e .GetChainID ().Int64 ()).Msg ("Polling for headers goroutine started" )
1326+ e .l .Info ().Int64 ("Chain Id" , e .GetChainID ().Int64 ()).Msg ("Polling for headers started" )
13521327 // Initialize lastHeaderNumber dynamically
13531328 lastHeaderNumber := latestHeader .Number .Uint64 () - 1
13541329 for {
@@ -1379,20 +1354,37 @@ func (e *EthereumClient) startHeaderPolling() error {
13791354 }
13801355
13811356 if latestHeader .Number .Uint64 () > lastHeaderNumber {
1382- e .l .Info ().Int64 ("Chain Id" , e .GetChainID ().Int64 ()).Uint64 ("LatestHeaderNumber" , latestHeader .Number .Uint64 ()).Msg ("New headers detected" )
1357+ e .l .Debug ().Int64 ("Chain Id" , e .GetChainID ().Int64 ()).Uint64 ("LatestHeaderNumber" , latestHeader .Number .Uint64 ()).Msg ("New headers detected" )
13831358 // Process headers from (lastHeaderNumber + 1) to latestHeader.Number
13841359 // We may need to add a rate limiter, if we run into issues.
13851360 for blockNum := lastHeaderNumber + 1 ; blockNum <= latestHeader .Number .Uint64 (); blockNum ++ {
1386- // Create a new context with timeout for each HeaderByNumber call
1387- blockCtx , blockCancel := context .WithTimeout (context .Background (), e .NetworkConfig .Timeout .Duration )
13881361 if blockNum > math .MaxInt64 {
13891362 e .l .Error ().Int64 ("Chain Id" , e .GetChainID ().Int64 ()).Uint64 ("BlockNumber" , blockNum ).Msg ("blockNum exceeds the maximum value for int64" )
13901363 continue
13911364 }
1392- header , err := e .HeaderByNumber (blockCtx , big .NewInt (int64 (blockNum )))
1393- blockCancel ()
1394- if err != nil {
1395- e .l .Error ().Int64 ("Chain Id" , e .GetChainID ().Int64 ()).Err (err ).Uint64 ("BlockNumber" , blockNum ).Msg ("Error fetching header during range processing" )
1365+ var header * SafeEVMHeader
1366+ var fetchErr error
1367+
1368+ // Retry logic for fetching the block header
1369+ retryErr := retry .Do (
1370+ func () error {
1371+ blockCtx , blockCancel := context .WithTimeout (context .Background (), e .NetworkConfig .Timeout .Duration )
1372+ defer blockCancel ()
1373+
1374+ header , fetchErr = e .HeaderByNumber (blockCtx , big .NewInt (int64 (blockNum )))
1375+ if fetchErr != nil {
1376+ e .l .Warn ().Int64 ("Chain Id" , e .GetChainID ().Int64 ()).Err (fetchErr ).Uint64 ("BlockNumber" , blockNum ).Msg ("Retry fetching header" )
1377+ }
1378+ return fetchErr
1379+ },
1380+ retry .Attempts (5 ),
1381+ retry .Delay (2 * time .Second ),
1382+ retry .MaxDelay (10 * time .Second ),
1383+ retry .DelayType (retry .BackOffDelay ),
1384+ )
1385+
1386+ if retryErr != nil {
1387+ e .l .Error ().Int64 ("Chain Id" , e .GetChainID ().Int64 ()).Err (fetchErr ).Uint64 ("BlockNumber" , blockNum ).Msg ("Failed to fetch header after retries. Skipping header." )
13961388 continue
13971389 }
13981390
0 commit comments