44import io .kafbat .ui .model .KafkaCluster ;
55import io .kafbat .ui .service .StatisticsCache ;
66import io .kafbat .ui .service .metrics .prometheus .PrometheusMetricsExposer ;
7+ import java .nio .charset .StandardCharsets ;
78import java .util .Map ;
89import java .util .Optional ;
910import java .util .stream .Collectors ;
1011import lombok .RequiredArgsConstructor ;
12+ import org .springframework .http .HttpStatus ;
1113import org .springframework .http .ResponseEntity ;
1214import org .springframework .web .bind .annotation .RestController ;
1315import org .springframework .web .server .ServerWebExchange ;
@@ -20,28 +22,48 @@ public class PrometheusExposeController extends AbstractController implements Pr
2022 private final StatisticsCache statisticsCache ;
2123
2224 @ Override
23- public Mono <ResponseEntity <String >> exposeAllMetrics (ServerWebExchange exchange ) {
24- return Mono .just (
25- PrometheusMetricsExposer .exposeAllMetrics (
26- clustersStorage .getKafkaClusters ()
27- .stream ()
28- .filter (KafkaCluster ::isExposeMetricsViaPrometheusEndpoint )
29- .collect (Collectors .toMap (KafkaCluster ::getName , c -> statisticsCache .get (c ).getMetrics ()))
30- )
31- );
25+ public Mono <ResponseEntity <Void >> exposeAllMetrics (ServerWebExchange exchange ) {
26+ String metrics = PrometheusMetricsExposer .exposeAllMetrics (
27+ clustersStorage .getKafkaClusters ()
28+ .stream ()
29+ .filter (KafkaCluster ::isExposeMetricsViaPrometheusEndpoint )
30+ .collect (Collectors .toMap (
31+ KafkaCluster ::getName ,
32+ c -> statisticsCache .get (c ).getMetrics ()))
33+ ).getBody ();
34+
35+ var response = exchange .getResponse ();
36+ response .getHeaders ().add ("Content-Type" , "text/plain; version=0.0.4" );
37+
38+ byte [] bytes = metrics .getBytes (StandardCharsets .UTF_8 );
39+ var buffer = response .bufferFactory ().wrap (bytes );
40+
41+ return response .writeWith (Mono .just (buffer ))
42+ .doOnTerminate (response ::setComplete )
43+ .thenReturn (ResponseEntity .ok ().build ());
3244 }
3345
3446 @ Override
35- public Mono <ResponseEntity <String >> exposeClusterMetrics (String clusterName ,
36- ServerWebExchange exchange ) {
47+ public Mono <ResponseEntity <Void >> exposeClusterMetrics (String clusterName , ServerWebExchange exchange ) {
3748 Optional <KafkaCluster > cluster = clustersStorage .getClusterByName (clusterName );
38- if (cluster .isPresent () && cluster .get ().isExposeMetricsViaPrometheusEndpoint ()) {
39- return Mono .just (PrometheusMetricsExposer .exposeAllMetrics (
40- Map .of (clusterName , statisticsCache .get (cluster .get ()).getMetrics ())
41- ));
42- } else {
43- return Mono .just (ResponseEntity .notFound ().build ());
49+ if (cluster .isEmpty () || !cluster .get ().isExposeMetricsViaPrometheusEndpoint ()) {
50+ exchange .getResponse ().setStatusCode (HttpStatus .NOT_FOUND );
51+ return exchange .getResponse ().setComplete ()
52+ .thenReturn (ResponseEntity .notFound ().build ());
4453 }
45- }
4654
55+ String metrics = PrometheusMetricsExposer .exposeAllMetrics (
56+ Map .of (clusterName , statisticsCache .get (cluster .get ()).getMetrics ())
57+ ).getBody ();
58+
59+ var response = exchange .getResponse ();
60+ response .getHeaders ().add ("Content-Type" , "text/plain; version=0.0.4" );
61+
62+ byte [] bytes = metrics .getBytes (StandardCharsets .UTF_8 );
63+ var buffer = response .bufferFactory ().wrap (bytes );
64+
65+ return response .writeWith (Mono .just (buffer ))
66+ .doOnTerminate (response ::setComplete )
67+ .thenReturn (ResponseEntity .ok ().build ());
68+ }
4769}
0 commit comments