Conversation
Pull request overview
This pull request adds comprehensive metrics instrumentation to support new Grafana monitoring dashboards. The changes introduce producer metrics, retry/failure metrics, true Kafka consumer lag tracking, message size tracking, and ClickHouse write metrics across the indexing pipeline.
Changes:
- Added producer-side metrics for Kafka message publishing with error classification
- Added retry and failure tracking metrics in the sliding window manager with stage-specific counters
- Implemented true consumer group lag tracking using Kafka broker watermark offsets
- Added Kafka message size histograms for both produced and consumed messages
- Added ClickHouse write metrics with per-table duration and status tracking
- Refactored metrics nil-checking pattern to rely on receiver nil checks rather than caller checks
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.
Summary per file
| File | Description |
|---|---|
| pkg/metrics/metrics.go | Added new metric definitions (producer, retry/failure, consumer lag, message size, ClickHouse writes), NewNoOp() helper, and error classification logic |
| pkg/slidingwindow/worker/subnet_evm.go | Added producer metrics, block-to-publish latency tracking, message size observation, removed nil checks |
| pkg/slidingwindow/worker/coreth.go | Added producer metrics, block-to-publish latency tracking, message size observation, removed nil checks, updated comments |
| pkg/slidingwindow/manager.go | Added stage parameter to handleFailure, integrated retry/failure metrics, removed nil checks |
| pkg/kafka/offset_manager.go | Added topic tracking to offsetState, implemented recordConsumerGroupLag for true lag metrics, cleanup lag metrics on partition revocation |
| pkg/kafka/consumer.go | Added message size observation for consumed messages |
| pkg/kafka/processor/coreth.go | Added ClickHouse write metrics per table, moved processing duration recording to measure full pipeline, updated comments |
| pkg/metrics/dashboards/avalanche-indexer-metrics-template.json | Added comprehensive Grafana dashboard with panels for all new metrics |
Signed-off-by: J <github@bouncer.mozmail.com>
```diff
- if cw.metrics != nil {
-     cw.metrics.RecordReceiptFetch(err, receiptDuration.Seconds(), 0)
- }
+ cw.metrics.RecordReceiptFetch(err, receiptDuration.Seconds(), 0)
```
Curious, why are we using seconds here instead of ms?
```diff
- if err := p.blocksRepo.WriteBlock(ctx, blockRow); err != nil {
+ writeStart := time.Now()
+ err = p.blocksRepo.WriteBlock(ctx, blockRow)
+ p.metrics.RecordClickHouseWrite(clickHouseTableBlocks, err, time.Since(writeStart).Seconds())
```
As we grow our metrics plumbing, I'm seeing us fall into the same trap we got into with analytics: it's becoming hard to figure out where metrics functions live. Ideally, we'd have a shared pkg/metrics containing the most generic, reused pieces, including very common labels like chain_id or mainnetOrTestnet. For things like RecordClickHouseWrite, though, I think we should colocate the helper functions. One way to do this would be something like

`recordClickHouseWrite(p.metrics, err, time.Since(writeStart).Seconds())`

where recordClickHouseWrite() is a function private to this processor package.

This would also address the scattered constants we're already seeing, such as the clickHouseTable* consts above. If this breaks avalanchego's practice, I'd say this is a case where avalanchego got it wrong.
Can we make this change?
```diff
  // On successful processing, commits offset. On failure, publishes to DLQ (if configured) before committing.
  func (c *Consumer) dispatch(ctx context.Context, msg *cKafka.Message) {
      if msg != nil {
+         c.metrics.ObserveKafkaMessageSize("consumed", len(msg.Value))
```
Another example of a naked constant here. Metrics helper functions would be cleaner.
```diff
+ // When no committed offset exists (offset < 0), lag is estimated based on auto.offset.reset:
+ // zero for "latest", full partition range for "earliest" or unknown values.
+ // Skipped in dryRun mode or when metrics is nil.
+ func (om *OffsetManager) recordConsumerGroupLag(dryRun bool) {
```
If we define the gauge directly in this package, it'd be easier to track down metric definitions without having to jump out of this folder; this is clearly a private function whose sole purpose is to record group lag. By the way, this is an interesting approach: typical practice is to build dashboards from metrics emitted directly by Kafka, but that makes dashboarding much more complicated, so I like that you compute the lag here. However, I think a 5-second interval is too aggressive and can generate a lot of unnecessary requests to the broker(s). How about every 30-60s?
```diff
  }),
+ producerMessages: prometheus.NewCounterVec(prometheus.CounterOpts{
+     Namespace: Namespace,
+     Subsystem: "producer",
```
Since we use "producer" in multiple places, I recommend moving it to a constant for reuse.
```diff
  }
  m.log.Debugw("failed processing block height", "height", h, "error", err)
- m.handleFailure(h)
+ m.handleFailure(h, "process")
```
It looks like "process" is a label value. I recommend moving it to metrics.go, so we have centralized label values for metrics.
```diff
  select {
  case <-ticker.C:
      om.commitLatestValidOffsets(dryRun)
+     om.recordConsumerGroupLag(dryRun)
```
It looks like recordConsumerGroupLag() is a blocking call that runs after commitLatestValidOffsets(). Shall we run recordConsumerGroupLag() in a separate goroutine so it's processed asynchronously?
```diff
+     status = StatusError
+ }
+ m.clickHouseWrites.WithLabelValues(table, status).Inc()
+ m.clickHouseWriteDuration.WithLabelValues(table).Observe(durationSeconds)
```
Do we also want to add a status label indicating success/failure to clickHouseWriteDuration?
```diff
  )

+ const (
+     clickHouseTableBlocks = "raw_blocks"
```
We already have "raw_blocks" hardcoded in flags.go. I recommend defining a constant and reusing it both in this PR and in flags.go.
```diff
  }

  cw.log.Debugw("block serialized, producing to kafka", "height", height, "bytes", len(bytes))
+ cw.metrics.ObserveKafkaMessageSize("produced", len(bytes))
```
Recommend moving "produced" to a label constant in metrics.go.
```diff
  }
  m.log.Warnw("failed to mark processed", "height", h, "error", err)
- m.handleFailure(h)
+ m.handleFailure(h, "mark_processed")
```
Recommend moving "mark_processed" to a constant in metrics.go.
Why this should be merged
Supports the following requested metrics:
How this works
Adds new metrics instrumentation at different stages of our pipeline.
How this was tested
Local setup
Need to be documented in RELEASES.md?
No