1313import io .kafbat .ui .exception .ConnectorOffsetsResetException ;
1414import io .kafbat .ui .exception .NotFoundException ;
1515import io .kafbat .ui .exception .ValidationException ;
16- import io .kafbat .ui .mapper .ClusterMapper ;
1716import io .kafbat .ui .mapper .KafkaConnectMapper ;
1817import io .kafbat .ui .model .ConnectDTO ;
1918import io .kafbat .ui .model .ConnectorActionDTO ;
3736import javax .annotation .Nullable ;
3837import lombok .extern .slf4j .Slf4j ;
3938import org .apache .commons .lang3 .StringUtils ;
40- import org .apache .kafka .common .protocol .types .Field ;
4139import org .springframework .stereotype .Service ;
4240import org .springframework .web .reactive .function .client .WebClientResponseException ;
4341import reactor .core .publisher .Flux ;
4745@ Service
4846@ Slf4j
4947public class KafkaConnectService {
50- private final ClusterMapper clusterMapper ;
5148 private final KafkaConnectMapper kafkaConnectMapper ;
5249 private final KafkaConfigSanitizer kafkaConfigSanitizer ;
5350 private final ClustersProperties clustersProperties ;
5451
5552 private final AsyncCache <ConnectCacheKey , List <InternalConnectorInfo >> cachedConnectors ;
5653 private final AsyncCache <String , ClusterInfo > cacheClusterInfo ;
5754
58- public KafkaConnectService (ClusterMapper clusterMapper , KafkaConnectMapper kafkaConnectMapper ,
55+ public KafkaConnectService (KafkaConnectMapper kafkaConnectMapper ,
5956 KafkaConfigSanitizer kafkaConfigSanitizer ,
6057 ClustersProperties clustersProperties ) {
61- this .clusterMapper = clusterMapper ;
6258 this .kafkaConnectMapper = kafkaConnectMapper ;
6359 this .kafkaConfigSanitizer = kafkaConfigSanitizer ;
6460 this .clustersProperties = clustersProperties ;
@@ -79,7 +75,7 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
7975 Flux .fromIterable (connects ).flatMap ( c ->
8076 getClusterInfo (cluster , c .getName ()).map (ci -> Tuples .of (c , ci ))
8177 ).flatMap (tuple -> (
82- getConnectConnectorsFromCache (new ConnectCacheKey (cluster , tuple .getT1 ()), withStats )
78+ getConnectConnectorsFromCache (new ConnectCacheKey (cluster , tuple .getT1 ()))
8379 .map (connectors ->
8480 kafkaConnectMapper .toKafkaConnect (tuple .getT1 (), connectors , tuple .getT2 (), withStats )
8581 )
@@ -96,7 +92,7 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
9692 }
9793 }
9894
99- private Mono <List <InternalConnectorInfo >> getConnectConnectorsFromCache (ConnectCacheKey key , boolean withStats ) {
95+ private Mono <List <InternalConnectorInfo >> getConnectConnectorsFromCache (ConnectCacheKey key ) {
10096 if (clustersProperties .getCache ().isEnabled ()) {
10197 return Mono .fromFuture (
10298 cachedConnectors .get (key , (t , e ) ->
@@ -201,12 +197,13 @@ public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectNa
201197 .mono (client ->
202198 connector
203199 .flatMap (c -> connectorExists (cluster , connectName , c .getName ())
204- .map (exists -> {
200+ .flatMap (exists -> {
205201 if (Boolean .TRUE .equals (exists )) {
206- throw new ValidationException (
207- String .format ("Connector with name %s already exists" , c .getName ()));
202+ return Mono .error (new ValidationException (
203+ String .format ("Connector with name %s already exists" , c .getName ())));
204+ } else {
205+ return Mono .just (c );
208206 }
209- return c ;
210207 }))
211208 .map (kafkaConnectMapper ::toClient )
212209 .flatMap (client ::createConnector )
0 commit comments