diff --git a/docs/metrics.md b/docs/metrics.md index 07232dbc..f36e523d 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -95,6 +95,10 @@ kminion_kafka_consumer_group_topic_offset_sum{group_id="bigquery-sink",topic_nam # TYPE kminion_kafka_consumer_group_topic_partition_lag gauge kminion_kafka_consumer_group_topic_partition_lag{group_id="bigquery-sink",partition_id="10",topic_name="shop-activity"} 147481 +# HELP kminion_kafka_consumer_group_topic_partition_offset The committed offset of a consumer group for a given partition +# TYPE kminion_kafka_consumer_group_topic_partition_offset gauge +kminion_kafka_consumer_group_topic_partition_offset{group_id="bigquery-sink",partition_id="10",topic_name="shop-activity"} 427 + # HELP kminion_kafka_consumer_group_topic_lag The number of messages a consumer group is lagging behind across all partitions in a topic # TYPE kminion_kafka_consumer_group_topic_lag gauge kminion_kafka_consumer_group_topic_lag{group_id="bigquery-sink",topic_name="shop-activity"} 147481 diff --git a/prometheus/collect_consumer_group_lags.go b/prometheus/collect_consumer_group_lags.go index b61fd073..18b0db58 100644 --- a/prometheus/collect_consumer_group_lags.go +++ b/prometheus/collect_consumer_group_lags.go @@ -84,6 +84,14 @@ func (e *Exporter) collectConsumerGroupLagsOffsetTopic(_ context.Context, ch cha if e.minionSvc.Cfg.ConsumerGroups.Granularity == minion.ConsumerGroupGranularityTopic { continue } + ch <- prometheus.MustNewConstMetric( + e.consumerGroupTopicPartitionOffset, + prometheus.GaugeValue, + float64(partition.Value.Offset), + groupName, + topicName, + strconv.Itoa(int(partitionID)), + ) ch <- prometheus.MustNewConstMetric( e.consumerGroupTopicPartitionLag, prometheus.GaugeValue, @@ -176,6 +184,14 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan if e.minionSvc.Cfg.ConsumerGroups.Granularity == minion.ConsumerGroupGranularityTopic { continue } + ch <- prometheus.MustNewConstMetric( + e.consumerGroupTopicPartitionOffset, + prometheus.GaugeValue, + float64(partition.Offset), + groupName, + topic.Topic, + strconv.Itoa(int(partition.Partition)), + ) ch <- prometheus.MustNewConstMetric( e.consumerGroupTopicPartitionLag, prometheus.GaugeValue, diff --git a/prometheus/exporter.go b/prometheus/exporter.go index d717bcfa..7c1814e2 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -44,6 +44,7 @@ type Exporter struct { consumerGroupMembersEmpty *prometheus.Desc consumerGroupTopicMembers *prometheus.Desc consumerGroupAssignedTopicPartitions *prometheus.Desc + consumerGroupTopicPartitionOffset *prometheus.Desc consumerGroupTopicOffsetSum *prometheus.Desc consumerGroupTopicPartitionLag *prometheus.Desc consumerGroupTopicLag *prometheus.Desc @@ -179,7 +180,14 @@ func (e *Exporter) InitializeMetrics() { []string{"group_id", "topic_name"}, nil, ) - // Topic / Partition Offset Sum (useful for calculating the consumed messages / sec on a topic) + // Topic Partition Offsets (useful for calculating the consumed messages / sec on a topic partition) + e.consumerGroupTopicPartitionOffset = prometheus.NewDesc( + prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_partition_offset"), + "The committed offset of a consumer group for a given partition", + []string{"group_id", "topic_name", "partition_id"}, + nil, + ) + // Topic Offset Sum (useful for calculating the consumed messages / sec on a topic) e.consumerGroupTopicOffsetSum = prometheus.NewDesc( prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_topic_offset_sum"), "The sum of all committed group offsets across all partitions in a topic",