diff --git a/internal/drift/drift.go b/internal/drift/drift.go
index b82800e..955c855 100644
--- a/internal/drift/drift.go
+++ b/internal/drift/drift.go
@@ -3,6 +3,8 @@ package drift
 import (
 	"context"
 	"fmt"
+	"strings"
+
 	"github.com/01builders/ev-metrics/internal/metrics"
 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/rs/zerolog"
@@ -31,10 +33,15 @@ func Monitor(ctx context.Context, m *metrics.Metrics, chainID string, referenceN
 		refHeight, err := getBlockHeight(ctx, referenceNode)
 		if err != nil {
 			logger.Error().Err(err).Str("endpoint", referenceNode).Msg("failed to get reference node block height")
+			// Record error for reference node
+			errorType := classifyError(err)
+			m.RecordEndpointError(chainID, referenceNode, errorType)
+			m.RecordEndpointAvailability(chainID, referenceNode, false)
 			continue
 		}
 
 		m.RecordReferenceBlockHeight(chainID, referenceNode, refHeight)
+		m.RecordEndpointAvailability(chainID, referenceNode, true)
 		logger.Info().Uint64("height", refHeight).Str("endpoint", referenceNode).Msg("recorded reference node height")
 
 		// get each full node height and calculate drift
@@ -42,11 +49,16 @@ func Monitor(ctx context.Context, m *metrics.Metrics, chainID string, referenceN
 			currentHeight, err := getBlockHeight(ctx, fullNode)
 			if err != nil {
 				logger.Error().Err(err).Str("endpoint", fullNode).Msg("failed to get full node block height")
+				// Record error for full node
+				errorType := classifyError(err)
+				m.RecordEndpointError(chainID, fullNode, errorType)
+				m.RecordEndpointAvailability(chainID, fullNode, false)
 				continue
 			}
 
 			m.RecordCurrentBlockHeight(chainID, fullNode, currentHeight)
 			m.RecordBlockHeightDrift(chainID, fullNode, refHeight, currentHeight)
+			m.RecordEndpointAvailability(chainID, fullNode, true)
 
 			drift := int64(refHeight) - int64(currentHeight)
 			logger.Info().
@@ -75,3 +87,50 @@ func getBlockHeight(ctx context.Context, rpcURL string) (uint64, error) {
 
 	return height, nil
 }
+
+// classifyError categorizes errors into specific types for metrics
+func classifyError(err error) string {
+	if err == nil {
+		return "none"
+	}
+
+	errStr := strings.ToLower(err.Error())
+
+	// Check for connection refused
+	if strings.Contains(errStr, "connection refused") {
+		return "connection_refused"
+	}
+
+	// Check for timeout errors
+	if strings.Contains(errStr, "timeout") || strings.Contains(errStr, "deadline exceeded") {
+		return "timeout"
+	}
+
+	// Check for DNS/host resolution errors
+	if strings.Contains(errStr, "no such host") || strings.Contains(errStr, "dns") {
+		return "dns_error"
+	}
+
+	// Check for network unreachable
+	if strings.Contains(errStr, "network is unreachable") || strings.Contains(errStr, "no route to host") {
+		return "network_unreachable"
+	}
+
+	// Check for HTTP status errors
+	if strings.Contains(errStr, "status code") {
+		return "http_error"
+	}
+
+	// Check for EOF errors
+	if strings.Contains(errStr, "eof") {
+		return "eof"
+	}
+
+	// Check for TLS/certificate errors
+	if strings.Contains(errStr, "tls") || strings.Contains(errStr, "certificate") {
+		return "tls_error"
+	}
+
+	// Default to unknown error type
+	return "unknown"
+}
diff --git a/internal/evm/client.go b/internal/evm/client.go
index f0c368b..3721f94 100644
--- a/internal/evm/client.go
+++ b/internal/evm/client.go
@@ -58,6 +58,11 @@ func NewClient(ctx context.Context, wsURL, rpcURL string, logger zerolog.Logger)
 //	}
 //}
 
+// GetRPCURL returns the RPC URL of the client
+func (c *Client) GetRPCURL() string {
+	return c.rpcURL
+}
+
 // HealthCheckRequest performs a lightweight JSON-RPC health check and returns the RTT duration
 func (c *Client) HealthCheckRequest(ctx context.Context) (time.Duration, error) {
 	// Create the JSON-RPC request for eth_blockNumber
diff --git a/internal/jsonrpc/monitor.go b/internal/jsonrpc/monitor.go
index e96bcaf..eb34e3d 100644
--- a/internal/jsonrpc/monitor.go
+++ b/internal/jsonrpc/monitor.go
@@ -2,6 +2,7 @@ package jsonrpc
 
 import (
 	"context"
+	"strings"
 	"time"
 
 	"github.com/01builders/ev-metrics/internal/evm"
@@ -24,9 +25,6 @@ func Monitor(
 		Int("scrape_interval_seconds", scrapeInterval).
 		Msg("starting JSON-RPC health monitoring")
 
-	// Initialize SLO threshold gauges once at startup
-	m.InitializeJsonRpcSloThresholds(chainID)
-
 	ticker := time.NewTicker(time.Duration(scrapeInterval) * time.Second)
 	defer ticker.Stop()
 
@@ -57,9 +55,18 @@ func performHealthCheck(
 ) error {
 	duration, err := evmClient.HealthCheckRequest(ctx)
 	if err != nil {
+		// Record endpoint as unreachable
+		m.RecordEndpointAvailability(chainID, evmClient.GetRPCURL(), false)
+
+		// Classify and record the error type
+		errorType := classifyError(err)
+		m.RecordEndpointError(chainID, evmClient.GetRPCURL(), errorType)
+
 		return err
 	}
 
+	// Record endpoint as reachable
+	m.RecordEndpointAvailability(chainID, evmClient.GetRPCURL(), true)
 	m.RecordJsonRpcRequestDuration(chainID, duration)
 
 	logger.Info().
@@ -69,3 +76,50 @@ func performHealthCheck(
 
 	return nil
 }
+
+// classifyError categorizes errors into specific types for metrics
+func classifyError(err error) string {
+	if err == nil {
+		return "none"
+	}
+
+	errStr := strings.ToLower(err.Error())
+
+	// Check for connection refused
+	if strings.Contains(errStr, "connection refused") {
+		return "connection_refused"
+	}
+
+	// Check for timeout errors
+	if strings.Contains(errStr, "timeout") || strings.Contains(errStr, "deadline exceeded") {
+		return "timeout"
+	}
+
+	// Check for DNS/host resolution errors
+	if strings.Contains(errStr, "no such host") || strings.Contains(errStr, "dns") {
+		return "dns_error"
+	}
+
+	// Check for network unreachable
+	if strings.Contains(errStr, "network is unreachable") || strings.Contains(errStr, "no route to host") {
+		return "network_unreachable"
+	}
+
+	// Check for HTTP status errors
+	if strings.Contains(errStr, "status code") {
+		return "http_error"
+	}
+
+	// Check for EOF errors
+	if strings.Contains(errStr, "eof") {
+		return "eof"
+	}
+
+	// Check for TLS/certificate errors
+	if strings.Contains(errStr, "tls") || strings.Contains(errStr, "certificate") {
+		return "tls_error"
+	}
+
+	// Default to unknown error type
+	return "unknown"
+}
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 0e0b8a2..d201173 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -28,12 +28,22 @@ type Metrics struct {
 	SubmissionDuration *prometheus.SummaryVec
 	// SubmissionDaHeight tracks the DA height at which blocks were submitted.
 	SubmissionDaHeight *prometheus.GaugeVec
-	// BlockTime tracks the time between consecutive blocks over a rolling window.
-	BlockTime *prometheus.SummaryVec
-	// JsonRpcRequestDuration tracks the duration of JSON-RPC requests to the EVM node.
+	// BlockTime tracks the time between consecutive blocks with histogram buckets for accurate SLO calculations.
+	BlockTime *prometheus.HistogramVec
+	// BlockTimeSummary tracks block time quantiles over a rolling window.
+	BlockTimeSummary *prometheus.SummaryVec
+	// BlockReceiveDelay tracks the delay between block creation and reception with histogram buckets.
+	BlockReceiveDelay *prometheus.HistogramVec
+	// BlockReceiveDelaySummary tracks block receive delay quantiles over a rolling window.
+	BlockReceiveDelaySummary *prometheus.SummaryVec
+	// JsonRpcRequestDuration tracks the duration of JSON-RPC requests to the EVM node (histogram for detailed buckets).
 	JsonRpcRequestDuration *prometheus.HistogramVec
-	// JsonRpcRequestSloSeconds exports constant SLO thresholds for JSON-RPC requests.
-	JsonRpcRequestSloSeconds *prometheus.GaugeVec
+	// JsonRpcRequestDurationSummary tracks JSON-RPC request duration with percentiles over a rolling window.
+	JsonRpcRequestDurationSummary *prometheus.SummaryVec
+	// EndpointAvailability tracks whether an endpoint is reachable (1 = reachable, 0 = unreachable).
+	EndpointAvailability *prometheus.GaugeVec
+	// EndpointErrors tracks connection errors with error reasons.
+	EndpointErrors *prometheus.CounterVec
 
 	// internal tracking to ensure we only record increasing DA heights
 	latestHeaderDaHeight uint64
@@ -132,18 +142,90 @@ func NewWithRegistry(namespace string, registerer prometheus.Registerer) *Metric
 			},
 			[]string{"chain_id", "type"},
 		),
-		BlockTime: factory.NewSummaryVec(
-			prometheus.SummaryOpts{
+		BlockTime: factory.NewHistogramVec(
+			prometheus.HistogramOpts{
 				Namespace: namespace,
 				Name: "block_time_seconds",
-				Help: "time between consecutive blocks over rolling window",
+				Help: "time between consecutive blocks with histogram buckets for accurate SLO calculations",
+				Buckets: []float64{
+					0.1,   // 100ms
+					0.15,  // 150ms
+					0.2,   // 200ms
+					0.21,  // 210ms
+					0.22,  // 220ms
+					0.225, // 225ms
+					0.23,  // 230ms
+					0.24,  // 240ms
+					0.25,  // 250ms
+					0.26,  // 260ms
+					0.27,  // 270ms
+					0.275, // 275ms
+					0.28,  // 280ms
+					0.29,  // 290ms
+					0.3,   // 300ms
+					0.35,  // 350ms
+					0.4,   // 400ms
+					0.5,   // 500ms
+					1.0,   // 1s
+					2.0,   // 2s
+				},
+			},
+			[]string{"chain_id"},
+		),
+		BlockTimeSummary: factory.NewSummaryVec(
+			prometheus.SummaryOpts{
+				Namespace: namespace,
+				Name: "block_time_summary_seconds",
+				Help: "block time quantiles over a rolling window",
 				Objectives: map[float64]float64{
-					0.5: 0.05, // median block time
-					0.9: 0.01, // p90
-					0.99: 0.01, // p99
+					0.5: 0.05,
+					0.9: 0.01,
+					0.95: 0.01,
+					0.99: 0.001,
 				},
-				MaxAge: 5 * time.Minute,
-				AgeBuckets: 5,
+				MaxAge: 60 * time.Second,
+				AgeBuckets: 6,
+			},
+			[]string{"chain_id"},
+		),
+		BlockReceiveDelay: factory.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Namespace: namespace,
+				Name: "block_receive_delay_seconds",
+				Help: "delay between block creation timestamp and reception time",
+				Buckets: []float64{
+					0.1,  // 100ms
+					0.15, // 150ms
+					0.2,  // 200ms
+					0.25, // 250ms
+					0.3,  // 300ms
+					0.35, // 350ms
+					0.4,  // 400ms
+					0.5,  // 500ms
+					0.75, // 750ms
+					1.0,  // 1s
+					1.5,  // 1.5s
+					2.0,  // 2s
+					3.0,  // 3s
+					5.0,  // 5s
+					10.0, // 10s
+				},
+			},
+			[]string{"chain_id"},
+		),
+		BlockReceiveDelaySummary: factory.NewSummaryVec(
+			prometheus.SummaryOpts{
+				Namespace: namespace,
+				Name: "block_receive_delay_summary_seconds",
+				Help: "block receive delay quantiles over a rolling window",
+				Objectives: map[float64]float64{
+					0.5: 0.05,
+					0.9: 0.01,
+					0.95: 0.01,
+					0.99: 0.001,
+				},
+				MaxAge: 60 * time.Second,
+				AgeBuckets: 6,
 			},
 			[]string{"chain_id"},
 		),
@@ -156,13 +238,37 @@ func NewWithRegistry(namespace string, registerer prometheus.Registerer) *Metric
 			},
 			[]string{"chain_id"},
 		),
-		JsonRpcRequestSloSeconds: factory.NewGaugeVec(
+		JsonRpcRequestDurationSummary: factory.NewSummaryVec(
+			prometheus.SummaryOpts{
+				Namespace: namespace,
+				Name: "jsonrpc_request_duration_summary_seconds",
+				Help: "JSON-RPC request duration percentiles over rolling window",
+				Objectives: map[float64]float64{
+					0.5: 0.05,   // p50 (median) ±5%
+					0.9: 0.01,   // p90 ±1%
+					0.95: 0.01,  // p95 ±1%
+					0.99: 0.001, // p99 ±0.1%
+				},
+				MaxAge: 2 * time.Minute, // rolling window of 2 minutes
+				AgeBuckets: 5,
+			},
+			[]string{"chain_id"},
+		),
+		EndpointAvailability: factory.NewGaugeVec(
 			prometheus.GaugeOpts{
 				Namespace: namespace,
-				Name: "jsonrpc_request_slo_seconds",
-				Help: "SLO thresholds for JSON-RPC request duration",
+				Name: "endpoint_availability",
+				Help: "endpoint availability status (1 = reachable, 0 = unreachable)",
+			},
+			[]string{"chain_id", "endpoint"},
+		),
+		EndpointErrors: factory.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Name: "endpoint_errors_total",
+				Help: "total number of endpoint connection errors by error type",
 			},
-			[]string{"chain_id", "percentile"},
+			[]string{"chain_id", "endpoint", "error_type"},
 		),
 		ranges: make(map[string][]*blockRange),
 		lastBlockArrivalTime: make(map[string]time.Time),
@@ -411,7 +517,9 @@ func (m *Metrics) RecordBlockTime(chainID string, arrivalTime time.Time) {
 		blockTime := arrivalTime.Sub(lastArrival)
 		// only record positive durations
 		if blockTime > 0 {
-			m.BlockTime.WithLabelValues(chainID).Observe(blockTime.Seconds())
+			seconds := blockTime.Seconds()
+			m.BlockTime.WithLabelValues(chainID).Observe(seconds)
+			m.BlockTimeSummary.WithLabelValues(chainID).Observe(seconds)
 		}
 	}
 
@@ -421,13 +529,34 @@ func (m *Metrics) RecordBlockTime(chainID string, arrivalTime time.Time) {
 
 // RecordJsonRpcRequestDuration records the duration of a JSON-RPC request
 func (m *Metrics) RecordJsonRpcRequestDuration(chainID string, duration time.Duration) {
-	m.JsonRpcRequestDuration.WithLabelValues(chainID).Observe(duration.Seconds())
+	seconds := duration.Seconds()
+	m.JsonRpcRequestDuration.WithLabelValues(chainID).Observe(seconds)
+	m.JsonRpcRequestDurationSummary.WithLabelValues(chainID).Observe(seconds)
 }
 
-// InitializeJsonRpcSloThresholds initializes the constant SLO threshold gauges
-func (m *Metrics) InitializeJsonRpcSloThresholds(chainID string) {
-	m.JsonRpcRequestSloSeconds.WithLabelValues(chainID, "p50").Set(0.2)
-	m.JsonRpcRequestSloSeconds.WithLabelValues(chainID, "p90").Set(0.35)
-	m.JsonRpcRequestSloSeconds.WithLabelValues(chainID, "p95").Set(0.4)
-	m.JsonRpcRequestSloSeconds.WithLabelValues(chainID, "p99").Set(0.5)
+// RecordEndpointAvailability records whether an endpoint is reachable
+// available should be true if endpoint is reachable, false otherwise
+func (m *Metrics) RecordEndpointAvailability(chainID, endpoint string, available bool) {
+	value := 0.0
+	if available {
+		value = 1.0
+	}
+	m.EndpointAvailability.WithLabelValues(chainID, endpoint).Set(value)
+}
+
+// RecordEndpointError records an endpoint connection error with its type
+func (m *Metrics) RecordEndpointError(chainID, endpoint, errorType string) {
+	m.EndpointErrors.WithLabelValues(chainID, endpoint, errorType).Inc()
+}
+
+// RecordBlockReceiveDelay records the delay between block creation and reception
+// blockTimestamp is the block's creation timestamp, receiveTime is when the block was received
+func (m *Metrics) RecordBlockReceiveDelay(chainID string, blockTimestamp, receiveTime time.Time) {
+	delay := receiveTime.Sub(blockTimestamp)
+	// only record positive delays
+	if delay > 0 {
+		seconds := delay.Seconds()
+		m.BlockReceiveDelay.WithLabelValues(chainID).Observe(seconds)
+		m.BlockReceiveDelaySummary.WithLabelValues(chainID).Observe(seconds)
+	}
+}
diff --git a/mise.toml b/mise.toml
new file mode 100644
index 0000000..b55f8c1
--- /dev/null
+++ b/mise.toml
@@ -0,0 +1,2 @@
+[tools]
+go = "latest"
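
For context, a hypothetical caller sketch, not part of the diff above: it shows how RecordBlockReceiveDelay might be fed from a block subscription inside the same module. The function name and package are illustrative; it assumes go-ethereum's types.Header, whose Time field is the block timestamp in Unix seconds.

// Hypothetical wiring sketch (illustrative only, not part of this change).
package example

import (
	"time"

	"github.com/01builders/ev-metrics/internal/metrics"
	"github.com/ethereum/go-ethereum/core/types"
)

// onNewHeader records how long after its own timestamp a block was received.
func onNewHeader(m *metrics.Metrics, chainID string, header *types.Header) {
	createdAt := time.Unix(int64(header.Time), 0)
	m.RecordBlockReceiveDelay(chainID, createdAt, time.Now())
}

Likewise, a minimal table-driven test sketch for the classifyError helper introduced in internal/drift (the same helper also exists in internal/jsonrpc). The file placement and the sample error strings are illustrative assumptions chosen to hit each branch, not messages asserted anywhere in the change.

// Illustrative test sketch; would live alongside internal/drift/drift.go
// because classifyError is unexported.
package drift

import (
	"errors"
	"testing"
)

func TestClassifyError(t *testing.T) {
	cases := []struct {
		name string
		err  error
		want string
	}{
		{"nil error", nil, "none"},
		{"connection refused", errors.New("dial tcp 127.0.0.1:8545: connect: connection refused"), "connection_refused"},
		{"deadline exceeded", errors.New("context deadline exceeded"), "timeout"},
		{"unknown host", errors.New("dial tcp: lookup rpc.invalid: no such host"), "dns_error"},
		{"bad status", errors.New("unexpected status code: 503"), "http_error"},
		{"bad certificate", errors.New("x509: certificate signed by unknown authority"), "tls_error"},
		{"anything else", errors.New("something else went wrong"), "unknown"},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := classifyError(tc.err); got != tc.want {
				t.Errorf("classifyError(%v) = %q, want %q", tc.err, got, tc.want)
			}
		})
	}
}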