Skip to content

Commit bfa01dc

Browse files
authored
BE: fix prometheus endpoint and warn on label duplicates (#1546)
1 parent f51df4c commit bfa01dc

File tree

4 files changed

+51
-2
lines changed

4 files changed

+51
-2
lines changed

api/src/main/java/io/kafbat/ui/service/metrics/prometheus/PrometheusMetricsExposer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public final class PrometheusMetricsExposer {
3030

3131
static {
3232
PROMETHEUS_EXPOSE_ENDPOINT_HEADERS = new HttpHeaders();
33-
PROMETHEUS_EXPOSE_ENDPOINT_HEADERS.set(CONTENT_TYPE, OpenMetricsTextFormatWriter.CONTENT_TYPE);
33+
PROMETHEUS_EXPOSE_ENDPOINT_HEADERS.set(CONTENT_TYPE, PrometheusTextFormatWriter.CONTENT_TYPE);
3434
}
3535

3636
private PrometheusMetricsExposer() {

api/src/main/java/io/kafbat/ui/util/MetricsUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import java.util.Collection;
1818
import java.util.List;
1919
import java.util.stream.Stream;
20+
import lombok.extern.slf4j.Slf4j;
2021

22+
@Slf4j
2123
public final class MetricsUtils {
2224

2325
private MetricsUtils() {
@@ -109,6 +111,11 @@ public static MetricSnapshot concatDataPoints(MetricSnapshot d1, MetricSnapshot
109111
}
110112

111113
private static Labels extendLabels(Labels labels, String name, String value) {
112-
return labels.add(name, value);
114+
if (!labels.contains(name)) {
115+
return labels.add(name, value);
116+
} else {
117+
log.warn("Label {} already exists with value {} not updated to {}, skipping", name, labels.get(name), value);
118+
return labels;
119+
}
113120
}
114121
}

api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,19 @@ public abstract class AbstractIntegrationTest {
3939
public static final String SECOND_LOCAL = "secondLocal";
4040

4141
private static final String CONFLUENT_PLATFORM_VERSION = "7.8.0";
42+
private static final int JMX_PORT = 5555;
4243

4344
public static final ConfluentKafkaContainer kafkaOriginal = new ConfluentKafkaContainer(
4445
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION));
4546

4647
public static final ConfluentKafkaContainer kafka = kafkaOriginal
4748
.withListener("0.0.0.0:9095", () -> kafkaOriginal.getNetworkAliases().getFirst() + ":9095")
49+
.withEnv("KAFKA_JMX_PORT", String.valueOf(JMX_PORT))
50+
.withEnv("KAFKA_OPTS",
51+
"-Dcom.sun.management.jmxremote "
52+
+ "-Dcom.sun.management.jmxremote.authenticate=false "
53+
+ "-Dcom.sun.management.jmxremote.ssl=false "
54+
+ "-Dcom.sun.management.jmxremote.local.only=false")
4855
.withNetwork(Network.SHARED);
4956

5057
public static final SchemaRegistryContainer schemaRegistry =
@@ -67,6 +74,7 @@ public abstract class AbstractIntegrationTest {
6774
public static Path tmpDir;
6875

6976
static {
77+
kafka.addExposedPort(JMX_PORT);
7078
kafka.start();
7179
schemaRegistry.start();
7280
kafkaConnect.start();
@@ -111,6 +119,8 @@ public void initialize(@NotNull ConfigurableApplicationContext context) {
111119
IsolationLevel.READ_COMMITTED.toString());
112120
System.setProperty("kafka.clusters.0.producerProperties.request.timeout.ms", "45000");
113121
System.setProperty("kafka.clusters.0.producerProperties.max.block.ms", "80000");
122+
System.setProperty("kafka.clusters.0.metrics.prometheusExpose", "true");
123+
System.setProperty("kafka.clusters.0.metrics.port", kafka.getMappedPort(JMX_PORT).toString());
114124

115125
System.setProperty("kafka.clusters.1.name", SECOND_LOCAL);
116126
System.setProperty("kafka.clusters.1.readOnly", "true");
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.kafbat.ui.controller;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import io.kafbat.ui.AbstractIntegrationTest;
6+
import io.kafbat.ui.service.ClustersStatisticsScheduler;
7+
import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
8+
import java.io.IOException;
9+
import org.junit.jupiter.api.Test;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.test.web.reactive.server.WebTestClient;
12+
13+
class PrometheusExposeControllerTest extends AbstractIntegrationTest {
14+
@Autowired
15+
private WebTestClient webTestClient;
16+
17+
@Autowired
18+
private ClustersStatisticsScheduler scheduler;
19+
20+
@Test
21+
void testGetMetrics() throws IOException {
22+
23+
scheduler.updateStatistics();
24+
25+
webTestClient
26+
.get()
27+
.uri("/metrics")
28+
.exchange()
29+
.expectStatus().isOk()
30+
.expectHeader().contentType(PrometheusTextFormatWriter.CONTENT_TYPE);
31+
}
32+
}

0 commit comments

Comments
 (0)