
Kafka: Emit additional consumer metrics. #17919

Merged
gianm merged 4 commits into apache:master from gianm:kafka-consumer-addl-metrics on Apr 16, 2025
Conversation

@gianm (Contributor) commented on Apr 15, 2025:

Adds more metrics to the ones originally added in #14582. These metrics help provide insight into the workings of the Kafka consumer.

As a bonus, one of the metrics, "kafka/consumer/recordsLag", has a dimension "partition" which can be used to more easily determine which tasks are reading which partitions. It also provides a view of lag from the task perspective rather than the Overlord perspective, which is useful if the Overlord is ever unable to report metrics properly.
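The translation described above, from a Kafka client metric to a Druid metric with a "partition" dimension, can be sketched roughly as follows. The class, method, and map-based event here are simplified stand-ins for the actual classes in the PR, not the real implementation; only the metric name "kafka/consumer/recordsLag" and the tag names come from the discussion.

```java
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: map a Kafka client metric (name + tags) to a
// Druid-style metric event carrying "topic" and "partition" dimensions.
public class ConsumerMetricSketch
{
  // Kafka reports "records-lag" per topic-partition; re-emitting it as
  // "kafka/consumer/recordsLag" with a "partition" dimension lets operators
  // see which task is reading which partition, from the task's perspective.
  static Map<String, Object> toDruidEvent(String kafkaName, Map<String, String> tags, double value)
  {
    if ("records-lag".equals(kafkaName)
        && tags.keySet().containsAll(Set.of("topic", "partition"))) {
      return Map.of(
          "metric", "kafka/consumer/recordsLag",
          "topic", tags.get("topic"),
          "partition", tags.get("partition"),
          "value", value
      );
    }
    return Map.of(); // unmapped metric: ignore
  }

  public static void main(String[] args)
  {
    System.out.println(
        toDruidEvent("records-lag", Map.of("client-id", "c1", "topic", "t", "partition", "0"), 42.0)
    );
  }
}
```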

@kfaraz (Contributor) left a comment:

Looks good, minor queries.

final KafkaConsumerMetric kafkaConsumerMetric = METRICS.get(metricName.name());

if (kafkaConsumerMetric != null &&
    kafkaConsumerMetric.getDimensions().equals(metricName.tags().keySet())) {
@kfaraz (Contributor) commented:

Instead of requiring the two sets to be equal, should we just check for a containment relationship?

kafkaConsumerMetric != null &&
   metricName.tags().keySet().containsAll(kafkaConsumerMetric.getDimensions())
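The practical difference between the two checks shows up on a metric that Kafka reports at several granularities: an exact `equals` accepts only the tag set registered for the metric, while `containsAll` would also accept variants carrying extra tags and count the same data twice. A minimal sketch of that distinction; the class and method names are hypothetical, and the tag names follow Kafka's consumer metric tags:

```java
import java.util.Set;

// Hypothetical illustration of equals vs containsAll on metric tag sets.
public class TagMatchDemo
{
  // Dimensions registered for the per-topic variant of the metric.
  static final Set<String> REGISTERED = Set.of("client-id", "topic");

  // Exact match: accepts only the per-topic variant.
  static boolean matchesExactly(Set<String> tags)
  {
    return tags.equals(REGISTERED);
  }

  // Containment: would also accept variants that carry extra tags.
  static boolean matchesLoosely(Set<String> tags)
  {
    return tags.containsAll(REGISTERED);
  }

  public static void main(String[] args)
  {
    Set<String> grandTotal = Set.of("client-id");                      // no "topic" tag
    Set<String> perTopic = Set.of("client-id", "topic");
    Set<String> perPartition = Set.of("client-id", "topic", "partition");

    // equals filters out both the grand total and the per-partition variant...
    System.out.println(matchesExactly(perTopic));      // true
    System.out.println(matchesExactly(grandTotal));    // false
    System.out.println(matchesExactly(perPartition));  // false

    // ...while containsAll would also match the per-partition variant,
    // double-counting the same records.
    System.out.println(matchesLoosely(perPartition));  // true
  }
}
```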

@kfaraz (Contributor) commented:

Please also add a comment on why we need to check the dimensions. The older code had this comment:

    // Certain metrics are emitted both as grand totals and broken down by topic; we want to ignore the grand total and
    // only look at the per-topic metrics. See https://kafka.apache.org/documentation/#consumer_fetch_monitoring.

@gianm (Contributor, Author) commented:

Oops, yes, that comment is the reason why we need to look for equal sets. I have restored it.

new KafkaConsumerMetric(
    "outgoing-byte-total",
    "kafka/consumer/outgoingBytes",
    Set.of(CLIENT_ID_TAG, "node-id"),
@kfaraz (Contributor) commented:

We should also add constants for "topic", "partition" and "node-id".

@gianm (Contributor, Author) commented:

Updated.

.getAndSet(newValueAsLong);
emitValue = newValueAsLong - priorValue;
} else {
throw DruidException.defensive("Unexpected metric type[%s]", kafkaConsumerMetric.getMetricType());
@kfaraz (Contributor) commented:

Maybe we should also log an error and then throw the exception, because I am not sure where this exception would be caught.

@gianm (Contributor, Author) commented:

It would only happen if there is an enum value for KafkaConsumerMetric.MetricType that isn't handled here. Since KafkaConsumerMetric is only used by this monitor, it seems hard to believe it would ever happen. That is why I chose to use a defensive check.

Tracing through the code, it looks like exceptions here are ultimately caught in ScheduledExecutors.scheduleAtFixedRate and logged at ERROR level.
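The counter handling around the snippet above can be sketched like this; `MetricType`, the method, and the exception class are simplified stand-ins for the PR's `KafkaConsumerMetric` and `DruidException.defensive`, not the actual code. The idea is that COUNT metrics arrive as cumulative totals, so each emission reports the delta since the previous poll, and an unhandled enum value is a coding bug caught by the defensive branch:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the gauge-vs-counter emission logic.
public class CounterDeltaSketch
{
  enum MetricType { GAUGE, COUNT }

  // GAUGE values are emitted as-is; for COUNT, Kafka exposes a cumulative
  // total, so we store the latest value and emit the delta since last poll.
  static long emitValue(MetricType type, AtomicLong prior, long newValue)
  {
    switch (type) {
      case GAUGE:
        return newValue;
      case COUNT:
        long priorValue = prior.getAndSet(newValue);
        return newValue - priorValue;
      default:
        // Only reachable if a new MetricType is added without handling it here.
        throw new IllegalStateException("Unexpected metric type[" + type + "]");
    }
  }

  public static void main(String[] args)
  {
    AtomicLong prior = new AtomicLong(0);
    System.out.println(emitValue(MetricType.COUNT, prior, 100)); // 100
    System.out.println(emitValue(MetricType.COUNT, prior, 250)); // 150
  }
}
```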

@kfaraz (Contributor) left a comment:

Thanks for the changes, @gianm !

@gianm gianm merged commit 3595028 into apache:master Apr 16, 2025
42 checks passed
@gianm gianm deleted the kafka-consumer-addl-metrics branch April 16, 2025 08:06
@capistrant capistrant added this to the 34.0.0 milestone Jul 22, 2025
abhishekrb19 added a commit that referenced this pull request Aug 29, 2025