2323@ RequiredArgsConstructor
2424public class InferredMetricsScraper {
2525
26+ public static final String NODE_ID_TAG = "node_id" ;
27+ public static final String TOPIC_TAG = "topic" ;
28+ public static final String GROUP_TAG = "group" ;
2629 private ScrapedClusterState prevState = null ;
2730
2831 public synchronized Mono <InferredMetrics > scrape (ScrapedClusterState newState ) {
@@ -79,7 +82,7 @@ private static void fillNodesMetrics(MetricsRegistry registry, ScrapedClusterSta
7982 registry .gauge (
8083 "broker_bytes_disk" ,
8184 "Written disk size in bytes of a broker" ,
82- List .of ("node_id" ),
85+ List .of (NODE_ID_TAG ),
8386 List .of (nodeId .toString ()),
8487 state .segmentStats ().getSegmentSize ()
8588 );
@@ -89,7 +92,7 @@ private static void fillNodesMetrics(MetricsRegistry registry, ScrapedClusterSta
8992 registry .gauge (
9093 "broker_bytes_usable" ,
9194 "Usable disk size in bytes of a broker" ,
92- List .of ("node_id" ),
95+ List .of (NODE_ID_TAG ),
9396 List .of (nodeId .toString ()),
9497 state .logDirSpaceStats ().usableBytes ()
9598 );
@@ -98,7 +101,7 @@ private static void fillNodesMetrics(MetricsRegistry registry, ScrapedClusterSta
98101 registry .gauge (
99102 "broker_bytes_total" ,
100103 "Total disk size in bytes of a broker" ,
101- List .of ("node_id" ),
104+ List .of (NODE_ID_TAG ),
102105 List .of (nodeId .toString ()),
103106 state .logDirSpaceStats ().totalBytes ()
104107 );
@@ -120,43 +123,43 @@ private static void fillTopicMetrics(MetricsRegistry registry, ScrapedClusterSta
120123 registry .gauge (
121124 "kafka_topic_partitions" ,
122125 "Number of partitions for this Topic" ,
123- List .of ("topic" ),
126+ List .of (TOPIC_TAG ),
124127 List .of (topicName ),
125128 state .description ().partitions ().size ()
126129 );
127130 state .endOffsets ().forEach ((partition , endOffset ) -> registry .gauge (
128131 "kafka_topic_partition_next_offset" ,
129132 "Current (next) Offset of a Broker at Topic/Partition" ,
130- List .of ("topic" , "partition" ),
133+ List .of (TOPIC_TAG , "partition" ),
131134 List .of (topicName , String .valueOf (partition )),
132135 endOffset
133136 ));
134137 state .startOffsets ().forEach ((partition , startOffset ) -> registry .gauge (
135138 "kafka_topic_partition_oldest_offset" ,
136139 "Oldest Offset of a Broker at Topic/Partition" ,
137- List .of ("topic" , "partition" ),
140+ List .of (TOPIC_TAG , "partition" ),
138141 List .of (topicName , String .valueOf (partition )),
139142 startOffset
140143 ));
141144 state .description ().partitions ().forEach (p -> {
142145 registry .gauge (
143146 "kafka_topic_partition_in_sync_replica" ,
144147 "Number of In-Sync Replicas for this Topic/Partition" ,
145- List .of ("topic" , "partition" ),
148+ List .of (TOPIC_TAG , "partition" ),
146149 List .of (topicName , String .valueOf (p .partition ())),
147150 p .isr ().size ()
148151 );
149152 registry .gauge (
150153 "kafka_topic_partition_replicas" ,
151154 "Number of Replicas for this Topic/Partition" ,
152- List .of ("topic" , "partition" ),
155+ List .of (TOPIC_TAG , "partition" ),
153156 List .of (topicName , String .valueOf (p .partition ())),
154157 p .replicas ().size ()
155158 );
156159 registry .gauge (
157160 "kafka_topic_partition_leader" ,
158161 "Leader Broker ID of this Topic/Partition (-1, if no leader)" ,
159- List .of ("topic" , "partition" ),
162+ List .of (TOPIC_TAG , "partition" ),
160163 List .of (topicName , String .valueOf (p .partition ())),
161164 Optional .ofNullable (p .leader ()).map (Node ::id ).orElse (-1 )
162165 );
@@ -165,7 +168,7 @@ private static void fillTopicMetrics(MetricsRegistry registry, ScrapedClusterSta
165168 registry .gauge (
166169 "topic_bytes_disk" ,
167170 "Disk size in bytes of a topic" ,
168- List .of ("topic" ),
171+ List .of (TOPIC_TAG ),
169172 List .of (topicName ),
170173 state .segmentStats ().getSegmentSize ()
171174 );
@@ -186,21 +189,21 @@ private static void fillConsumerGroupsMetrics(MetricsRegistry registry, ScrapedC
186189 registry .gauge (
187190 "group_state" ,
188191 "State of the consumer group, value = ordinal of org.apache.kafka.common.ConsumerGroupState" ,
189- List .of ("group" ),
192+ List .of (GROUP_TAG ),
190193 List .of (groupName ),
191194 state .description ().state ().ordinal ()
192195 );
193196 registry .gauge (
194197 "group_member_count" ,
195198 "Number of member assignments in the consumer group." ,
196- List .of ("group" ),
199+ List .of (GROUP_TAG ),
197200 List .of (groupName ),
198201 state .description ().members ().size ()
199202 );
200203 registry .gauge (
201204 "group_host_count" ,
202205 "Number of distinct hosts in the consumer group." ,
203- List .of ("group" ),
206+ List .of (GROUP_TAG ),
204207 List .of (groupName ),
205208 state .description ().members ().stream ().map (MemberDescription ::host ).distinct ().count ()
206209 );
@@ -209,7 +212,7 @@ private static void fillConsumerGroupsMetrics(MetricsRegistry registry, ScrapedC
209212 registry .gauge (
210213 "kafka_consumergroup_current_offset" ,
211214 "Current Offset of a ConsumerGroup at Topic/Partition" ,
212- List .of ("consumergroup" , "topic" , "partition" ),
215+ List .of ("consumergroup" , TOPIC_TAG , "partition" ),
213216 List .of (groupName , tp .topic (), String .valueOf (tp .partition ())),
214217 committedOffset
215218 );
@@ -220,7 +223,7 @@ private static void fillConsumerGroupsMetrics(MetricsRegistry registry, ScrapedC
220223 registry .gauge (
221224 "kafka_consumergroup_lag" ,
222225 "Current Approximate Lag of a ConsumerGroup at Topic/Partition" ,
223- List .of ("consumergroup" , "topic" , "partition" ),
226+ List .of ("consumergroup" , TOPIC_TAG , "partition" ),
224227 List .of (groupName , tp .topic (), String .valueOf (tp .partition ())),
225228 endOffset - committedOffset //TODO: check +-1
226229 ));
0 commit comments