Skip to content

Commit fcb5626

Browse files
committed
Merge branch 'main' of github.com:smartcontractkit/chainlink-testing-framework into parrotWildcards
2 parents bb9e2bd + 5e13d77 commit fcb5626

File tree

4 files changed

+344
-8
lines changed

4 files changed

+344
-8
lines changed

lib/.changeset/v1.50.21.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
- Fix start up of Nethermind 1.30.1+ containers
33
- Fix docker 8080 port mappings
44
- Do not change container name, when restarting it
5-
- Automatically forward `SETH_LOG_LEVEL` to k8s
5+
- Automatically forward `SETH_LOG_LEVEL` to k8s

lib/blockchain/blockchain.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ type EVMClient interface {
118118
RawJsonRPCCall(ctx context.Context, result interface{}, method string, params ...interface{}) error
119119

120120
GetEthClient() *ethclient.Client
121+
122+
InitializeHeaderSubscription() error
121123
}
122124

123125
// NodeHeader header with the ID of the node that received it
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package blockchain
2+
3+
import (
4+
"context"
5+
"math"
6+
"math/big"
7+
"sync"
8+
"time"
9+
10+
"github.com/ethereum/go-ethereum/core/types"
11+
"github.com/ethereum/go-ethereum/ethclient"
12+
"github.com/ethereum/go-ethereum/rpc"
13+
"github.com/rs/zerolog"
14+
)
15+
16+
type ChainHeaderManager struct {
17+
chainID int64
18+
pollInterval time.Duration
19+
networkCfg EVMNetwork
20+
logger zerolog.Logger
21+
22+
ethClient *ethclient.Client
23+
rpcClient *rpc.Client
24+
25+
done chan struct{}
26+
wg sync.WaitGroup
27+
28+
headersChan chan *SafeEVMHeader
29+
30+
mu sync.RWMutex
31+
subscribers map[*EthereumClient]struct{}
32+
33+
lastProcessed uint64
34+
35+
started bool
36+
}
37+
38+
var (
39+
chainManagerRegistry = struct {
40+
sync.Mutex
41+
managers map[int64]*ChainHeaderManager
42+
}{
43+
managers: make(map[int64]*ChainHeaderManager),
44+
}
45+
)
46+
47+
// getOrCreateChainManager returns an existing manager if found, otherwise creates one.
48+
func getOrCreateChainManager(
49+
chainID int64,
50+
pollInterval time.Duration,
51+
networkCfg EVMNetwork,
52+
logger zerolog.Logger,
53+
ethClient *ethclient.Client,
54+
rpcClient *rpc.Client,
55+
) *ChainHeaderManager {
56+
chainManagerRegistry.Lock()
57+
defer chainManagerRegistry.Unlock()
58+
59+
if mgr, exists := chainManagerRegistry.managers[chainID]; exists {
60+
return mgr
61+
}
62+
63+
mgr := newChainHeaderManager(chainID, pollInterval, networkCfg, logger, ethClient, rpcClient)
64+
chainManagerRegistry.managers[chainID] = mgr
65+
return mgr
66+
}
67+
68+
func removeChainManager(chainID int64) {
69+
chainManagerRegistry.Lock()
70+
defer chainManagerRegistry.Unlock()
71+
delete(chainManagerRegistry.managers, chainID)
72+
}
73+
74+
// newChainHeaderManager creates the manager but does not start polling automatically
75+
func newChainHeaderManager(
76+
chainID int64,
77+
pollInterval time.Duration,
78+
networkCfg EVMNetwork,
79+
logger zerolog.Logger,
80+
ethClient *ethclient.Client,
81+
rpcClient *rpc.Client,
82+
) *ChainHeaderManager {
83+
return &ChainHeaderManager{
84+
chainID: chainID,
85+
pollInterval: pollInterval,
86+
networkCfg: networkCfg,
87+
logger: logger,
88+
ethClient: ethClient,
89+
rpcClient: rpcClient,
90+
subscribers: make(map[*EthereumClient]struct{}),
91+
headersChan: make(chan *SafeEVMHeader, 1000), // Buffer to handle rapid blocks
92+
done: make(chan struct{}),
93+
}
94+
}
95+
96+
// startPolling initiates the two background goroutines (poll + fan-out).
97+
func (m *ChainHeaderManager) startPolling() {
98+
if m.started {
99+
return
100+
}
101+
m.started = true
102+
103+
// Attempt an initial fetch of the latest block, so we know where to begin
104+
initCtx, cancel := context.WithTimeout(context.Background(), m.networkCfg.Timeout.Duration)
105+
defer cancel()
106+
latestHeader, err := m.ethClient.HeaderByNumber(initCtx, nil)
107+
if err != nil {
108+
m.logger.Error().
109+
Int64("ChainID", m.chainID).
110+
Err(err).
111+
Msg("Failed initial fetch of the latest header, manager won't start polling")
112+
return
113+
}
114+
safeLatest := convertToSafeEVMHeader(latestHeader)
115+
m.lastProcessed = safeLatest.Number.Uint64() - 1
116+
117+
m.logger.Info().
118+
Int64("ChainID", m.chainID).
119+
Uint64("InitialBlock", m.lastProcessed).
120+
Msg("ChainHeaderManager starting polling")
121+
122+
m.wg.Add(2)
123+
go m.pollRoutine()
124+
go m.fanOutRoutine()
125+
}
126+
127+
// pollRoutine fetches new headers at a fixed interval and sends them down m.headersChan
128+
func (m *ChainHeaderManager) pollRoutine() {
129+
defer m.wg.Done()
130+
131+
ticker := time.NewTicker(m.pollInterval)
132+
defer ticker.Stop()
133+
134+
for {
135+
select {
136+
case <-m.done:
137+
m.logger.Debug().
138+
Int64("ChainID", m.chainID).
139+
Msg("pollRoutine: shutting down")
140+
return
141+
case <-ticker.C:
142+
if err := m.fetchAndQueueNewHeaders(); err != nil {
143+
m.logger.Error().
144+
Int64("ChainID", m.chainID).
145+
Err(err).
146+
Msg("pollRoutine: error fetching new headers")
147+
}
148+
}
149+
}
150+
}
151+
152+
// fanOutRoutine receives newly fetched headers from m.headersChan and distributes them
153+
func (m *ChainHeaderManager) fanOutRoutine() {
154+
defer m.wg.Done()
155+
156+
for {
157+
select {
158+
case <-m.done:
159+
m.logger.Debug().
160+
Int64("ChainID", m.chainID).
161+
Msg("fanOutRoutine: shutting down")
162+
return
163+
case hdr := <-m.headersChan:
164+
m.mu.RLock()
165+
for sub := range m.subscribers {
166+
err := sub.receiveHeader(hdr)
167+
if err != nil {
168+
m.logger.Err(err).Msg("Finalizer received error during HTTP polling")
169+
}
170+
}
171+
m.mu.RUnlock()
172+
}
173+
}
174+
}
175+
176+
// fetchAndQueueNewHeaders fetches the latest header and then loops over any missing blocks
177+
func (m *ChainHeaderManager) fetchAndQueueNewHeaders() error {
178+
ctx, cancel := context.WithTimeout(context.Background(), m.networkCfg.Timeout.Duration)
179+
defer cancel()
180+
181+
latest, err := m.ethClient.HeaderByNumber(ctx, nil)
182+
if err != nil {
183+
return err
184+
}
185+
latestNum := latest.Number.Uint64()
186+
187+
// We already processed up to X, we process X+1..latest
188+
for blockNum := m.lastProcessed + 1; blockNum <= latestNum; blockNum++ {
189+
if blockNum > math.MaxInt64 {
190+
m.logger.Error().Int64("ChainID", m.chainID).
191+
Uint64("BlockNumber", blockNum).
192+
Msg("blockNum exceeds int64 max, skipping")
193+
continue
194+
}
195+
blockCtx, blockCancel := context.WithTimeout(context.Background(), m.networkCfg.Timeout.Duration)
196+
blockHdr, err := m.ethClient.HeaderByNumber(blockCtx, big.NewInt(int64(blockNum)))
197+
blockCancel()
198+
if err != nil {
199+
m.logger.Error().
200+
Int64("ChainID", m.chainID).
201+
Err(err).
202+
Uint64("BlockNumber", blockNum).
203+
Msg("Could not fetch block header in range")
204+
continue
205+
}
206+
safeHdr := convertToSafeEVMHeader(blockHdr)
207+
m.headersChan <- safeHdr
208+
m.lastProcessed = blockNum
209+
}
210+
return nil
211+
}
212+
213+
// subscribe attaches an EthereumClient to our manager
214+
func (m *ChainHeaderManager) subscribe(client *EthereumClient) {
215+
m.mu.Lock()
216+
defer m.mu.Unlock()
217+
m.subscribers[client] = struct{}{}
218+
}
219+
220+
// unsubscribe removes a subscriber from the manager
221+
func (m *ChainHeaderManager) unsubscribe(client *EthereumClient) {
222+
m.mu.Lock()
223+
defer m.mu.Unlock()
224+
delete(m.subscribers, client)
225+
}
226+
227+
// shutdown stops the goroutines and closes channels.
228+
func (m *ChainHeaderManager) shutdown() {
229+
close(m.done)
230+
m.wg.Wait()
231+
close(m.headersChan)
232+
}
233+
234+
func convertToSafeEVMHeader(hdr *types.Header) *SafeEVMHeader {
235+
if hdr == nil {
236+
return nil
237+
}
238+
var safeTime int64
239+
if hdr.Time > math.MaxInt64 {
240+
safeTime = math.MaxInt64
241+
} else {
242+
safeTime = int64(hdr.Time)
243+
}
244+
return &SafeEVMHeader{
245+
Hash: hdr.Hash(),
246+
Number: hdr.Number,
247+
BaseFee: hdr.BaseFee,
248+
Timestamp: time.Unix(safeTime, 0),
249+
}
250+
}

0 commit comments

Comments
 (0)