Skip to content

Commit 5ce266b

Browse files
committed
Poll headers when WS is not available.
1 parent 15722ca commit 5ce266b

File tree

2 files changed

+85
-6
lines changed

2 files changed

+85
-6
lines changed

lib/blockchain/blockchain.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ type EVMClient interface {
118118
RawJsonRPCCall(ctx context.Context, result interface{}, method string, params ...interface{}) error
119119

120120
GetEthClient() *ethclient.Client
121+
122+
InitializeHeaderSubscription() error
121123
}
122124

123125
// NodeHeader header with the ID of the node that received it

lib/blockchain/ethereum.go

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,9 @@ func newEVMClient(networkSettings EVMNetwork, logger zerolog.Logger) (EVMClient,
105105
return nil, err
106106
}
107107
ec.gasStats = NewGasStats(ec.ID)
108-
// Check if subscriptions are supported since HTTP does not support subscriptions.
109-
if ec.Client.Client().SupportsSubscriptions() {
110-
err = ec.subscribeToNewHeaders()
111-
if err != nil {
112-
return nil, err
113-
}
108+
// Initialize header subscription or polling
109+
if err := ec.InitializeHeaderSubscription(); err != nil {
110+
return nil, err
114111
}
115112
// Check if the chain supports EIP-1559
116113
// https://eips.ethereum.org/EIPS/eip-1559
@@ -701,6 +698,40 @@ func (e *EthereumClient) WaitForFinalizedTx(txHash common.Hash) (*big.Int, time.
701698
key := "txFinalizer-" + txHash.String()
702699
e.AddHeaderEventSubscription(key, finalizer)
703700
defer e.DeleteHeaderEventSubscription(key)
701+
702+
if !e.Client.Client().SupportsSubscriptions() {
703+
go func() {
704+
ticker := time.NewTicker(10 * time.Second)
705+
defer ticker.Stop()
706+
707+
for {
708+
select {
709+
case <-finalizer.context.Done():
710+
return
711+
case <-ticker.C:
712+
latestHeader, err := e.GetLatestFinalizedBlockHeader(context.Background())
713+
if err != nil {
714+
e.l.Err(err).Msg("Error fetching latest finalized header via HTTP polling")
715+
}
716+
717+
nodeHeader := NodeHeader{
718+
// NodeID: 0, // Assign appropriate NodeID if needed
719+
SafeEVMHeader: SafeEVMHeader{
720+
Hash: latestHeader.Hash(),
721+
Number: latestHeader.Number,
722+
Timestamp: time.Unix(int64(latestHeader.Time), 0),
723+
BaseFee: latestHeader.BaseFee,
724+
},
725+
}
726+
727+
err = finalizer.ReceiveHeader(nodeHeader)
728+
if err != nil {
729+
e.l.Err(err).Msg("Finalizer received error during HTTP polling")
730+
}
731+
}
732+
}
733+
}()
734+
}
704735
err = finalizer.Wait()
705736
if err != nil {
706737
return nil, time.Time{}, fmt.Errorf("error waiting for finalization: %w in network %s tx %s", err, e.GetNetworkName(), txHash.Hex())
@@ -1220,6 +1251,47 @@ func (e *EthereumClient) AvgBlockTime(ctx context.Context) (time.Duration, error
12201251
return averageBlockTime, nil
12211252
}
12221253

1254+
// InitializeHeaderSubscription initializes either subscription-based or polling-based header processing
1255+
func (e *EthereumClient) InitializeHeaderSubscription() error {
1256+
if e.Client.Client().SupportsSubscriptions() {
1257+
return e.subscribeToNewHeaders()
1258+
}
1259+
// Fallback to polling if subscriptions are not supported
1260+
e.l.Info().Str("Network", e.NetworkConfig.Name).Msg("Subscriptions not supported. Using polling for new headers.")
1261+
return e.startPollingHeaders()
1262+
}
1263+
1264+
// startPollingHeaders starts a polling loop to fetch new headers at regular intervals
1265+
func (e *EthereumClient) startPollingHeaders() error {
1266+
pollInterval := time.Second * 5
1267+
ticker := time.NewTicker(pollInterval)
1268+
e.subscriptionWg.Add(1)
1269+
go func() {
1270+
defer e.subscriptionWg.Done()
1271+
lastHeaderNumber := uint64(0)
1272+
for {
1273+
select {
1274+
case <-ticker.C:
1275+
latestHeader, err := e.HeaderByNumber(context.Background(), nil) // nil gets the latest header
1276+
if err != nil {
1277+
e.l.Error().Err(err).Msg("Error fetching latest header during polling")
1278+
continue
1279+
}
1280+
if latestHeader.Number.Uint64() > lastHeaderNumber {
1281+
lastHeaderNumber = latestHeader.Number.Uint64()
1282+
e.receiveHeader(latestHeader)
1283+
}
1284+
case <-e.doneChan:
1285+
e.l.Debug().Str("Network", e.NetworkConfig.Name).Msg("Polling loop cancelled")
1286+
ticker.Stop()
1287+
e.Client.Close()
1288+
return
1289+
}
1290+
}
1291+
}()
1292+
return nil
1293+
}
1294+
12231295
// EthereumMultinodeClient wraps the client and the BlockChain network to interact with an EVM based Blockchain with multiple nodes
12241296
type EthereumMultinodeClient struct {
12251297
DefaultClient EVMClient
@@ -1755,3 +1827,8 @@ func (e *EthereumMultinodeClient) WaitForEvents() error {
17551827
func (e *EthereumMultinodeClient) ErrorReason(b ethereum.ContractCaller, tx *types.Transaction, receipt *types.Receipt) (string, error) {
17561828
return e.DefaultClient.ErrorReason(b, tx, receipt)
17571829
}
1830+
1831+
// InitializeHeaderSubscription initializes either subscription-based or polling-based header processing
1832+
func (e *EthereumMultinodeClient) InitializeHeaderSubscription() error {
1833+
return e.DefaultClient.InitializeHeaderSubscription()
1834+
}

0 commit comments

Comments
 (0)