Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -140,51 +140,51 @@ List<Consumer<Metric>> kafkaBrokerAssertions() {
metric,
"kafka.partition.count",
"The number of partitions on the broker",
"{partitions}"),
"{partition}"),
metric ->
assertGauge(
metric,
"kafka.partition.offline",
"The number of partitions offline",
"{partitions}"),
"{partition}"),
metric ->
assertGauge(
metric,
"kafka.partition.under_replicated",
"The number of under replicated partitions",
"{partitions}"),
"{partition}"),
metric ->
assertSumWithAttributes(
metric,
"kafka.isr.operation.count",
"The number of in-sync replica shrink and expand operations",
"{operations}",
"{operation}",
attrs -> attrs.containsOnly(entry("operation", "shrink")),
attrs -> attrs.containsOnly(entry("operation", "expand"))),
metric ->
assertGauge(
metric,
"kafka.controller.active.count",
"controller is active on broker",
"{controllers}"),
"For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.",
"{controller}"),
metric ->
assertSum(
metric,
"kafka.leader.election.rate",
"leader election rate - increasing indicates broker failures",
"{elections}"),
"Leader election rate - increasing indicates broker failures",
"{election}"),
metric ->
assertGauge(
metric,
"kafka.max.lag",
"max lag in messages between follower and leader replicas",
"{messages}"),
"Max lag in messages between follower and leader replicas",
"{message}"),
metric ->
assertSum(
metric,
"kafka.unclean.election.rate",
"unclean leader election rate - increasing indicates broker failures",
"{elections}"));
"Unclean leader election rate - increasing indicates broker failures",
"{election}"));
}

static class KafkaBrokerTargetIntegrationTest extends KafkaIntegrationTest {
Expand Down Expand Up @@ -235,52 +235,52 @@ void endToEnd() {
metric,
"kafka.consumer.bytes-consumed-rate",
"The average number of bytes consumed per second",
"by",
"By",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.fetch-rate",
"The number of fetch requests for all topics per second",
"1"),
"{request}"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.fetch-size-avg",
"The average number of bytes fetched per request",
"by",
"By",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.records-consumed-rate",
"The average number of records consumed per second",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.records-lag-max",
"Number of messages the consumer lags behind the producer",
"1"),
"{record}"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.bytes-consumed-rate",
"The average number of bytes consumed for all topics per second",
"by"),
"By"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.fetch-size-avg",
"The average number of bytes fetched per request for all topics",
"by"),
"By"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.records-consumed-rate",
"The average number of records consumed for all topics per second",
"1"));
"{record}"));
}
}

Expand All @@ -300,14 +300,14 @@ void endToEnd() {
metric,
"kafka.producer.byte-rate",
"The average number of bytes sent per second for a topic",
"by",
"By",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.compression-rate",
"The average compression rate of record batches for a topic",
"1",
"{ratio}",
topics),
metric ->
assertKafkaGauge(
Expand All @@ -320,27 +320,27 @@ void endToEnd() {
metric,
"kafka.producer.outgoing-byte-rate",
"The average number of outgoing bytes sent per second to all servers",
"by"),
"By"),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-error-rate",
"The average per-second number of record sends that resulted in errors for a topic",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-retry-rate",
"The average per-second number of retried record sends for a topic",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-send-rate",
"The average number of records sent per second for a topic",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
Expand All @@ -353,10 +353,13 @@ void endToEnd() {
metric,
"kafka.producer.request-rate",
"The average number of requests sent per second",
"1"),
"{request}"),
metric ->
assertKafkaGauge(
metric, "kafka.producer.response-rate", "Responses received per second", "1"));
metric,
"kafka.producer.response-rate",
"Responses received per second",
"{response}"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,45 @@

def consumerFetchManagerMetrics = otel.mbeans("kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics")
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.fetch-rate",
"The number of fetch requests for all topics per second", "1",
"The number of fetch requests for all topics per second", "{request}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"fetch-rate", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.records-lag-max",
"Number of messages the consumer lags behind the producer", "1",
"Number of messages the consumer lags behind the producer", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"records-lag-max", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.bytes-consumed-rate",
"The average number of bytes consumed for all topics per second", "by",
"The average number of bytes consumed for all topics per second", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"bytes-consumed-rate", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.fetch-size-avg",
"The average number of bytes fetched per request for all topics", "by",
"The average number of bytes fetched per request for all topics", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"fetch-size-avg", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.records-consumed-rate",
"The average number of records consumed for all topics per second", "1",
"The average number of records consumed for all topics per second", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"records-consumed-rate", otel.&doubleValueCallback)

def consumerFetchManagerMetricsByTopic = otel.mbeans("kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics")
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.bytes-consumed-rate",
"The average number of bytes consumed per second", "by",
"The average number of bytes consumed per second", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"bytes-consumed-rate", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.fetch-size-avg",
"The average number of bytes fetched per request", "by",
"The average number of bytes fetched per request", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"fetch-size-avg", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.records-consumed-rate",
"The average number of records consumed per second", "1",
"The average number of records consumed per second", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"records-consumed-rate", otel.&doubleValueCallback)
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,45 @@ otel.instrument(producerMetrics, "kafka.producer.io-wait-time-ns-avg",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"io-wait-time-ns-avg", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.outgoing-byte-rate",
"The average number of outgoing bytes sent per second to all servers", "by",
"The average number of outgoing bytes sent per second to all servers", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"outgoing-byte-rate", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.request-latency-avg",
"The average request latency", "ms",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"request-latency-avg", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.request-rate",
"The average number of requests sent per second", "1",
"The average number of requests sent per second", "{request}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"request-rate", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.response-rate",
"Responses received per second", "1",
"Responses received per second", "{response}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"response-rate", otel.&doubleValueCallback)

def producerTopicMetrics = otel.mbeans("kafka.producer:client-id=*,topic=*,type=producer-topic-metrics")
otel.instrument(producerTopicMetrics, "kafka.producer.byte-rate",
"The average number of bytes sent per second for a topic", "by",
"The average number of bytes sent per second for a topic", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"byte-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.compression-rate",
"The average compression rate of record batches for a topic", "1",
"The average compression rate of record batches for a topic", "{ratio}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"compression-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.record-error-rate",
"The average per-second number of record sends that resulted in errors for a topic", "1",
"The average per-second number of record sends that resulted in errors for a topic", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"record-error-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.record-retry-rate",
"The average per-second number of retried record sends for a topic", "1",
"The average per-second number of retried record sends for a topic", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"record-retry-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.record-send-rate",
"The average number of records sent per second for a topic", "1",
"The average number of records sent per second for a topic", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"record-send-rate", otel.&doubleValueCallback)
Loading
Loading