Skip to content

Commit 9bf046b

Browse files
committed
feat: add monitoring of node drift
2 parents a62b1c0 + a871715 commit 9bf046b

File tree

4 files changed

+154
-6
lines changed

4 files changed

+154
-6
lines changed

README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# DA Monitor
22

3-
Data Availability monitoring tool for rollups using Celestia DA. This tool monitors EVM block headers in real-time, queries ev-node for DA submission information, and verifies blob data on Celestia.
3+
Data Availability monitoring tool for evstack using Celestia DA. This tool monitors EVM block headers in real-time, queries ev-node for DA submission information, and verifies blob data on Celestia.
44

55
## Features
66

@@ -62,8 +62,7 @@ Metrics will be available at `http://localhost:2112/metrics`
6262
- `--evm-ws-url`: EVM client WebSocket URL (default: `ws://localhost:8546`)
6363
- `--celestia-url`: Celestia DA JSON-RPC URL (default: `http://localhost:26658`)
6464
- `--celestia-token`: Celestia authentication token (optional)
65-
- `--block-height`: Specific block height to verify (0 = stream mode, default: 0)
66-
- `--duration`: Duration in seconds to stream (0 = infinite, default: 30)
65+
- `--duration`: Duration in seconds to stream (0 = infinite )
6766
- `--chain-id`: Chain identifier for metrics labels (default: "testnet")
6867
- `--enable-metrics`: Enable Prometheus metrics HTTP server (default: false)
6968
- `--port`: HTTP server port for metrics (default: 2112)

cmd/monitor.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cmd
33
import (
44
"context"
55
"fmt"
6+
"github.com/01builders/da-monitor/internal/drift"
67
coreda "github.com/evstack/ev-node/core/da"
78
"golang.org/x/sync/errgroup"
89
"net/http"
@@ -16,6 +17,7 @@ import (
1617
"github.com/prometheus/client_golang/prometheus/promhttp"
1718
"github.com/rs/zerolog"
1819
"github.com/spf13/cobra"
20+
"strings"
1921
)
2022

2123
const (
@@ -30,6 +32,9 @@ const (
3032
flagPort = "port"
3133
flagChain = "chain-id"
3234
flagEnableMetrics = "enable-metrics"
35+
flagReferenceNode = "reference-node"
36+
flagFullNodes = "full-nodes"
37+
flagPollingInterval = "polling-interval"
3338

3439
metricsPath = "/metrics"
3540
)
@@ -48,6 +53,9 @@ type flagValues struct {
4853
port int
4954
chainID string
5055
enableMetrics bool
56+
referenceNode string
57+
fullNodes string
58+
pollingInterval int
5159
}
5260

5361
func NewMonitorCmd() *cobra.Command {
@@ -73,6 +81,9 @@ func NewMonitorCmd() *cobra.Command {
7381
cmd.Flags().BoolVar(&flags.enableMetrics, flagEnableMetrics, false, "Enable Prometheus metrics HTTP server")
7482
cmd.Flags().IntVar(&flags.port, flagPort, 2112, "HTTP server port for metrics (only used if --enable-metrics is set)")
7583
cmd.Flags().StringVar(&flags.chainID, flagChain, "testnet", "chainID identifier for metrics labels")
84+
cmd.Flags().StringVar(&flags.referenceNode, flagReferenceNode, "", "Reference node RPC endpoint URL (sequencer) for drift monitoring")
85+
cmd.Flags().StringVar(&flags.fullNodes, flagFullNodes, "", "Comma-separated list of full node RPC endpoint URLs for drift monitoring")
86+
cmd.Flags().IntVar(&flags.pollingInterval, flagPollingInterval, 10, "Polling interval in seconds for checking node block heights (default: 10)")
7687

7788
if err := cmd.MarkFlagRequired(flagHeaderNS); err != nil {
7889
panic(err)
@@ -213,5 +224,18 @@ func monitorAndVerifyDataAndHeaders(cmd *cobra.Command, args []string) error {
213224
return verifier.ProcessHeaders(streamCtx)
214225
})
215226

227+
g.Go(func() error {
228+
if flags.referenceNode == "" || len(flags.fullNodes) == 0 {
229+
logger.Info().
230+
Str("reference_node", flags.referenceNode).
231+
Strs("full_nodes", strings.Split(flags.fullNodes, ",")).
232+
Msg("skipping node drift monitoring")
233+
return nil
234+
}
235+
236+
fullNodeList := strings.Split(flags.fullNodes, ",")
237+
return drift.Monitor(retryCtx, m, flags.chainID, flags.referenceNode, fullNodeList, flags.pollingInterval, logger)
238+
})
239+
216240
return g.Wait()
217241
}

internal/drift/drift.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package drift
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/01builders/da-monitor/internal/metrics"
7+
"github.com/ethereum/go-ethereum/ethclient"
8+
"github.com/rs/zerolog"
9+
"time"
10+
)
11+
12+
// Monitor continuously checks block heights of a reference node and multiple full nodes,
13+
// recording the heights and their drift into the provided metrics instance at specified intervals.
14+
func Monitor(ctx context.Context, m *metrics.Metrics, chainID string, referenceNode string, fullNodes []string, pollingInterval int, logger zerolog.Logger) error {
15+
ticker := time.NewTicker(time.Duration(pollingInterval) * time.Second)
16+
defer ticker.Stop()
17+
18+
logger.Info().
19+
Str("reference_node", referenceNode).
20+
Strs("full_nodes", fullNodes).
21+
Int("polling_interval_sec", pollingInterval).
22+
Msg("starting node drift monitoring")
23+
24+
for {
25+
select {
26+
case <-ctx.Done():
27+
logger.Info().Msg("stopping node drift monitoring")
28+
return ctx.Err()
29+
case <-ticker.C:
30+
// get reference node height
31+
refHeight, err := getBlockHeight(ctx, referenceNode)
32+
if err != nil {
33+
logger.Error().Err(err).Str("endpoint", referenceNode).Msg("failed to get reference node block height")
34+
continue
35+
}
36+
37+
m.RecordReferenceBlockHeight(chainID, referenceNode, refHeight)
38+
logger.Info().Uint64("height", refHeight).Str("endpoint", referenceNode).Msg("recorded reference node height")
39+
40+
// get each full node height and calculate drift
41+
for _, fullNode := range fullNodes {
42+
currentHeight, err := getBlockHeight(ctx, fullNode)
43+
if err != nil {
44+
logger.Error().Err(err).Str("endpoint", fullNode).Msg("failed to get full node block height")
45+
continue
46+
}
47+
48+
m.RecordCurrentBlockHeight(chainID, fullNode, currentHeight)
49+
m.RecordBlockHeightDrift(chainID, fullNode, refHeight, currentHeight)
50+
51+
drift := int64(refHeight) - int64(currentHeight)
52+
logger.Info().
53+
Uint64("ref_height", refHeight).
54+
Uint64("target_height", currentHeight).
55+
Int64("drift", drift).
56+
Str("endpoint", fullNode).
57+
Msg("recorded full node height and drift")
58+
}
59+
}
60+
}
61+
}
62+
63+
// getBlockHeight queries an EVM RPC endpoint for its current block height
64+
func getBlockHeight(ctx context.Context, rpcURL string) (uint64, error) {
65+
client, err := ethclient.DialContext(ctx, rpcURL)
66+
if err != nil {
67+
return 0, fmt.Errorf("failed to connect to %s: %w", rpcURL, err)
68+
}
69+
defer client.Close()
70+
71+
height, err := client.BlockNumber(ctx)
72+
if err != nil {
73+
return 0, fmt.Errorf("failed to get block number from %s: %w", rpcURL, err)
74+
}
75+
76+
return height, nil
77+
}

internal/metrics/metrics.go

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,18 @@ import (
1111

1212
// Metrics contains Prometheus metrics for DA verification failures
1313
type Metrics struct {
14-
// track ranges of unsubmitted blocks
15-
UnsubmittedRangeStart *prometheus.GaugeVec
16-
UnsubmittedRangeEnd *prometheus.GaugeVec
14+
// UnsubmittedRangeStart tracks the start of unsubmitted block ranges.
15+
UnsubmittedRangeStart *prometheus.GaugeVec
16+
// UnsubmittedRangeEnd tracks the end of unsubmitted block ranges.
17+
UnsubmittedRangeEnd *prometheus.GaugeVec
18+
// UnsubmittedBlocksTotal tracks the total number of unsubmitted blocks.
1719
UnsubmittedBlocksTotal *prometheus.GaugeVec
20+
// ReferenceBlockHeight tracks the block height for reference endpoint (sequencer).
21+
ReferenceBlockHeight *prometheus.GaugeVec
22+
// CurrentBlockHeight tracks the block height for target endpoints (operator nodes).
23+
CurrentBlockHeight *prometheus.GaugeVec
24+
// BlockHeightDrift tracks the drift between reference and target endpoints for a specific node.
25+
BlockHeightDrift *prometheus.GaugeVec
1826

1927
mu sync.Mutex
2028
ranges map[string][]*blockRange // key: blobType -> sorted slice of ranges
@@ -58,6 +66,30 @@ func NewWithRegistry(namespace string, registerer prometheus.Registerer) *Metric
5866
},
5967
[]string{"chain_id", "blob_type"},
6068
),
69+
ReferenceBlockHeight: factory.NewGaugeVec(
70+
prometheus.GaugeOpts{
71+
Namespace: namespace,
72+
Name: "reference_block_height",
73+
Help: "current block height of the reference endpoint (sequencer)",
74+
},
75+
[]string{"chain_id", "endpoint"},
76+
),
77+
CurrentBlockHeight: factory.NewGaugeVec(
78+
prometheus.GaugeOpts{
79+
Namespace: namespace,
80+
Name: "target_block_height",
81+
Help: "current block height of target endpoints (operator nodes)",
82+
},
83+
[]string{"chain_id", "endpoint"},
84+
),
85+
BlockHeightDrift: factory.NewGaugeVec(
86+
prometheus.GaugeOpts{
87+
Namespace: namespace,
88+
Name: "block_height_drift",
89+
Help: "block height difference between reference and target endpoints (positive = target behind, negative = target ahead)",
90+
},
91+
[]string{"chain_id", "target_endpoint"},
92+
),
6193
ranges: make(map[string][]*blockRange),
6294
}
6395
}
@@ -248,3 +280,19 @@ func (m *Metrics) findRangeIndex(ranges []*blockRange, blockHeight uint64) int {
248280
func (m *Metrics) rangeID(r *blockRange) string {
249281
return fmt.Sprintf("%d-%d", r.start, r.end)
250282
}
283+
284+
// RecordReferenceBlockHeight records the current block height of the reference endpoint
285+
func (m *Metrics) RecordReferenceBlockHeight(chainID, endpoint string, height uint64) {
286+
m.ReferenceBlockHeight.WithLabelValues(chainID, endpoint).Set(float64(height))
287+
}
288+
289+
// RecordCurrentBlockHeight records the current block height of a target endpoint
290+
func (m *Metrics) RecordCurrentBlockHeight(chainID, endpoint string, height uint64) {
291+
m.CurrentBlockHeight.WithLabelValues(chainID, endpoint).Set(float64(height))
292+
}
293+
294+
// RecordBlockHeightDrift calculates and records the drift between reference and target
295+
func (m *Metrics) RecordBlockHeightDrift(chainID, targetEndpoint string, referenceHeight, targetHeight uint64) {
296+
drift := int64(referenceHeight) - int64(targetHeight)
297+
m.BlockHeightDrift.WithLabelValues(chainID, targetEndpoint).Set(float64(drift))
298+
}

0 commit comments

Comments
 (0)