@@ -60,6 +60,35 @@ type EthereumClient struct {
6060 doneChan chan struct {}
6161 l zerolog.Logger
6262 subscriptionWg sync.WaitGroup
63+ sharedPoller * SharedHeaderPoller
64+ }
65+
66+ var chainPollingRegistry = struct {
67+ sync.Mutex
68+ m map [int64 ]* SharedHeaderPoller
69+ }{
70+ m : make (map [int64 ]* SharedHeaderPoller ),
71+ }
72+
73+ type SharedHeaderPoller struct {
74+ chainID int64
75+ lastPolled time.Time
76+ pollInterval time.Duration
77+ mu sync.Mutex
78+ fetching bool
79+ done chan struct {}
80+ }
81+
82+ var sharedSubscriptionRegistry = struct {
83+ sync.Mutex
84+ m map [int64 ]* HeaderSubscription
85+ }{
86+ m : make (map [int64 ]* HeaderSubscription ),
87+ }
88+
89+ type HeaderSubscription struct {
90+ mu sync.Mutex
91+ headerSubscriptions map [string ]HeaderEventSubscription
6392}
6493
6594// newEVMClient creates an EVM client for a single node/URL
@@ -105,6 +134,21 @@ func newEVMClient(networkSettings EVMNetwork, logger zerolog.Logger) (EVMClient,
105134 return nil , err
106135 }
107136 ec .gasStats = NewGasStats (ec .ID )
137+ chainID := ec .GetChainID ().Int64 ()
138+ // Acquire or create the shared data
139+ sharedSubscriptionRegistry .Lock ()
140+ sharedSubs , exists := sharedSubscriptionRegistry .m [chainID ]
141+ if ! exists {
142+ sharedSubs = & HeaderSubscription {
143+ headerSubscriptions : make (map [string ]HeaderEventSubscription ),
144+ }
145+ sharedSubscriptionRegistry .m [chainID ] = sharedSubs
146+ }
147+ sharedSubscriptionRegistry .Unlock ()
148+
149+ // Now point the client’s fields to the shared map & mutex
150+ ec .headerSubscriptions = sharedSubs .headerSubscriptions
151+ ec .subscriptionMutex = & sharedSubs .mu
108152 // Initialize header subscription or polling
109153 if err := ec .InitializeHeaderSubscription (); err != nil {
110154 return nil , err
@@ -1263,53 +1307,103 @@ func (e *EthereumClient) InitializeHeaderSubscription() error {
12631307 }
12641308 // Fallback to polling if subscriptions are not supported
12651309 e .l .Info ().Str ("Network" , e .NetworkConfig .Name ).Msg ("Subscriptions not supported. Using polling for new headers." )
1266- return e .startPollingHeaders ()
1310+
1311+ chainPollingRegistry .Lock ()
1312+ poller , exists := chainPollingRegistry .m [e .GetChainID ().Int64 ()]
1313+ if ! exists {
1314+ poller = & SharedHeaderPoller {
1315+ chainID : e .GetChainID ().Int64 (),
1316+ pollInterval : 10 * time .Second ,
1317+ done : make (chan struct {}),
1318+ }
1319+ chainPollingRegistry .m [e .GetChainID ().Int64 ()] = poller
1320+ }
1321+ e .sharedPoller = poller
1322+ chainPollingRegistry .Unlock ()
1323+ return e .startHeaderPolling ()
12671324}
12681325
12691326// startPollingHeaders starts a polling loop to fetch new headers at regular intervals
1270- func (e * EthereumClient ) startPollingHeaders () error {
1271- pollInterval := time .Second * 5
1327+ func (e * EthereumClient ) startHeaderPolling () error {
1328+ e .l .Info ().Msg ("Starting Polling Headers" )
1329+ // pollInterval := time.Second * 30
1330+ chainPollingRegistry .Lock ()
1331+ pollInterval := chainPollingRegistry .m [e .GetChainID ().Int64 ()].pollInterval
12721332 ticker := time .NewTicker (pollInterval )
1333+ chainPollingRegistry .Unlock ()
1334+
12731335 e .subscriptionWg .Add (1 )
12741336
1275- latestHeader , err := e .HeaderByNumber (context .Background (), nil ) // Fetch the latest header
1337+ // Create a context with timeout for the initial header fetch
1338+ initCtx , initCancel := context .WithTimeout (context .Background (), e .NetworkConfig .Timeout .Duration )
1339+ defer initCancel ()
1340+
1341+ e .l .Info ().Msg ("Attempting to fetch latest header during initialization" )
1342+ latestHeader , err := e .HeaderByNumber (initCtx , nil ) // Fetch the latest header
12761343 if err != nil {
1344+ e .l .Error ().Err (err ).Msg ("Failed to fetch the latest header during initialization" )
12771345 return fmt .Errorf ("failed to fetch the latest header during initialization: %w" , err )
12781346 }
1347+ e .l .Info ().Str ("HeaderHash" , latestHeader .Hash .String ()).Msg ("Successfully fetched latest header during initialization" )
1348+
12791349 go func () {
12801350 defer e .subscriptionWg .Done ()
1351+ e .l .Info ().Msg ("Polling for headers goroutine started" )
12811352 // Initialize lastHeaderNumber dynamically
12821353 lastHeaderNumber := latestHeader .Number .Uint64 () - 1
12831354 for {
12841355 select {
12851356 case <- ticker .C :
1286- latestHeader , err := e .HeaderByNumber (context .Background (), nil ) // nil gets the latest header
1357+ // Determine if it's this goroutine's turn to poll
1358+ e .sharedPoller .mu .Lock ()
1359+ now := time .Now ()
1360+ if now .Sub (e .sharedPoller .lastPolled ) < e .sharedPoller .pollInterval || e .sharedPoller .fetching {
1361+ e .l .Debug ().Msg ("Not my time to poll, skipping" )
1362+ e .sharedPoller .mu .Unlock ()
1363+ continue
1364+ }
1365+ e .sharedPoller .lastPolled = now
1366+ e .sharedPoller .fetching = true
1367+ e .sharedPoller .mu .Unlock ()
1368+
1369+ e .l .Debug ().Msg ("Polling Headers" )
1370+
1371+ // Create a new context with timeout for each HeaderByNumber call
1372+ pollCtx , pollCancel := context .WithTimeout (context .Background (), e .NetworkConfig .Timeout .Duration )
1373+ latestHeader , err := e .HeaderByNumber (pollCtx , nil ) // nil gets the latest header
1374+ pollCancel ()
1375+
12871376 if err != nil {
12881377 e .l .Error ().Err (err ).Msg ("Error fetching latest header during polling" )
12891378 continue
12901379 }
1291- if latestHeader .Number .Uint64 () > lastHeaderNumber {
1292- lastHeaderNumber = latestHeader .Number .Uint64 ()
1293- if err := e .receiveHeader (latestHeader ); err != nil {
1294- e .l .Error ().Err (err ).Msg ("Error processing header" )
1295- continue
1296- }
12971380
1381+ if latestHeader .Number .Uint64 () > lastHeaderNumber {
1382+ e .l .Info ().Uint64 ("LatestHeaderNumber" , latestHeader .Number .Uint64 ()).Msg ("New headers detected" )
12981383 // Process headers from (lastHeaderNumber + 1) to latestHeader.Number
12991384 // We may need to add a rate limiter, if we run into issues.
13001385 for blockNum := lastHeaderNumber + 1 ; blockNum <= latestHeader .Number .Uint64 (); blockNum ++ {
1301- header , err := e .HeaderByNumber (context .Background (), big .NewInt (int64 (blockNum )))
1386+ // Create a new context with timeout for each HeaderByNumber call
1387+ blockCtx , blockCancel := context .WithTimeout (context .Background (), e .NetworkConfig .Timeout .Duration )
1388+ header , err := e .HeaderByNumber (blockCtx , big .NewInt (int64 (blockNum )))
1389+ blockCancel ()
13021390 if err != nil {
13031391 e .l .Error ().Err (err ).Uint64 ("BlockNumber" , blockNum ).Msg ("Error fetching header during range processing" )
13041392 continue
13051393 }
1394+
13061395 lastHeaderNumber = header .Number .Uint64 ()
1396+
13071397 if err := e .receiveHeader (header ); err != nil {
13081398 e .l .Error ().Err (err ).Uint64 ("BlockNumber" , blockNum ).Msg ("Error processing header" )
13091399 continue
13101400 }
1401+ e .l .Debug ().Uint64 ("BlockNumber" , blockNum ).Msg ("Processing header" )
13111402 }
13121403 }
1404+ e .sharedPoller .mu .Lock ()
1405+ e .sharedPoller .fetching = false
1406+ e .sharedPoller .mu .Unlock ()
13131407 case <- e .doneChan :
13141408 e .l .Debug ().Str ("Network" , e .NetworkConfig .Name ).Msg ("Polling loop cancelled" )
13151409 ticker .Stop ()
0 commit comments