55import io .kafbat .ui .config .ClustersProperties ;
66import io .kafbat .ui .connect .api .KafkaConnectClientApi ;
77import io .kafbat .ui .connect .model .ClusterInfo ;
8+ import io .kafbat .ui .connect .model .ConnectorExpand ;
89import io .kafbat .ui .connect .model .ConnectorStatus ;
910import io .kafbat .ui .connect .model .ConnectorStatusConnector ;
1011import io .kafbat .ui .connect .model .ConnectorTopics ;
12+ import io .kafbat .ui .connect .model .ExpandedConnector ;
1113import io .kafbat .ui .connect .model .TaskStatus ;
1214import io .kafbat .ui .exception .ConnectorOffsetsResetException ;
1315import io .kafbat .ui .exception .NotFoundException ;
2426import io .kafbat .ui .model .KafkaCluster ;
2527import io .kafbat .ui .model .NewConnectorDTO ;
2628import io .kafbat .ui .model .TaskDTO ;
29+ import io .kafbat .ui .model .TaskIdDTO ;
2730import io .kafbat .ui .model .connect .InternalConnectorInfo ;
2831import io .kafbat .ui .service .index .KafkaConnectNgramFilter ;
2932import io .kafbat .ui .util .ReactiveFailover ;
3235import java .util .Map ;
3336import java .util .Optional ;
3437import java .util .function .Predicate ;
35- import java .util .stream .Stream ;
3638import javax .annotation .Nullable ;
3739import lombok .extern .slf4j .Slf4j ;
3840import org .springframework .stereotype .Service ;
@@ -47,8 +49,6 @@ public class KafkaConnectService {
4749 private final KafkaConnectMapper kafkaConnectMapper ;
4850 private final KafkaConfigSanitizer kafkaConfigSanitizer ;
4951 private final ClustersProperties clustersProperties ;
50-
51- private final AsyncCache <ConnectCacheKey , List <InternalConnectorInfo >> cachedConnectors ;
5252 private final AsyncCache <String , ClusterInfo > cacheClusterInfo ;
5353
5454 public KafkaConnectService (KafkaConnectMapper kafkaConnectMapper ,
@@ -57,9 +57,6 @@ public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
5757 this .kafkaConnectMapper = kafkaConnectMapper ;
5858 this .kafkaConfigSanitizer = kafkaConfigSanitizer ;
5959 this .clustersProperties = clustersProperties ;
60- this .cachedConnectors = Caffeine .newBuilder ()
61- .expireAfterWrite (clustersProperties .getCache ().getConnectCacheExpiry ())
62- .buildAsync ();
6360 this .cacheClusterInfo = Caffeine .newBuilder ()
6461 .expireAfterWrite (clustersProperties .getCache ().getConnectClusterCacheExpiry ())
6562 .buildAsync ();
@@ -74,9 +71,10 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
7471 Flux .fromIterable (connects ).flatMap (c ->
7572 getClusterInfo (cluster , c .getName ()).map (ci -> Tuples .of (c , ci ))
7673 ).flatMap (tuple -> (
77- getConnectConnectorsFromCache (new ConnectCacheKey (cluster , tuple .getT1 ()))
74+ getConnectConnectors (cluster , tuple .getT1 ())
75+ .collectList ()
7876 .map (connectors ->
79- kafkaConnectMapper .toKafkaConnect (tuple .getT1 (), connectors , tuple .getT2 (), withStats )
77+ kafkaConnectMapper .toKafkaConnect (tuple .getT1 (), connectors , tuple .getT2 (), true )
8078 )
8179 )
8280 )
@@ -85,29 +83,17 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
8583 return Flux .fromIterable (connectClusters .orElse (List .of ()))
8684 .flatMap (c ->
8785 getClusterInfo (cluster , c .getName ()).map (info ->
88- kafkaConnectMapper .toKafkaConnect (c , List .of (), info , withStats )
86+ kafkaConnectMapper .toKafkaConnect (c , List .of (), info , false )
8987 )
9088 );
9189 }
9290 }
9391
94- private Mono <List <InternalConnectorInfo >> getConnectConnectorsFromCache (ConnectCacheKey key ) {
95- if (clustersProperties .getCache ().isEnabled ()) {
96- return Mono .fromFuture (
97- cachedConnectors .get (key , (t , e ) ->
98- getConnectConnectors (t .cluster (), t .connect ()).collectList ().toFuture ()
99- )
100- );
101- } else {
102- return getConnectConnectors (key .cluster (), key .connect ()).collectList ();
103- }
104- }
105-
10692 private Mono <ClusterInfo > getClusterInfo (KafkaCluster cluster , String connectName ) {
10793 return Mono .fromFuture (cacheClusterInfo .get (connectName , (t , e ) ->
10894 api (cluster , connectName ).mono (KafkaConnectClientApi ::getClusterInfo )
10995 .onErrorResume (th -> {
110- log .error ("Error on collecting cluster info" + th . getMessage () , th );
96+ log .error ("Error on collecting cluster info" , th );
11197 return Mono .just (new ClusterInfo ());
11298 }).toFuture ()
11399 ));
@@ -116,17 +102,11 @@ private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectNam
116102 private Flux <InternalConnectorInfo > getConnectConnectors (
117103 KafkaCluster cluster ,
118104 ClustersProperties .ConnectCluster connect ) {
119- return getConnectorNamesWithErrorsSuppress (cluster , connect .getName ()).flatMap (connectorName ->
120- Mono .zip (
121- getConnector (cluster , connect .getName (), connectorName ),
122- getConnectorTasks (cluster , connect .getName (), connectorName ).collectList ()
123- ).map (tuple ->
124- InternalConnectorInfo .builder ()
125- .connector (tuple .getT1 ())
126- .config (null )
127- .tasks (tuple .getT2 ())
128- .topics (null )
129- .build ()
105+ return getConnectorsWithErrorsSuppress (cluster , connect .getName ()).flatMapMany (connectors ->
106+ Flux .fromStream (
107+ connectors .values ().stream ().map (c ->
108+ kafkaConnectMapper .fromClient (connect .getName (), c , null )
109+ )
130110 )
131111 );
132112 }
@@ -135,21 +115,20 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
135115 @ Nullable final String search , Boolean fts ) {
136116 return getConnects (cluster , false )
137117 .flatMap (connect ->
138- getConnectorNamesWithErrorsSuppress (cluster , connect .getName ())
139- .flatMap (connectorName ->
140- Mono .zip (
141- getConnector (cluster , connect .getName (), connectorName ),
142- getConnectorConfig (cluster , connect .getName (), connectorName ),
143- getConnectorTasks (cluster , connect .getName (), connectorName ).collectList (),
144- getConnectorTopics (cluster , connect .getName (), connectorName )
145- ).map (tuple ->
146- InternalConnectorInfo .builder ()
147- .connector (tuple .getT1 ())
148- .config (tuple .getT2 ())
149- .tasks (tuple .getT3 ())
150- .topics (tuple .getT4 ().getTopics ())
151- .build ())))
152- .map (kafkaConnectMapper ::fullConnectorInfo )
118+ getConnectorsWithErrorsSuppress (cluster , connect .getName ())
119+ .flatMapMany (connectors ->
120+ Flux .fromIterable (connectors .entrySet ())
121+ .flatMap (e ->
122+ getConnectorTopics (
123+ cluster ,
124+ connect .getName (),
125+ e .getKey ()
126+ ).map (topics ->
127+ kafkaConnectMapper .fromClient (connect .getName (), e .getValue (), topics .getTopics ())
128+ )
129+ )
130+ )
131+ ).map (kafkaConnectMapper ::fullConnectorInfo )
153132 .collectList ()
154133 .map (lst -> filterConnectors (lst , search , fts ))
155134 .flatMapMany (Flux ::fromIterable );
@@ -165,14 +144,6 @@ private List<FullConnectorInfoDTO> filterConnectors(
165144 return filter .find (search );
166145 }
167146
168- private Stream <String > getStringsForSearch (FullConnectorInfoDTO fullConnectorInfo ) {
169- return Stream .of (
170- fullConnectorInfo .getName (),
171- fullConnectorInfo .getConnect (),
172- fullConnectorInfo .getStatus ().getState ().getValue (),
173- fullConnectorInfo .getType ().getValue ());
174- }
175-
176147 public Mono <ConnectorTopics > getConnectorTopics (KafkaCluster cluster , String connectClusterName ,
177148 String connectorName ) {
178149 return api (cluster , connectClusterName )
@@ -183,15 +154,17 @@ public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String con
183154 .onErrorResume (Exception .class , e -> Mono .just (new ConnectorTopics ().topics (List .of ())));
184155 }
185156
186- public Flux < String > getConnectorNames (KafkaCluster cluster , String connectName ) {
157+ public Mono < Map < String , ExpandedConnector >> getConnectors (KafkaCluster cluster , String connectName ) {
187158 return api (cluster , connectName )
188- .mono (client -> client .getConnectors (null ))
189- .flatMapMany (Flux ::fromIterable );
159+ .mono (client ->
160+ client .getConnectors (null , List .of (ConnectorExpand .INFO , ConnectorExpand .STATUS ))
161+ );
190162 }
191163
192164 // returns empty flux if there was an error communicating with Connect
193- public Flux <String > getConnectorNamesWithErrorsSuppress (KafkaCluster cluster , String connectName ) {
194- return getConnectorNames (cluster , connectName ).onErrorComplete ();
165+ public Mono <Map <String , ExpandedConnector >> getConnectorsWithErrorsSuppress (
166+ KafkaCluster cluster , String connectName ) {
167+ return getConnectors (cluster , connectName ).onErrorComplete ();
195168 }
196169
197170 public Mono <ConnectorDTO > createConnector (KafkaCluster cluster , String connectName ,
@@ -216,8 +189,8 @@ public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectNa
216189
217190 private Mono <Boolean > connectorExists (KafkaCluster cluster , String connectName ,
218191 String connectorName ) {
219- return getConnectorNames (cluster , connectName )
220- .any ( name -> name . equals (connectorName ));
192+ return getConnectors (cluster , connectName )
193+ .map ( m -> m . containsKey (connectorName ));
221194 }
222195
223196 public Mono <ConnectorDTO > getConnector (KafkaCluster cluster , String connectName ,
@@ -306,8 +279,11 @@ private Mono<Void> restartTasks(KafkaCluster cluster, String connectName,
306279 return getConnectorTasks (cluster , connectName , connectorName )
307280 .filter (taskFilter )
308281 .flatMap (t ->
309- restartConnectorTask (cluster , connectName , connectorName , t .getId ().getTask ()))
310- .then ();
282+ restartConnectorTask (
283+ cluster , connectName , connectorName ,
284+ Optional .ofNullable (t .getId ()).map (TaskIdDTO ::getTask ).orElseThrow ()
285+ )
286+ ).then ();
311287 }
312288
313289 public Flux <TaskDTO > getConnectorTasks (KafkaCluster cluster , String connectName , String connectorName ) {
@@ -318,8 +294,9 @@ public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName,
318294 .map (kafkaConnectMapper ::fromClient )
319295 .flatMap (task ->
320296 client
321- .getConnectorTaskStatus (connectorName , task .getId ().getTask ())
322- .onErrorResume (WebClientResponseException .NotFound .class , e -> Mono .empty ())
297+ .getConnectorTaskStatus (connectorName ,
298+ Optional .ofNullable (task .getId ()).map (TaskIdDTO ::getTask ).orElseThrow ()
299+ ).onErrorResume (WebClientResponseException .NotFound .class , e -> Mono .empty ())
323300 .map (kafkaConnectMapper ::fromClient )
324301 .map (task ::status )
325302 ));
@@ -372,6 +349,4 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
372349 .formatted (connectorName , connectName ));
373350 });
374351 }
375-
376- record ConnectCacheKey (KafkaCluster cluster , ClustersProperties .ConnectCluster connect ) {}
377352}
0 commit comments