diff --git a/api/src/main/java/io/kafbat/ui/service/metrics/prometheus/PrometheusMetricsExposer.java b/api/src/main/java/io/kafbat/ui/service/metrics/prometheus/PrometheusMetricsExposer.java index a90e65247..de26c2571 100644 --- a/api/src/main/java/io/kafbat/ui/service/metrics/prometheus/PrometheusMetricsExposer.java +++ b/api/src/main/java/io/kafbat/ui/service/metrics/prometheus/PrometheusMetricsExposer.java @@ -30,7 +30,7 @@ public final class PrometheusMetricsExposer { static { PROMETHEUS_EXPOSE_ENDPOINT_HEADERS = new HttpHeaders(); - PROMETHEUS_EXPOSE_ENDPOINT_HEADERS.set(CONTENT_TYPE, OpenMetricsTextFormatWriter.CONTENT_TYPE); + PROMETHEUS_EXPOSE_ENDPOINT_HEADERS.set(CONTENT_TYPE, PrometheusTextFormatWriter.CONTENT_TYPE); } private PrometheusMetricsExposer() { diff --git a/api/src/main/java/io/kafbat/ui/util/MetricsUtils.java b/api/src/main/java/io/kafbat/ui/util/MetricsUtils.java index 3b4a22024..0379a6f26 100644 --- a/api/src/main/java/io/kafbat/ui/util/MetricsUtils.java +++ b/api/src/main/java/io/kafbat/ui/util/MetricsUtils.java @@ -17,7 +17,9 @@ import java.util.Collection; import java.util.List; import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; +@Slf4j public final class MetricsUtils { private MetricsUtils() { @@ -109,6 +111,11 @@ public static MetricSnapshot concatDataPoints(MetricSnapshot d1, MetricSnapshot } private static Labels extendLabels(Labels labels, String name, String value) { - return labels.add(name, value); + if (!labels.contains(name)) { + return labels.add(name, value); + } else { + log.warn("Label {} already exists with value {} not updated to {}, skipping", name, labels.get(name), value); + return labels; + } } } diff --git a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java index fc13dc74c..6f72def08 100644 --- a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java +++ b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java @@ -39,12 +39,19 @@ public abstract class AbstractIntegrationTest { public static final String SECOND_LOCAL = "secondLocal"; private static final String CONFLUENT_PLATFORM_VERSION = "7.8.0"; + private static final int JMX_PORT = 5555; public static final ConfluentKafkaContainer kafkaOriginal = new ConfluentKafkaContainer( DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)); public static final ConfluentKafkaContainer kafka = kafkaOriginal .withListener("0.0.0.0:9095", () -> kafkaOriginal.getNetworkAliases().getFirst() + ":9095") + .withEnv("KAFKA_JMX_PORT", String.valueOf(JMX_PORT)) + .withEnv("KAFKA_OPTS", + "-Dcom.sun.management.jmxremote " + + "-Dcom.sun.management.jmxremote.authenticate=false " + + "-Dcom.sun.management.jmxremote.ssl=false " + + "-Dcom.sun.management.jmxremote.local.only=false") .withNetwork(Network.SHARED); public static final SchemaRegistryContainer schemaRegistry = @@ -67,6 +74,7 @@ public abstract class AbstractIntegrationTest { public static Path tmpDir; static { + kafka.addExposedPort(JMX_PORT); kafka.start(); schemaRegistry.start(); kafkaConnect.start(); @@ -111,6 +119,8 @@ public void initialize(@NotNull ConfigurableApplicationContext context) { IsolationLevel.READ_COMMITTED.toString()); System.setProperty("kafka.clusters.0.producerProperties.request.timeout.ms", "45000"); System.setProperty("kafka.clusters.0.producerProperties.max.block.ms", "80000"); + System.setProperty("kafka.clusters.0.metrics.prometheusExpose", "true"); + System.setProperty("kafka.clusters.0.metrics.port", kafka.getMappedPort(JMX_PORT).toString()); System.setProperty("kafka.clusters.1.name", SECOND_LOCAL); System.setProperty("kafka.clusters.1.readOnly", "true"); diff --git a/api/src/test/java/io/kafbat/ui/controller/PrometheusExposeControllerTest.java b/api/src/test/java/io/kafbat/ui/controller/PrometheusExposeControllerTest.java new file mode 100644 index 000000000..01b7e14a6 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/controller/PrometheusExposeControllerTest.java @@ -0,0 +1,32 @@ +package io.kafbat.ui.controller; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.kafbat.ui.AbstractIntegrationTest; +import io.kafbat.ui.service.ClustersStatisticsScheduler; +import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.web.reactive.server.WebTestClient; + +class PrometheusExposeControllerTest extends AbstractIntegrationTest { + @Autowired + private WebTestClient webTestClient; + + @Autowired + private ClustersStatisticsScheduler scheduler; + + @Test + void testGetMetrics() throws IOException { + + scheduler.updateStatistics(); + + webTestClient + .get() + .uri("/metrics") + .exchange() + .expectStatus().isOk() + .expectHeader().contentType(PrometheusTextFormatWriter.CONTENT_TYPE); + } +}