Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand All @@ -21,27 +22,45 @@ public class PrometheusExposeController extends AbstractController implements Pr

@Override
public Mono<ResponseEntity<String>> exposeAllMetrics(ServerWebExchange exchange) {

String metrics = PrometheusMetricsExposer.exposeAllMetrics(
clustersStorage.getKafkaClusters().stream()
.filter(KafkaCluster::isExposeMetricsViaPrometheusEndpoint)
.collect(Collectors.toMap(
KafkaCluster::getName,
c -> statisticsCache.get(c).getMetrics()
))
).getBody();

return Mono.just(
PrometheusMetricsExposer.exposeAllMetrics(
clustersStorage.getKafkaClusters()
.stream()
.filter(KafkaCluster::isExposeMetricsViaPrometheusEndpoint)
.collect(Collectors.toMap(KafkaCluster::getName, c -> statisticsCache.get(c).getMetrics()))
)
ResponseEntity
.ok()
.contentType(MediaType.parseMediaType("text/plain; version=0.0.4"))
.body(metrics)
);
}

@Override
public Mono<ResponseEntity<String>> exposeClusterMetrics(String clusterName,
ServerWebExchange exchange) {
public Mono<ResponseEntity<String>> exposeClusterMetrics(String clusterName, ServerWebExchange exchange) {

Optional<KafkaCluster> cluster = clustersStorage.getClusterByName(clusterName);
if (cluster.isPresent() && cluster.get().isExposeMetricsViaPrometheusEndpoint()) {
return Mono.just(PrometheusMetricsExposer.exposeAllMetrics(
Map.of(clusterName, statisticsCache.get(cluster.get()).getMetrics())
));
} else {

if (cluster.isEmpty() || !cluster.get().isExposeMetricsViaPrometheusEndpoint()) {
return Mono.just(ResponseEntity.notFound().build());
}
}

String metrics = PrometheusMetricsExposer.exposeAllMetrics(
Map.of(
clusterName,
statisticsCache.get(cluster.get()).getMetrics()
)
).getBody();

return Mono.just(
ResponseEntity
.ok()
.contentType(MediaType.parseMediaType("text/plain; version=0.0.4"))
.body(metrics)
);
}
}
Loading