@@ -26,6 +26,7 @@ public class InferredMetricsScraper {
2626 public static final String NODE_ID_TAG = "node_id" ;
2727 public static final String TOPIC_TAG = "topic" ;
2828 public static final String GROUP_TAG = "group" ;
29+ public static final String PARTITION_TAG = "partition" ;
2930 private ScrapedClusterState prevState = null ;
3031
3132 public synchronized Mono <InferredMetrics > scrape (ScrapedClusterState newState ) {
@@ -130,36 +131,36 @@ private static void fillTopicMetrics(MetricsRegistry registry, ScrapedClusterSta
130131 state .endOffsets ().forEach ((partition , endOffset ) -> registry .gauge (
131132 "kafka_topic_partition_next_offset" ,
132133 "Current (next) Offset of a Broker at Topic/Partition" ,
133- List .of (TOPIC_TAG , "partition" ),
134+ List .of (TOPIC_TAG , PARTITION_TAG ),
134135 List .of (topicName , String .valueOf (partition )),
135136 endOffset
136137 ));
137138 state .startOffsets ().forEach ((partition , startOffset ) -> registry .gauge (
138139 "kafka_topic_partition_oldest_offset" ,
139140 "Oldest Offset of a Broker at Topic/Partition" ,
140- List .of (TOPIC_TAG , "partition" ),
141+ List .of (TOPIC_TAG , PARTITION_TAG ),
141142 List .of (topicName , String .valueOf (partition )),
142143 startOffset
143144 ));
144145 state .description ().partitions ().forEach (p -> {
145146 registry .gauge (
146147 "kafka_topic_partition_in_sync_replica" ,
147148 "Number of In-Sync Replicas for this Topic/Partition" ,
148- List .of (TOPIC_TAG , "partition" ),
149+ List .of (TOPIC_TAG , PARTITION_TAG ),
149150 List .of (topicName , String .valueOf (p .partition ())),
150151 p .isr ().size ()
151152 );
152153 registry .gauge (
153154 "kafka_topic_partition_replicas" ,
154155 "Number of Replicas for this Topic/Partition" ,
155- List .of (TOPIC_TAG , "partition" ),
156+ List .of (TOPIC_TAG , PARTITION_TAG ),
156157 List .of (topicName , String .valueOf (p .partition ())),
157158 p .replicas ().size ()
158159 );
159160 registry .gauge (
160161 "kafka_topic_partition_leader" ,
161162 "Leader Broker ID of this Topic/Partition (-1, if no leader)" ,
162- List .of (TOPIC_TAG , "partition" ),
163+ List .of (TOPIC_TAG , PARTITION_TAG ),
163164 List .of (topicName , String .valueOf (p .partition ())),
164165 Optional .ofNullable (p .leader ()).map (Node ::id ).orElse (-1 )
165166 );
@@ -212,7 +213,7 @@ private static void fillConsumerGroupsMetrics(MetricsRegistry registry, ScrapedC
212213 registry .gauge (
213214 "kafka_consumergroup_current_offset" ,
214215 "Current Offset of a ConsumerGroup at Topic/Partition" ,
215- List .of ("consumergroup" , TOPIC_TAG , "partition" ),
216+ List .of ("consumergroup" , TOPIC_TAG , PARTITION_TAG ),
216217 List .of (groupName , tp .topic (), String .valueOf (tp .partition ())),
217218 committedOffset
218219 );
@@ -223,7 +224,7 @@ private static void fillConsumerGroupsMetrics(MetricsRegistry registry, ScrapedC
223224 registry .gauge (
224225 "kafka_consumergroup_lag" ,
225226 "Current Approximate Lag of a ConsumerGroup at Topic/Partition" ,
226- List .of ("consumergroup" , TOPIC_TAG , "partition" ),
227+ List .of ("consumergroup" , TOPIC_TAG , PARTITION_TAG ),
227228 List .of (groupName , tp .topic (), String .valueOf (tp .partition ())),
228229 endOffset - committedOffset //TODO: check +-1
229230 ));
0 commit comments