Skip to content

Commit 9ec70e3

Browse files
committed
Switch header polling to fan out pattern
1 parent a6522ca commit 9ec70e3

File tree

2 files changed

+269
-124
lines changed

2 files changed

+269
-124
lines changed
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
package blockchain
2+
3+
import (
4+
"context"
5+
"math"
6+
"math/big"
7+
"sync"
8+
"time"
9+
10+
"github.com/ethereum/go-ethereum/core/types"
11+
"github.com/ethereum/go-ethereum/ethclient"
12+
"github.com/ethereum/go-ethereum/rpc"
13+
"github.com/rs/zerolog"
14+
)
15+
16+
type ChainHeaderManager struct {
17+
chainID int64
18+
pollInterval time.Duration
19+
networkCfg EVMNetwork
20+
logger zerolog.Logger
21+
22+
ethClient *ethclient.Client
23+
rpcClient *rpc.Client
24+
25+
done chan struct{}
26+
wg sync.WaitGroup
27+
28+
headersChan chan *SafeEVMHeader
29+
30+
mu sync.RWMutex
31+
subscribers map[*EthereumClient]struct{}
32+
33+
lastProcessed uint64
34+
35+
started bool
36+
}
37+
38+
var (
39+
chainManagerRegistry = struct {
40+
sync.Mutex
41+
managers map[int64]*ChainHeaderManager
42+
}{
43+
managers: make(map[int64]*ChainHeaderManager),
44+
}
45+
)
46+
47+
// getOrCreateChainManager returns an existing manager if found, otherwise creates one.
48+
func getOrCreateChainManager(
49+
chainID int64,
50+
pollInterval time.Duration,
51+
networkCfg EVMNetwork,
52+
logger zerolog.Logger,
53+
ethClient *ethclient.Client,
54+
rpcClient *rpc.Client,
55+
) *ChainHeaderManager {
56+
chainManagerRegistry.Lock()
57+
defer chainManagerRegistry.Unlock()
58+
59+
if mgr, exists := chainManagerRegistry.managers[chainID]; exists {
60+
return mgr
61+
}
62+
63+
mgr := newChainHeaderManager(chainID, pollInterval, networkCfg, logger, ethClient, rpcClient)
64+
chainManagerRegistry.managers[chainID] = mgr
65+
return mgr
66+
}
67+
68+
func removeChainManager(chainID int64) {
69+
chainManagerRegistry.Lock()
70+
defer chainManagerRegistry.Unlock()
71+
delete(chainManagerRegistry.managers, chainID)
72+
}
73+
74+
// newChainHeaderManager creates the manager but does not start polling automatically
75+
func newChainHeaderManager(
76+
chainID int64,
77+
pollInterval time.Duration,
78+
networkCfg EVMNetwork,
79+
logger zerolog.Logger,
80+
ethClient *ethclient.Client,
81+
rpcClient *rpc.Client,
82+
) *ChainHeaderManager {
83+
return &ChainHeaderManager{
84+
chainID: chainID,
85+
pollInterval: pollInterval,
86+
networkCfg: networkCfg,
87+
logger: logger,
88+
ethClient: ethClient,
89+
rpcClient: rpcClient,
90+
subscribers: make(map[*EthereumClient]struct{}),
91+
headersChan: make(chan *SafeEVMHeader, 1000), // Buffer to handle rapid blocks
92+
done: make(chan struct{}),
93+
}
94+
}
95+
96+
// startPolling initiates the two background goroutines (poll + fan-out).
97+
func (m *ChainHeaderManager) startPolling() {
98+
if m.started {
99+
return
100+
}
101+
m.started = true
102+
103+
// Attempt an initial fetch of the latest block, so we know where to begin
104+
initCtx, cancel := context.WithTimeout(context.Background(), m.networkCfg.Timeout.Duration)
105+
defer cancel()
106+
m.ethClient.HeaderByNumber(initCtx, nil)
107+
latestHeader, err := m.ethClient.HeaderByNumber(initCtx, nil)
108+
if err != nil {
109+
m.logger.Error().
110+
Int64("ChainID", m.chainID).
111+
Err(err).
112+
Msg("Failed initial fetch of the latest header, manager won't start polling")
113+
return
114+
}
115+
safeLatest := convertToSafeEVMHeader(latestHeader)
116+
m.lastProcessed = safeLatest.Number.Uint64() - 1
117+
118+
m.logger.Info().
119+
Int64("ChainID", m.chainID).
120+
Uint64("InitialBlock", m.lastProcessed).
121+
Msg("ChainHeaderManager starting polling")
122+
123+
m.wg.Add(2)
124+
go m.pollRoutine()
125+
go m.fanOutRoutine()
126+
}
127+
128+
// pollRoutine fetches new headers at a fixed interval and sends them down m.headersChan
129+
func (m *ChainHeaderManager) pollRoutine() {
130+
defer m.wg.Done()
131+
132+
ticker := time.NewTicker(m.pollInterval)
133+
defer ticker.Stop()
134+
135+
for {
136+
select {
137+
case <-m.done:
138+
m.logger.Debug().
139+
Int64("ChainID", m.chainID).
140+
Msg("pollRoutine: shutting down")
141+
return
142+
case <-ticker.C:
143+
if err := m.fetchAndQueueNewHeaders(); err != nil {
144+
m.logger.Error().
145+
Int64("ChainID", m.chainID).
146+
Err(err).
147+
Msg("pollRoutine: error fetching new headers")
148+
}
149+
}
150+
}
151+
}
152+
153+
// fanOutRoutine receives newly fetched headers from m.headersChan and distributes them
154+
func (m *ChainHeaderManager) fanOutRoutine() {
155+
defer m.wg.Done()
156+
157+
for {
158+
select {
159+
case <-m.done:
160+
m.logger.Debug().
161+
Int64("ChainID", m.chainID).
162+
Msg("fanOutRoutine: shutting down")
163+
return
164+
case hdr := <-m.headersChan:
165+
m.mu.RLock()
166+
for sub := range m.subscribers {
167+
sub.receiveHeader(hdr)
168+
}
169+
m.mu.RUnlock()
170+
}
171+
}
172+
}
173+
174+
// fetchAndQueueNewHeaders fetches the latest header and then loops over any missing blocks
175+
func (m *ChainHeaderManager) fetchAndQueueNewHeaders() error {
176+
ctx, cancel := context.WithTimeout(context.Background(), m.networkCfg.Timeout.Duration)
177+
defer cancel()
178+
179+
latest, err := m.ethClient.HeaderByNumber(ctx, nil)
180+
if err != nil {
181+
return err
182+
}
183+
latestNum := latest.Number.Uint64()
184+
185+
// We already processed up to X, we process X+1..latest
186+
for blockNum := m.lastProcessed + 1; blockNum <= latestNum; blockNum++ {
187+
if blockNum > math.MaxInt64 {
188+
m.logger.Error().Int64("ChainID", m.chainID).
189+
Uint64("BlockNumber", blockNum).
190+
Msg("blockNum exceeds int64 max, skipping")
191+
continue
192+
}
193+
blockCtx, blockCancel := context.WithTimeout(context.Background(), m.networkCfg.Timeout.Duration)
194+
blockHdr, err := m.ethClient.HeaderByNumber(blockCtx, big.NewInt(int64(blockNum)))
195+
blockCancel()
196+
if err != nil {
197+
m.logger.Error().
198+
Int64("ChainID", m.chainID).
199+
Err(err).
200+
Uint64("BlockNumber", blockNum).
201+
Msg("Could not fetch block header in range")
202+
continue
203+
}
204+
safeHdr := convertToSafeEVMHeader(blockHdr)
205+
m.headersChan <- safeHdr
206+
m.lastProcessed = blockNum
207+
}
208+
return nil
209+
}
210+
211+
// subscribe attaches an EthereumClient to our manager
212+
func (m *ChainHeaderManager) subscribe(client *EthereumClient) {
213+
m.mu.Lock()
214+
defer m.mu.Unlock()
215+
m.subscribers[client] = struct{}{}
216+
}
217+
218+
// unsubscribe removes a subscriber from the manager
219+
func (m *ChainHeaderManager) unsubscribe(client *EthereumClient) {
220+
m.mu.Lock()
221+
defer m.mu.Unlock()
222+
delete(m.subscribers, client)
223+
}
224+
225+
// shutdown stops the goroutines and closes channels.
226+
func (m *ChainHeaderManager) shutdown() {
227+
close(m.done)
228+
m.wg.Wait()
229+
close(m.headersChan)
230+
}
231+
232+
func convertToSafeEVMHeader(hdr *types.Header) *SafeEVMHeader {
233+
if hdr == nil {
234+
return nil
235+
}
236+
return &SafeEVMHeader{
237+
Hash: hdr.Hash(),
238+
Number: hdr.Number,
239+
BaseFee: hdr.BaseFee,
240+
Timestamp: time.Unix(int64(hdr.Time), 0),
241+
}
242+
}

0 commit comments

Comments
 (0)