11package io .kafbat .ui .service ;
22
3- import com .github .benmanes .caffeine .cache .AsyncCache ;
4- import com .github .benmanes .caffeine .cache .Caffeine ;
53import io .kafbat .ui .config .ClustersProperties ;
64import io .kafbat .ui .connect .api .KafkaConnectClientApi ;
75import io .kafbat .ui .connect .model .ClusterInfo ;
2523import io .kafbat .ui .model .FullConnectorInfoDTO ;
2624import io .kafbat .ui .model .KafkaCluster ;
2725import io .kafbat .ui .model .NewConnectorDTO ;
26+ import io .kafbat .ui .model .Statistics ;
2827import io .kafbat .ui .model .TaskDTO ;
2928import io .kafbat .ui .model .TaskIdDTO ;
3029import io .kafbat .ui .model .connect .InternalConnectorInfo ;
3130import io .kafbat .ui .service .index .KafkaConnectNgramFilter ;
31+ import io .kafbat .ui .service .metrics .scrape .KafkaConnectState ;
3232import io .kafbat .ui .util .ReactiveFailover ;
3333import jakarta .validation .Valid ;
34+ import java .util .HashMap ;
3435import java .util .List ;
3536import java .util .Map ;
3637import java .util .Optional ;
@@ -49,17 +50,16 @@ public class KafkaConnectService {
4950 private final KafkaConnectMapper kafkaConnectMapper ;
5051 private final KafkaConfigSanitizer kafkaConfigSanitizer ;
5152 private final ClustersProperties clustersProperties ;
52- private final AsyncCache < String , ClusterInfo > cacheClusterInfo ;
53+ private final StatisticsCache statisticsCache ;
5354
5455 public KafkaConnectService (KafkaConnectMapper kafkaConnectMapper ,
5556 KafkaConfigSanitizer kafkaConfigSanitizer ,
56- ClustersProperties clustersProperties ) {
57+ ClustersProperties clustersProperties ,
58+ StatisticsCache statisticsCache ) {
5759 this .kafkaConnectMapper = kafkaConnectMapper ;
5860 this .kafkaConfigSanitizer = kafkaConfigSanitizer ;
5961 this .clustersProperties = clustersProperties ;
60- this .cacheClusterInfo = Caffeine .newBuilder ()
61- .expireAfterWrite (clustersProperties .getCache ().getConnectClusterCacheExpiry ())
62- .buildAsync ();
62+ this .statisticsCache = statisticsCache ;
6363 }
6464
6565 public Flux <ConnectDTO > getConnects (KafkaCluster cluster , boolean withStats ) {
@@ -89,14 +89,17 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
8989 }
9090 }
9191
92- private Mono <ClusterInfo > getClusterInfo (KafkaCluster cluster , String connectName ) {
93- return Mono .fromFuture (cacheClusterInfo .get (connectName , (t , e ) ->
94- api (cluster , connectName ).mono (KafkaConnectClientApi ::getClusterInfo )
95- .onErrorResume (th -> {
96- log .error ("Error on collecting cluster info" , th );
97- return Mono .just (new ClusterInfo ());
98- }).toFuture ()
99- ));
92+ public Mono <ClusterInfo > getClusterInfo (KafkaCluster cluster , String connectName ) {
93+ KafkaConnectState state = statisticsCache .get (cluster ).getConnectStates ().get (connectName );
94+ if (state != null ) {
95+ return Mono .just (kafkaConnectMapper .toClient (state ));
96+ } else {
97+ return api (cluster , connectName ).mono (KafkaConnectClientApi ::getClusterInfo )
98+ .onErrorResume (th -> {
99+ log .error ("Error on collecting cluster info" , th );
100+ return Mono .just (new ClusterInfo ());
101+ });
102+ }
100103 }
101104
102105 private Flux <InternalConnectorInfo > getConnectConnectors (
@@ -134,6 +137,33 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
134137 .flatMapMany (Flux ::fromIterable );
135138 }
136139
140+ public Flux <KafkaConnectState > scrapeAllConnects (KafkaCluster cluster ) {
141+
142+ Optional <List <ClustersProperties .@ Valid ConnectCluster >> connectClusters =
143+ Optional .ofNullable (cluster .getOriginalProperties ().getKafkaConnect ());
144+
145+ return Flux .fromIterable (connectClusters .orElse (List .of ())).flatMap (c ->
146+ getClusterInfo (cluster , c .getName ()).map (info ->
147+ kafkaConnectMapper .toKafkaConnect (c , List .of (), info , false )
148+ ).onErrorResume ((t ) -> Mono .just (new ConnectDTO ().name (c .getName ())))
149+ ).flatMap (connect ->
150+ getConnectorsWithErrorsSuppress (cluster , connect .getName ())
151+ .onErrorResume (t -> Mono .just (Map .of ()))
152+ .flatMapMany (connectors ->
153+ Flux .fromIterable (connectors .entrySet ())
154+ .flatMap (e ->
155+ getConnectorTopics (
156+ cluster ,
157+ connect .getName (),
158+ e .getKey ()
159+ ).map (topics ->
160+ kafkaConnectMapper .fromClient (connect .getName (), e .getValue (), topics .getTopics ())
161+ )
162+ )
163+ ).collectList ().map (connectors -> kafkaConnectMapper .toScrapeState (connect , connectors ))
164+ );
165+ }
166+
137167 private List <FullConnectorInfoDTO > filterConnectors (
138168 List <FullConnectorInfoDTO > connectors ,
139169 String search ,
@@ -349,4 +379,30 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
349379 .formatted (connectorName , connectName ));
350380 });
351381 }
382+
383+ public Flux <FullConnectorInfoDTO > getTopicConnectors (KafkaCluster cluster , String topicName ) {
384+ Map <String , KafkaConnectState > connectStates = this .statisticsCache .get (cluster ).getConnectStates ();
385+ Map <String , List <String >> filteredConnects = new HashMap <>();
386+ for (Map .Entry <String , KafkaConnectState > entry : connectStates .entrySet ()) {
387+ List <KafkaConnectState .ConnectorState > connectors =
388+ entry .getValue ().getConnectors ().stream ().filter (c -> c .topics ().contains (topicName )).toList ();
389+ if (!connectors .isEmpty ()) {
390+ filteredConnects .put (entry .getKey (), connectors .stream ().map (KafkaConnectState .ConnectorState ::name ).toList ());
391+ }
392+ }
393+
394+ return Flux .fromIterable (filteredConnects .entrySet ())
395+ .flatMap (entry ->
396+ getConnectorsWithErrorsSuppress (cluster , entry .getKey ())
397+ .map (connectors ->
398+ connectors .entrySet ()
399+ .stream ()
400+ .filter (c -> entry .getValue ().contains (c .getKey ()))
401+ .map (c -> kafkaConnectMapper .fromClient (entry .getKey (), c .getValue (), null ))
402+ .map (kafkaConnectMapper ::fullConnectorInfo )
403+ .toList ()
404+ )
405+ ).flatMap (Flux ::fromIterable );
406+
407+ }
352408}
0 commit comments