Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ func (c *Consumer) Start(ctx context.Context) error {
// acquisition, the message is dropped (will be reprocessed after rebalance).
// On successful processing, commits offset. On failure, publishes to DLQ (if configured) before committing.
func (c *Consumer) dispatch(ctx context.Context, msg *cKafka.Message) {
if msg != nil {
c.metrics.ObserveKafkaMessageSize("consumed", len(msg.Value))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another example of a naked constant here. Metrics helper functions would be cleaner.

}

if err := c.sem.Acquire(ctx, 1); err != nil {
return
}
Expand Down
93 changes: 87 additions & 6 deletions pkg/kafka/offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
// offsetState tracks commit progress for a single assigned partition:
// the window of offsets reported by workers but not yet committed to the
// broker, the last offset actually committed, and the topic name.
type offsetState struct {
	// window holds processed-but-uncommitted offsets, ordered by insertion.
	window []kafka.TopicPartition
	// lastCommitted is the highest offset committed to the broker for this
	// partition (used to validate contiguity of the window).
	lastCommitted kafka.Offset
	// topic is the partition's topic name, needed for broker queries such as
	// watermark lookups; nil until learned from an inserted offset or a
	// rebalance event.
	topic *string
}

/*
Expand All @@ -43,7 +44,7 @@ lastCommitted offset and those in the window to determine the highest offset to
commit to Kafka brokers.

When threads are done processing consumed messages, they should call
InsertCommit() to add the commits to the window.
InsertOffset() to add the commits to the window.

The window length is unbounded, meaning code bugs can lead to no offsets
committed and a continually growing window size. If the offset window length
Expand Down Expand Up @@ -93,6 +94,7 @@ func (om *OffsetManager) run(
select {
case <-ticker.C:
om.commitLatestValidOffsets(dryRun)
om.recordConsumerGroupLag(dryRun)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like recordConsumerGroupLag() is a blocking call made after commitLatestValidOffsets(). Shall we run recordConsumerGroupLag() in a separate goroutine so it is processed asynchronously?

case <-ctx.Done():
return
}
Expand Down Expand Up @@ -162,11 +164,16 @@ func (om *OffsetManager) commitLatestValidOffsets(dryRun bool) {
om.log.Debugf("committed offset %d for partition %d", window[end].Offset, partition)
if end == len(window)-1 {
om.partitionStates[partition] = &offsetState{
[]kafka.TopicPartition{},
window[end].Offset,
window: []kafka.TopicPartition{},
lastCommitted: window[end].Offset,
topic: state.topic,
}
} else {
om.partitionStates[partition] = &offsetState{window[end+1:], window[end].Offset}
om.partitionStates[partition] = &offsetState{
window: window[end+1:],
lastCommitted: window[end].Offset,
topic: state.topic,
}
}

newWindowSize := len(om.partitionStates[partition].window)
Expand Down Expand Up @@ -212,6 +219,9 @@ func (om *OffsetManager) InsertOffset(ctx context.Context, offset kafka.TopicPar
om.log.Warnf("partition %d not found in partition states, ignoring", offset.Partition)
return nil
}
if state.topic == nil && offset.Topic != nil {
state.topic = offset.Topic
}

// If the lastCommitted offset is not initialized, we set it to the offset
// of the actual message that has been processed to generate this offset
Expand Down Expand Up @@ -252,7 +262,6 @@ func (om *OffsetManager) RebalanceCb(consumer *kafka.Consumer, event kafka.Event
partitionNums[i] = p.Partition
}

// Record metrics for assignment event
om.metrics.RecordPartitionAssignment(partitionNums)

// Rebalance events may provide offsets, but offsets seem to be
Expand All @@ -276,6 +285,7 @@ func (om *OffsetManager) RebalanceCb(consumer *kafka.Consumer, event kafka.Event
om.partitionStates[co.Partition] = &offsetState{
window: []kafka.TopicPartition{},
lastCommitted: co.Offset,
topic: co.Topic,
}

// If the group's stored offset is lower than the earliest offset of
Expand Down Expand Up @@ -322,13 +332,13 @@ func (om *OffsetManager) RebalanceCb(consumer *kafka.Consumer, event kafka.Event
partitionNums[i] = p.Partition
}

// Record metrics for revocation event
om.metrics.RecordPartitionRevocation(partitionNums)

logStr := make([]string, len(ev.Partitions))
for i, partition := range ev.Partitions {
logStr[i] = strconv.Itoa(int(partition.Partition))
delete(om.partitionStates, partition.Partition)
om.metrics.DeleteKafkaConsumerGroupLag(partition.Partition)
}
om.log.Infof("rebalance event, removing state for partitions: %s\n", strings.Join(logStr, ","))
default:
Expand All @@ -337,6 +347,76 @@ func (om *OffsetManager) RebalanceCb(consumer *kafka.Consumer, event kafka.Event
return nil
}

// recordConsumerGroupLag queries committed offsets and partition watermarks from Kafka
// brokers to compute and record true consumer group lag for each assigned partition.
// When no committed offset exists (offset < 0), lag is estimated based on auto.offset.reset:
// zero for "latest", full partition range for "earliest" or unknown values.
// Skipped in dryRun mode or when metrics is nil.
func (om *OffsetManager) recordConsumerGroupLag(dryRun bool) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we define the gauge direclty in this package, it'd be easier to track down metrics definitions without having to jump out of this folder. This is clearly a private function which sole purpose is to record group lag. This is quite interesting to implement btw. Typical practice is to build dashboards and take metrics emitted directly from Kafka, but this makes dashboarding much more complicated. I like how you just included it here. However, I think 5 sec intervals is too aggressive and can generate a lot of unnecessary requests to the broker(s). How about every 30-60s?

if dryRun || om.metrics == nil {
return
}

om.mutex.Lock()
partitions := make([]kafka.TopicPartition, 0, len(om.partitionStates))
for partition, state := range om.partitionStates {
if state.topic == nil {
continue
}
partitions = append(partitions, kafka.TopicPartition{
Topic: state.topic,
Partition: partition,
})
}
om.mutex.Unlock()

if len(partitions) == 0 {
return
}

committed, err := om.consumer.Committed(partitions, DefaultOffsetCommitTimeout)
if err != nil {
om.log.Warnf("failed to query committed offsets for lag metrics: %v", err)
om.metrics.IncError("lag_query_committed_offsets")
return
}

for _, partition := range committed {
if partition.Topic == nil {
continue
}

low, high, err := om.consumer.QueryWatermarkOffsets(*partition.Topic, partition.Partition, DefaultOffsetCommitTimeout)
if err != nil {
om.log.Warnf("failed to query watermark offsets for lag metrics (partition=%d): %v", partition.Partition, err)
om.metrics.IncError("lag_query_watermark_offsets")
continue
}

var lag int64
if partition.Offset < 0 {
// No committed offset exists: effective lag depends on auto.offset.reset behavior.
switch strings.ToLower(om.autoOffsetReset) {
case "latest":
lag = 0
case "earliest":
lag = int64(high - low)
default:
// Treat unknown auto.offset.reset values as "earliest" (conservative: assume maximum lag)
om.log.Warnf("unrecognized auto.offset.reset value %q, defaulting to earliest-style lag calculation", om.autoOffsetReset)
lag = int64(high - low)
}
} else {
lag = int64(high) - int64(partition.Offset)
}
if lag < 0 {
lag = 0
}

om.metrics.SetKafkaConsumerGroupLag(partition.Partition, lag)
}
}

func (om *OffsetManager) InsertOffsetWithRetry(
ctx context.Context,
msg *kafka.Message,
Expand Down Expand Up @@ -380,6 +460,7 @@ func (om *OffsetManager) getPartitionState(partition int32) *offsetState {
return &offsetState{
window: windowCopy,
lastCommitted: state.lastCommitted,
topic: state.topic,
}
}

Expand Down
39 changes: 27 additions & 12 deletions pkg/kafka/processor/coreth.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ var (
ErrUnmarshalBlock = errors.New("failed to unmarshal coreth block")
)

// ClickHouse table names targeted by this processor, used as the table label
// on write metrics.
// NOTE(review): "raw_blocks" is also hardcoded in flags.go — define a single
// shared constant and reuse it in both places.
const (
	clickHouseTableBlocks       = "raw_blocks"
	clickHouseTableTransactions = "raw_transactions"
	clickHouseTableLogs         = "raw_logs"
)

// CorethProcessor unmarshals and logs Coreth blocks from Kafka messages.
// If repositories are provided, persists blocks and transactions to ClickHouse.
// If repositories are provided, persists blocks, transactions, and logs to ClickHouse.
// Safe for concurrent use.
type CorethProcessor struct {
log *zap.SugaredLogger
Expand All @@ -36,7 +42,7 @@ type CorethProcessor struct {
}

// NewCorethProcessor creates a new CorethProcessor with the given logger.
// If repositories are provided, blocks and transactions will be persisted to ClickHouse.
// If repositories are provided, blocks, transactions, and logs will be persisted to ClickHouse.
func NewCorethProcessor(
log *zap.SugaredLogger,
blocksRepo evmrepo.Blocks,
Expand Down Expand Up @@ -90,7 +96,10 @@ func (p *CorethProcessor) Process(ctx context.Context, msg *cKafka.Message) erro
return fmt.Errorf("failed to parse block for storage: %w", err)
}

if err := p.blocksRepo.WriteBlock(ctx, blockRow); err != nil {
writeStart := time.Now()
err = p.blocksRepo.WriteBlock(ctx, blockRow)
p.metrics.RecordClickHouseWrite(clickHouseTableBlocks, err, time.Since(writeStart).Seconds())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we grow our metrics plumbing, I am seeing us fall into the old trap we got ourselves into with analytics. It's starting to get confusing to figure out where metrics functions are. Ideally, we have a shared pkg/metrics which has the most generic reused stuff, including very common labels like chain_id or mainnetOrTestnet. As for things like RecordClickHouseWrite, I think we should be colocating these helper functions. One way to do this would be something like

recordClickHouseWrite(p.metrics, err, time.Since(writeStart).Seconds())

where recordClickHouseWrite() is a private function to this processor pkg.

This will address the scattered constants we're already seeing, such as the clickHouseTable* consts above. If this breaks avalanchego's practice, I would say this is a time where I think avalanchego did it wrong.

Can we make this change?

if err != nil {
p.metrics.IncError("coreth_write_error")
return fmt.Errorf("failed to write block to ClickHouse: %w", err)
}
Expand All @@ -103,8 +112,6 @@ func (p *CorethProcessor) Process(ctx context.Context, msg *cKafka.Message) erro
)
}

// Record successful processing duration
p.metrics.ObserveBlockProcessingDuration(time.Since(start).Seconds())
// Persist transactions to ClickHouse if repository is configured
if p.txsRepo != nil && len(block.Transactions) > 0 {
if err := p.processTransactions(ctx, &block); err != nil {
Expand All @@ -119,11 +126,14 @@ func (p *CorethProcessor) Process(ctx context.Context, msg *cKafka.Message) erro
}
}

// Record successful end-to-end processing duration (block + transactions + logs)
p.metrics.ObserveBlockProcessingDuration(time.Since(start).Seconds())

return nil
}

// CorethBlockToBlockRow converts a kafkamsg.CorethBlock to BlockRow
// Exported for testing purposes
// CorethBlockToBlockRow converts a kafkamsg.EVMBlock to BlockRow.
// Exported for testing purposes.
func CorethBlockToBlockRow(block *kafkamsg.EVMBlock) (*evmrepo.BlockRow, error) {
// Validate blockchain ID
if block.BlockchainID == nil {
Expand Down Expand Up @@ -207,8 +217,8 @@ func CorethBlockToBlockRow(block *kafkamsg.EVMBlock) (*evmrepo.BlockRow, error)
return blockRow, nil
}

// CorethTransactionToTransactionRow converts a coreth.Transaction to TransactionRow
// Exported for testing purposes
// CorethTransactionToTransactionRow converts a kafkamsg.EVMTransaction to TransactionRow.
// Exported for testing purposes.
func CorethTransactionToTransactionRow(
tx *kafkamsg.EVMTransaction,
block *kafkamsg.EVMBlock,
Expand Down Expand Up @@ -294,7 +304,10 @@ func (p *CorethProcessor) processTransactions(
return fmt.Errorf("failed to convert transaction %d: %w", i, err)
}

if err := p.txsRepo.WriteTransaction(ctx, txRow); err != nil {
writeStart := time.Now()
err = p.txsRepo.WriteTransaction(ctx, txRow)
p.metrics.RecordClickHouseWrite(clickHouseTableTransactions, err, time.Since(writeStart).Seconds())
if err != nil {
return fmt.Errorf("failed to write transaction %s: %w", tx.Hash, err)
}

Expand All @@ -304,7 +317,6 @@ func (p *CorethProcessor) processTransactions(
}
}

// Record logs processed metric
p.metrics.AddLogsProcessed(totalLogs)

var blockNumber uint64
Expand Down Expand Up @@ -340,7 +352,10 @@ func (p *CorethProcessor) processLogs(
return fmt.Errorf("failed to convert log: %w", err)
}

if err := p.logsRepo.WriteLog(ctx, logRow); err != nil {
writeStart := time.Now()
err = p.logsRepo.WriteLog(ctx, logRow)
p.metrics.RecordClickHouseWrite(clickHouseTableLogs, err, time.Since(writeStart).Seconds())
if err != nil {
return fmt.Errorf("failed to write log (tx: %s, index: %d): %w", tx.Hash, log.Index, err)
}
totalLogs++
Expand Down
Loading