Skip to content

Commit 33a3deb

Browse files
committed
Add caching and shared header subscriptions per chain id
1 parent a41193b commit 33a3deb

File tree

1 file changed

+106
-12
lines changed

1 file changed

+106
-12
lines changed

lib/blockchain/ethereum.go

Lines changed: 106 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,35 @@ type EthereumClient struct {
6060
doneChan chan struct{}
6161
l zerolog.Logger
6262
subscriptionWg sync.WaitGroup
63+
sharedPoller *SharedHeaderPoller
64+
}
65+
66+
var chainPollingRegistry = struct {
67+
sync.Mutex
68+
m map[int64]*SharedHeaderPoller
69+
}{
70+
m: make(map[int64]*SharedHeaderPoller),
71+
}
72+
73+
type SharedHeaderPoller struct {
74+
chainID int64
75+
lastPolled time.Time
76+
pollInterval time.Duration
77+
mu sync.Mutex
78+
fetching bool
79+
done chan struct{}
80+
}
81+
82+
var sharedSubscriptionRegistry = struct {
83+
sync.Mutex
84+
m map[int64]*HeaderSubscription
85+
}{
86+
m: make(map[int64]*HeaderSubscription),
87+
}
88+
89+
type HeaderSubscription struct {
90+
mu sync.Mutex
91+
headerSubscriptions map[string]HeaderEventSubscription
6392
}
6493

6594
// newEVMClient creates an EVM client for a single node/URL
@@ -105,6 +134,21 @@ func newEVMClient(networkSettings EVMNetwork, logger zerolog.Logger) (EVMClient,
105134
return nil, err
106135
}
107136
ec.gasStats = NewGasStats(ec.ID)
137+
chainID := ec.GetChainID().Int64()
138+
// Acquire or create the shared data
139+
sharedSubscriptionRegistry.Lock()
140+
sharedSubs, exists := sharedSubscriptionRegistry.m[chainID]
141+
if !exists {
142+
sharedSubs = &HeaderSubscription{
143+
headerSubscriptions: make(map[string]HeaderEventSubscription),
144+
}
145+
sharedSubscriptionRegistry.m[chainID] = sharedSubs
146+
}
147+
sharedSubscriptionRegistry.Unlock()
148+
149+
// Now point the client’s fields to the shared map & mutex
150+
ec.headerSubscriptions = sharedSubs.headerSubscriptions
151+
ec.subscriptionMutex = &sharedSubs.mu
108152
// Initialize header subscription or polling
109153
if err := ec.InitializeHeaderSubscription(); err != nil {
110154
return nil, err
@@ -1263,53 +1307,103 @@ func (e *EthereumClient) InitializeHeaderSubscription() error {
12631307
}
12641308
// Fallback to polling if subscriptions are not supported
12651309
e.l.Info().Str("Network", e.NetworkConfig.Name).Msg("Subscriptions not supported. Using polling for new headers.")
1266-
return e.startPollingHeaders()
1310+
1311+
chainPollingRegistry.Lock()
1312+
poller, exists := chainPollingRegistry.m[e.GetChainID().Int64()]
1313+
if !exists {
1314+
poller = &SharedHeaderPoller{
1315+
chainID: e.GetChainID().Int64(),
1316+
pollInterval: 10 * time.Second,
1317+
done: make(chan struct{}),
1318+
}
1319+
chainPollingRegistry.m[e.GetChainID().Int64()] = poller
1320+
}
1321+
e.sharedPoller = poller
1322+
chainPollingRegistry.Unlock()
1323+
return e.startHeaderPolling()
12671324
}
12681325

12691326
// startPollingHeaders starts a polling loop to fetch new headers at regular intervals
1270-
func (e *EthereumClient) startPollingHeaders() error {
1271-
pollInterval := time.Second * 5
1327+
func (e *EthereumClient) startHeaderPolling() error {
1328+
e.l.Info().Msg("Starting Polling Headers")
1329+
// pollInterval := time.Second * 30
1330+
chainPollingRegistry.Lock()
1331+
pollInterval := chainPollingRegistry.m[e.GetChainID().Int64()].pollInterval
12721332
ticker := time.NewTicker(pollInterval)
1333+
chainPollingRegistry.Unlock()
1334+
12731335
e.subscriptionWg.Add(1)
12741336

1275-
latestHeader, err := e.HeaderByNumber(context.Background(), nil) // Fetch the latest header
1337+
// Create a context with timeout for the initial header fetch
1338+
initCtx, initCancel := context.WithTimeout(context.Background(), e.NetworkConfig.Timeout.Duration)
1339+
defer initCancel()
1340+
1341+
e.l.Info().Msg("Attempting to fetch latest header during initialization")
1342+
latestHeader, err := e.HeaderByNumber(initCtx, nil) // Fetch the latest header
12761343
if err != nil {
1344+
e.l.Error().Err(err).Msg("Failed to fetch the latest header during initialization")
12771345
return fmt.Errorf("failed to fetch the latest header during initialization: %w", err)
12781346
}
1347+
e.l.Info().Str("HeaderHash", latestHeader.Hash.String()).Msg("Successfully fetched latest header during initialization")
1348+
12791349
go func() {
12801350
defer e.subscriptionWg.Done()
1351+
e.l.Info().Msg("Polling for headers goroutine started")
12811352
// Initialize lastHeaderNumber dynamically
12821353
lastHeaderNumber := latestHeader.Number.Uint64() - 1
12831354
for {
12841355
select {
12851356
case <-ticker.C:
1286-
latestHeader, err := e.HeaderByNumber(context.Background(), nil) // nil gets the latest header
1357+
// Determine if it's this goroutine's turn to poll
1358+
e.sharedPoller.mu.Lock()
1359+
now := time.Now()
1360+
if now.Sub(e.sharedPoller.lastPolled) < e.sharedPoller.pollInterval || e.sharedPoller.fetching {
1361+
e.l.Debug().Msg("Not my time to poll, skipping")
1362+
e.sharedPoller.mu.Unlock()
1363+
continue
1364+
}
1365+
e.sharedPoller.lastPolled = now
1366+
e.sharedPoller.fetching = true
1367+
e.sharedPoller.mu.Unlock()
1368+
1369+
e.l.Debug().Msg("Polling Headers")
1370+
1371+
// Create a new context with timeout for each HeaderByNumber call
1372+
pollCtx, pollCancel := context.WithTimeout(context.Background(), e.NetworkConfig.Timeout.Duration)
1373+
latestHeader, err := e.HeaderByNumber(pollCtx, nil) // nil gets the latest header
1374+
pollCancel()
1375+
12871376
if err != nil {
12881377
e.l.Error().Err(err).Msg("Error fetching latest header during polling")
12891378
continue
12901379
}
1291-
if latestHeader.Number.Uint64() > lastHeaderNumber {
1292-
lastHeaderNumber = latestHeader.Number.Uint64()
1293-
if err := e.receiveHeader(latestHeader); err != nil {
1294-
e.l.Error().Err(err).Msg("Error processing header")
1295-
continue
1296-
}
12971380

1381+
if latestHeader.Number.Uint64() > lastHeaderNumber {
1382+
e.l.Info().Uint64("LatestHeaderNumber", latestHeader.Number.Uint64()).Msg("New headers detected")
12981383
// Process headers from (lastHeaderNumber + 1) to latestHeader.Number
12991384
// We may need to add a rate limiter, if we run into issues.
13001385
for blockNum := lastHeaderNumber + 1; blockNum <= latestHeader.Number.Uint64(); blockNum++ {
1301-
header, err := e.HeaderByNumber(context.Background(), big.NewInt(int64(blockNum)))
1386+
// Create a new context with timeout for each HeaderByNumber call
1387+
blockCtx, blockCancel := context.WithTimeout(context.Background(), e.NetworkConfig.Timeout.Duration)
1388+
header, err := e.HeaderByNumber(blockCtx, big.NewInt(int64(blockNum)))
1389+
blockCancel()
13021390
if err != nil {
13031391
e.l.Error().Err(err).Uint64("BlockNumber", blockNum).Msg("Error fetching header during range processing")
13041392
continue
13051393
}
1394+
13061395
lastHeaderNumber = header.Number.Uint64()
1396+
13071397
if err := e.receiveHeader(header); err != nil {
13081398
e.l.Error().Err(err).Uint64("BlockNumber", blockNum).Msg("Error processing header")
13091399
continue
13101400
}
1401+
e.l.Debug().Uint64("BlockNumber", blockNum).Msg("Processing header")
13111402
}
13121403
}
1404+
e.sharedPoller.mu.Lock()
1405+
e.sharedPoller.fetching = false
1406+
e.sharedPoller.mu.Unlock()
13131407
case <-e.doneChan:
13141408
e.l.Debug().Str("Network", e.NetworkConfig.Name).Msg("Polling loop cancelled")
13151409
ticker.Stop()

0 commit comments

Comments
 (0)