-
Notifications
You must be signed in to change notification settings - Fork 1
Metrics for new grafana graphs #92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
b6b1145
0959bb3
956a5a2
4194506
2b52e43
6830f70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,7 @@ const ( | |
| type offsetState struct { | ||
| window []kafka.TopicPartition | ||
| lastCommitted kafka.Offset | ||
| topic *string | ||
| } | ||
|
|
||
| /* | ||
|
|
@@ -43,7 +44,7 @@ lastCommitted offset and those in the window to determine the highest offset to | |
| commit to Kafka brokers. | ||
|
|
||
| When threads are done processing consumed messages, they should call | ||
| InsertCommit() to add the commits to the window. | ||
| InsertOffset() to add the commits to the window. | ||
|
|
||
| The window length is unbounded, meaning code bugs can lead to no offsets | ||
| committed and a continually growing window size. If the offset window length | ||
|
|
@@ -93,6 +94,7 @@ func (om *OffsetManager) run( | |
| select { | ||
| case <-ticker.C: | ||
| om.commitLatestValidOffsets(dryRun) | ||
| om.recordConsumerGroupLag(dryRun) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looks like recordConsumerGroupLag() is a blocking call after commitLatestValidOffsets(). Shall we make recordConsumerGroupLag() a separate goroutine to process asynchronously? |
||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
|
|
@@ -162,11 +164,16 @@ func (om *OffsetManager) commitLatestValidOffsets(dryRun bool) { | |
| om.log.Debugf("committed offset %d for partition %d", window[end].Offset, partition) | ||
| if end == len(window)-1 { | ||
| om.partitionStates[partition] = &offsetState{ | ||
| []kafka.TopicPartition{}, | ||
| window[end].Offset, | ||
| window: []kafka.TopicPartition{}, | ||
| lastCommitted: window[end].Offset, | ||
| topic: state.topic, | ||
| } | ||
| } else { | ||
| om.partitionStates[partition] = &offsetState{window[end+1:], window[end].Offset} | ||
| om.partitionStates[partition] = &offsetState{ | ||
| window: window[end+1:], | ||
| lastCommitted: window[end].Offset, | ||
| topic: state.topic, | ||
| } | ||
| } | ||
|
|
||
| newWindowSize := len(om.partitionStates[partition].window) | ||
|
|
@@ -212,6 +219,9 @@ func (om *OffsetManager) InsertOffset(ctx context.Context, offset kafka.TopicPar | |
| om.log.Warnf("partition %d not found in partition states, ignoring", offset.Partition) | ||
| return nil | ||
| } | ||
| if state.topic == nil && offset.Topic != nil { | ||
| state.topic = offset.Topic | ||
| } | ||
|
|
||
| // If the lastCommitted offset is not initialized, we set it to the offset | ||
| // of the actual message that has been processed to generate this offset | ||
|
|
@@ -252,7 +262,6 @@ func (om *OffsetManager) RebalanceCb(consumer *kafka.Consumer, event kafka.Event | |
| partitionNums[i] = p.Partition | ||
| } | ||
|
|
||
| // Record metrics for assignment event | ||
| om.metrics.RecordPartitionAssignment(partitionNums) | ||
|
|
||
| // Rebalance events may provide offsets, but offsets seem to be | ||
|
|
@@ -276,6 +285,7 @@ func (om *OffsetManager) RebalanceCb(consumer *kafka.Consumer, event kafka.Event | |
| om.partitionStates[co.Partition] = &offsetState{ | ||
| window: []kafka.TopicPartition{}, | ||
| lastCommitted: co.Offset, | ||
| topic: co.Topic, | ||
| } | ||
|
|
||
| // If the group's stored offset is lower than the earliest offset of | ||
|
|
@@ -322,13 +332,13 @@ func (om *OffsetManager) RebalanceCb(consumer *kafka.Consumer, event kafka.Event | |
| partitionNums[i] = p.Partition | ||
| } | ||
|
|
||
| // Record metrics for revocation event | ||
| om.metrics.RecordPartitionRevocation(partitionNums) | ||
|
|
||
| logStr := make([]string, len(ev.Partitions)) | ||
| for i, partition := range ev.Partitions { | ||
| logStr[i] = strconv.Itoa(int(partition.Partition)) | ||
| delete(om.partitionStates, partition.Partition) | ||
| om.metrics.DeleteKafkaConsumerGroupLag(partition.Partition) | ||
| } | ||
| om.log.Infof("rebalance event, removing state for partitions: %s\n", strings.Join(logStr, ",")) | ||
| default: | ||
|
|
@@ -337,6 +347,76 @@ func (om *OffsetManager) RebalanceCb(consumer *kafka.Consumer, event kafka.Event | |
| return nil | ||
| } | ||
|
|
||
| // recordConsumerGroupLag queries committed offsets and partition watermarks from Kafka | ||
| // brokers to compute and record true consumer group lag for each assigned partition. | ||
| // When no committed offset exists (offset < 0), lag is estimated based on auto.offset.reset: | ||
| // zero for "latest", full partition range for "earliest" or unknown values. | ||
| // Skipped in dryRun mode or when metrics is nil. | ||
| func (om *OffsetManager) recordConsumerGroupLag(dryRun bool) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we define the gauge directly in this package, it'd be easier to track down metrics definitions without having to jump out of this folder. This is clearly a private function whose sole purpose is to record group lag. This is quite interesting to implement btw. Typical practice is to build dashboards and take metrics emitted directly from Kafka, but that makes dashboarding much more complicated. I like how you just included it here. However, I think 5 sec intervals is too aggressive and can generate a lot of unnecessary requests to the broker(s). How about every 30-60s? |
||
| if dryRun || om.metrics == nil { | ||
| return | ||
| } | ||
|
|
||
| om.mutex.Lock() | ||
| partitions := make([]kafka.TopicPartition, 0, len(om.partitionStates)) | ||
| for partition, state := range om.partitionStates { | ||
| if state.topic == nil { | ||
| continue | ||
| } | ||
| partitions = append(partitions, kafka.TopicPartition{ | ||
| Topic: state.topic, | ||
| Partition: partition, | ||
| }) | ||
| } | ||
| om.mutex.Unlock() | ||
|
|
||
| if len(partitions) == 0 { | ||
| return | ||
| } | ||
|
|
||
| committed, err := om.consumer.Committed(partitions, DefaultOffsetCommitTimeout) | ||
| if err != nil { | ||
| om.log.Warnf("failed to query committed offsets for lag metrics: %v", err) | ||
| om.metrics.IncError("lag_query_committed_offsets") | ||
| return | ||
| } | ||
|
|
||
| for _, partition := range committed { | ||
| if partition.Topic == nil { | ||
| continue | ||
| } | ||
|
|
||
| low, high, err := om.consumer.QueryWatermarkOffsets(*partition.Topic, partition.Partition, DefaultOffsetCommitTimeout) | ||
| if err != nil { | ||
| om.log.Warnf("failed to query watermark offsets for lag metrics (partition=%d): %v", partition.Partition, err) | ||
| om.metrics.IncError("lag_query_watermark_offsets") | ||
| continue | ||
| } | ||
|
|
||
| var lag int64 | ||
| if partition.Offset < 0 { | ||
| // No committed offset exists: effective lag depends on auto.offset.reset behavior. | ||
| switch strings.ToLower(om.autoOffsetReset) { | ||
| case "latest": | ||
| lag = 0 | ||
| case "earliest": | ||
| lag = high - low | ||
| default: | ||
| // Treat unknown auto.offset.reset values as "earliest" (conservative: assume maximum lag) | ||
| om.log.Warnf("unrecognized auto.offset.reset value %q, defaulting to earliest-style lag calculation", om.autoOffsetReset) | ||
| lag = high - low | ||
| } | ||
| } else { | ||
| lag = high - int64(partition.Offset) | ||
| } | ||
| if lag < 0 { | ||
| lag = 0 | ||
| } | ||
|
|
||
| om.metrics.SetKafkaConsumerGroupLag(partition.Partition, lag) | ||
| } | ||
| } | ||
|
|
||
| func (om *OffsetManager) InsertOffsetWithRetry( | ||
| ctx context.Context, | ||
| msg *kafka.Message, | ||
|
|
@@ -380,6 +460,7 @@ func (om *OffsetManager) getPartitionState(partition int32) *offsetState { | |
| return &offsetState{ | ||
| window: windowCopy, | ||
| lastCommitted: state.lastCommitted, | ||
| topic: state.topic, | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,8 +24,14 @@ var ( | |
| ErrUnmarshalBlock = errors.New("failed to unmarshal coreth block") | ||
| ) | ||
|
|
||
| const ( | ||
| clickHouseTableBlocks = "raw_blocks" | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We already have "raw_blocks" hardcoded in flags.go. Recommend defining a variable and reusing it in this PR and also in flags.go |
||
| clickHouseTableTransactions = "raw_transactions" | ||
| clickHouseTableLogs = "raw_logs" | ||
| ) | ||
|
|
||
| // CorethProcessor unmarshals and logs Coreth blocks from Kafka messages. | ||
| // If repositories are provided, persists blocks and transactions to ClickHouse. | ||
| // If repositories are provided, persists blocks, transactions, and logs to ClickHouse. | ||
| // Safe for concurrent use. | ||
| type CorethProcessor struct { | ||
| log *zap.SugaredLogger | ||
|
|
@@ -36,7 +42,7 @@ type CorethProcessor struct { | |
| } | ||
|
|
||
| // NewCorethProcessor creates a new CorethProcessor with the given logger. | ||
| // If repositories are provided, blocks and transactions will be persisted to ClickHouse. | ||
| // If repositories are provided, blocks, transactions, and logs will be persisted to ClickHouse. | ||
| func NewCorethProcessor( | ||
| log *zap.SugaredLogger, | ||
| blocksRepo evmrepo.Blocks, | ||
|
|
@@ -90,7 +96,10 @@ func (p *CorethProcessor) Process(ctx context.Context, msg *cKafka.Message) erro | |
| return fmt.Errorf("failed to parse block for storage: %w", err) | ||
| } | ||
|
|
||
| if err := p.blocksRepo.WriteBlock(ctx, blockRow); err != nil { | ||
| writeStart := time.Now() | ||
| err = p.blocksRepo.WriteBlock(ctx, blockRow) | ||
| p.metrics.RecordClickHouseWrite(clickHouseTableBlocks, err, time.Since(writeStart).Seconds()) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As we grow our metrics plumbing, I am seeing us fall into the old trap we got ourselves in with analytics. It's starting to get confusing how to figure out where metrics functions are. Ideally, we have a shared pkg/metrics which has the most generic reused stuff, including very common labels, like recordClickHouseWrite(p.metrics, err, time.Since(writeStart).Seconds()), where recordClickHouseWrite() is a private function to this package. This will address the scattered constants we're already seeing, such as the table-name constants. Can we make this change? |
||
| if err != nil { | ||
| p.metrics.IncError("coreth_write_error") | ||
| return fmt.Errorf("failed to write block to ClickHouse: %w", err) | ||
| } | ||
|
|
@@ -117,13 +126,14 @@ func (p *CorethProcessor) Process(ctx context.Context, msg *cKafka.Message) erro | |
| } | ||
| } | ||
|
|
||
| // Record successful processing duration | ||
| // Record successful end-to-end processing duration (block + transactions + logs) | ||
| p.metrics.ObserveBlockProcessingDuration(time.Since(start).Seconds()) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // CorethBlockToBlockRow converts a kafkamsg.CorethBlock to BlockRow | ||
| // Exported for testing purposes | ||
| // CorethBlockToBlockRow converts a kafkamsg.EVMBlock to BlockRow. | ||
| // Exported for testing purposes. | ||
| func CorethBlockToBlockRow(block *kafkamsg.EVMBlock) (*evmrepo.BlockRow, error) { | ||
| // Validate blockchain ID | ||
| if block.BlockchainID == nil { | ||
|
|
@@ -208,8 +218,8 @@ func CorethBlockToBlockRow(block *kafkamsg.EVMBlock) (*evmrepo.BlockRow, error) | |
| return blockRow, nil | ||
| } | ||
|
|
||
| // CorethTransactionToTransactionRow converts a coreth.Transaction to TransactionRow | ||
| // Exported for testing purposes | ||
| // CorethTransactionToTransactionRow converts a kafkamsg.EVMTransaction to TransactionRow. | ||
| // Exported for testing purposes. | ||
| func CorethTransactionToTransactionRow( | ||
| tx *kafkamsg.EVMTransaction, | ||
| block *kafkamsg.EVMBlock, | ||
|
|
@@ -302,7 +312,10 @@ func (p *CorethProcessor) processTransactions( | |
| return fmt.Errorf("failed to convert transaction %d: %w", i, err) | ||
| } | ||
|
|
||
| if err := p.txsRepo.WriteTransaction(ctx, txRow); err != nil { | ||
| writeStart := time.Now() | ||
| err = p.txsRepo.WriteTransaction(ctx, txRow) | ||
| p.metrics.RecordClickHouseWrite(clickHouseTableTransactions, err, time.Since(writeStart).Seconds()) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to write transaction %s: %w", tx.Hash, err) | ||
| } | ||
|
|
||
|
|
@@ -312,7 +325,6 @@ func (p *CorethProcessor) processTransactions( | |
| } | ||
| } | ||
|
|
||
| // Record logs processed metric | ||
| p.metrics.AddLogsProcessed(totalLogs) | ||
|
|
||
| var blockNumber uint64 | ||
|
|
@@ -348,7 +360,10 @@ func (p *CorethProcessor) processLogs( | |
| return fmt.Errorf("failed to convert log: %w", err) | ||
| } | ||
|
|
||
| if err := p.logsRepo.WriteLog(ctx, logRow); err != nil { | ||
| writeStart := time.Now() | ||
| err = p.logsRepo.WriteLog(ctx, logRow) | ||
| p.metrics.RecordClickHouseWrite(clickHouseTableLogs, err, time.Since(writeStart).Seconds()) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to write log (tx: %s, index: %d): %w", tx.Hash, log.Index, err) | ||
| } | ||
| totalLogs++ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another example of a naked constant here. Metrics helper functions would be cleaner.