diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
index 8a03c3dee..cbe48f811 100644
--- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
+++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
@@ -7,11 +7,13 @@
 import io.kafbat.ui.connect.ApiClient;
 import io.kafbat.ui.connect.api.KafkaConnectClientApi;
 import io.kafbat.ui.connect.model.Connector;
+import io.kafbat.ui.connect.model.ConnectorExpand;
 import io.kafbat.ui.connect.model.ConnectorPlugin;
 import io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse;
 import io.kafbat.ui.connect.model.ConnectorStatus;
 import io.kafbat.ui.connect.model.ConnectorTask;
 import io.kafbat.ui.connect.model.ConnectorTopics;
+import io.kafbat.ui.connect.model.ExpandedConnector;
 import io.kafbat.ui.connect.model.NewConnector;
 import io.kafbat.ui.connect.model.TaskStatus;
 import io.kafbat.ui.exception.KafkaConnectConflictResponseException;
@@ -221,13 +223,17 @@ public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWith
   }
 
   @Override
-  public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
-    return withRetryOnConflictOrRebalance(super.getConnectors(search));
+  public Mono<Map<String, ExpandedConnector>> getConnectors(
+      String search, List<ConnectorExpand> expand
+  ) throws WebClientResponseException {
+    return withRetryOnConflictOrRebalance(super.getConnectors(search, expand));
   }
 
   @Override
-  public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
-    return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
+  public Mono<ResponseEntity<Map<String, ExpandedConnector>>> getConnectorsWithHttpInfo(
+      String search, List<ConnectorExpand> expand
+  ) throws WebClientResponseException {
+    return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search, expand));
   }
 
   @Override
diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
index 46a178f62..e9686b3bb 100644
--- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
+++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
@@ -227,7 +227,6 @@ public enum LogLevel {
   @AllArgsConstructor
   public static class CacheProperties {
     boolean enabled = true;
-    Duration connectCacheExpiry = Duration.ofMinutes(1);
     Duration connectClusterCacheExpiry = Duration.ofHours(24);
   }
 
diff --git a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
index e535d0ea6..4be545da9 100644
--- a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
@@ -66,8 +66,12 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
         .build();
 
     return validateAccess(context)
-        .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
-        .doOnEach(sig -> audit(context, sig));
+        .thenReturn(
+            ResponseEntity.ok(
+                kafkaConnectService.getConnectors(getCluster(clusterName), connectName)
+                    .flatMapMany(m -> Flux.fromIterable(m.keySet()))
+            )
+        ).doOnEach(sig -> audit(context, sig));
   }
 
   @Override
diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
index 5f17724fd..4057087a1 100644
--- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
+++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
@@ -4,6 +4,7 @@
 import io.kafbat.ui.connect.model.ClusterInfo;
 import io.kafbat.ui.connect.model.ConnectorStatusConnector;
 import io.kafbat.ui.connect.model.ConnectorTask;
+import io.kafbat.ui.connect.model.ExpandedConnector;
 import io.kafbat.ui.connect.model.NewConnector;
 import io.kafbat.ui.model.ConnectDTO;
 import io.kafbat.ui.model.ConnectorDTO;
@@ -14,10 +15,15 @@
 import io.kafbat.ui.model.ConnectorTaskStatusDTO;
 import io.kafbat.ui.model.FullConnectorInfoDTO;
 import io.kafbat.ui.model.TaskDTO;
+import io.kafbat.ui.model.TaskIdDTO;
 import io.kafbat.ui.model.TaskStatusDTO;
 import io.kafbat.ui.model.connect.InternalConnectorInfo;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 
@@ -43,6 +49,39 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
       io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse);
 
+  default InternalConnectorInfo fromClient(String connect, ExpandedConnector connector, @Nullable List<String> topics) {
+    Objects.requireNonNull(connector.getInfo());
+    Objects.requireNonNull(connector.getStatus());
+    List<TaskDTO> tasks = List.of();
+
+    if (connector.getInfo().getTasks() != null
+        && connector.getStatus().getTasks() != null
+    ) {
+      Map<Integer, TaskIdDTO> taskIds = connector.getInfo().getTasks()
+          .stream().map(t -> new TaskIdDTO().task(t.getTask()).connector(t.getConnector()))
+          .collect(Collectors.toMap(
+              TaskIdDTO::getTask,
+              t -> t
+          ));
+
+      tasks = connector.getStatus().getTasks().stream()
+          .map(s ->
+              new TaskDTO().status(fromClient(s)).id(taskIds.get(s.getId()))
+          ).toList();
+    }
+
+    ConnectorDTO connectorDto = fromClient(connector.getInfo())
+        .connect(connect)
+        .status(fromClient(connector.getStatus().getConnector()));
+
+    return InternalConnectorInfo.builder()
+        .connector(connectorDto)
+        .config(connector.getInfo().getConfig())
+        .tasks(tasks)
+        .topics(topics)
+        .build();
+  }
+
   default ConnectDTO toKafkaConnect(
       ClustersProperties.ConnectCluster connect,
       List<InternalConnectorInfo> connectors,
diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConfigSanitizer.java b/api/src/main/java/io/kafbat/ui/service/KafkaConfigSanitizer.java
index 68b205c56..7040cf7e4 100644
--- a/api/src/main/java/io/kafbat/ui/service/KafkaConfigSanitizer.java
+++ b/api/src/main/java/io/kafbat/ui/service/KafkaConfigSanitizer.java
@@ -19,7 +19,7 @@
 import org.springframework.stereotype.Component;
 
 @Component
-class KafkaConfigSanitizer {
+public class KafkaConfigSanitizer {
 
   private static final String SANITIZED_VALUE = "******";
 
diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
index f7d6948d1..adb656b5b 100644
--- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
+++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -5,9 +5,11 @@
 import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.connect.api.KafkaConnectClientApi;
 import io.kafbat.ui.connect.model.ClusterInfo;
+import io.kafbat.ui.connect.model.ConnectorExpand;
 import io.kafbat.ui.connect.model.ConnectorStatus;
 import io.kafbat.ui.connect.model.ConnectorStatusConnector;
 import io.kafbat.ui.connect.model.ConnectorTopics;
+import io.kafbat.ui.connect.model.ExpandedConnector;
 import io.kafbat.ui.connect.model.TaskStatus;
 import io.kafbat.ui.exception.ConnectorOffsetsResetException;
 import io.kafbat.ui.exception.NotFoundException;
@@ -24,6 +26,7 @@
 import io.kafbat.ui.model.KafkaCluster;
 import io.kafbat.ui.model.NewConnectorDTO;
 import io.kafbat.ui.model.TaskDTO;
+import io.kafbat.ui.model.TaskIdDTO;
 import io.kafbat.ui.model.connect.InternalConnectorInfo;
 import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
 import io.kafbat.ui.util.ReactiveFailover;
@@ -32,7 +35,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
-import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -47,8 +49,6 @@ public class KafkaConnectService {
   private final KafkaConnectMapper kafkaConnectMapper;
   private final KafkaConfigSanitizer kafkaConfigSanitizer;
   private final ClustersProperties clustersProperties;
-
-  private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
   private final AsyncCache<String, ClusterInfo> cacheClusterInfo;
 
   public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
@@ -57,9 +57,6 @@ public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
     this.kafkaConnectMapper = kafkaConnectMapper;
     this.kafkaConfigSanitizer = kafkaConfigSanitizer;
     this.clustersProperties = clustersProperties;
-    this.cachedConnectors = Caffeine.newBuilder()
-        .expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
-        .buildAsync();
     this.cacheClusterInfo = Caffeine.newBuilder()
         .expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
         .buildAsync();
@@ -74,9 +71,10 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
           Flux.fromIterable(connects).flatMap(c ->
               getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci))
           ).flatMap(tuple -> (
-              getConnectConnectorsFromCache(new ConnectCacheKey(cluster, tuple.getT1()))
+              getConnectConnectors(cluster, tuple.getT1())
+                  .collectList()
                   .map(connectors ->
-                      kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), withStats)
+                      kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), true)
                   )
              )
          )
@@ -85,29 +83,17 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
       return Flux.fromIterable(connectClusters.orElse(List.of()))
           .flatMap(c ->
               getClusterInfo(cluster, c.getName()).map(info ->
-                  kafkaConnectMapper.toKafkaConnect(c, List.of(), info, withStats)
+                  kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
              )
          );
     }
   }
 
-  private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key) {
-    if (clustersProperties.getCache().isEnabled()) {
-      return Mono.fromFuture(
-          cachedConnectors.get(key, (t, e) ->
-              getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture()
-          )
-      );
-    } else {
-      return getConnectConnectors(key.cluster(), key.connect()).collectList();
-    }
-  }
-
   private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
     return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
         api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
             .onErrorResume(th -> {
-              log.error("Error on collecting cluster info" + th.getMessage(), th);
+              log.error("Error on collecting cluster info", th);
               return Mono.just(new ClusterInfo());
             }).toFuture()
     ));
   }
@@ -116,17 +102,11 @@ private Flux<InternalConnectorInfo> getConnectConnectors(
       KafkaCluster cluster,
       ClustersProperties.ConnectCluster connect) {
-    return getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
-        Mono.zip(
-            getConnector(cluster, connect.getName(), connectorName),
-            getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
-        ).map(tuple ->
-            InternalConnectorInfo.builder()
-                .connector(tuple.getT1())
-                .config(null)
-                .tasks(tuple.getT2())
-                .topics(null)
-                .build()
+    return getConnectorsWithErrorsSuppress(cluster, connect.getName()).flatMapMany(connectors ->
+        Flux.fromStream(
+            connectors.values().stream().map(c ->
+                kafkaConnectMapper.fromClient(connect.getName(), c, null)
+            )
        )
    );
  }
@@ -135,21 +115,20 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
                                                      @Nullable final String search,
                                                      Boolean fts) {
     return getConnects(cluster, false)
         .flatMap(connect ->
-            getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
-                .flatMap(connectorName ->
-                    Mono.zip(
-                        getConnector(cluster, connect.getName(), connectorName),
-                        getConnectorConfig(cluster, connect.getName(), connectorName),
-                        getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
-                        getConnectorTopics(cluster, connect.getName(), connectorName)
-                    ).map(tuple ->
-                        InternalConnectorInfo.builder()
-                            .connector(tuple.getT1())
-                            .config(tuple.getT2())
-                            .tasks(tuple.getT3())
-                            .topics(tuple.getT4().getTopics())
-                            .build())))
-        .map(kafkaConnectMapper::fullConnectorInfo)
+            getConnectorsWithErrorsSuppress(cluster, connect.getName())
+                .flatMapMany(connectors ->
+                    Flux.fromIterable(connectors.entrySet())
+                        .flatMap(e ->
+                            getConnectorTopics(
+                                cluster,
+                                connect.getName(),
+                                e.getKey()
+                            ).map(topics ->
+                                kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics())
+                            )
+                        )
+                )
+        ).map(kafkaConnectMapper::fullConnectorInfo)
         .collectList()
         .map(lst -> filterConnectors(lst, search, fts))
         .flatMapMany(Flux::fromIterable);
@@ -165,14 +144,6 @@ private List<FullConnectorInfoDTO> filterConnectors(
     return filter.find(search);
   }
 
-  private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
-    return Stream.of(
-        fullConnectorInfo.getName(),
-        fullConnectorInfo.getConnect(),
-        fullConnectorInfo.getStatus().getState().getValue(),
-        fullConnectorInfo.getType().getValue());
-  }
-
   public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
                                                   String connectorName) {
     return api(cluster, connectClusterName)
@@ -183,15 +154,17 @@ public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String con
         .onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of())));
   }
 
-  public Flux<String> getConnectorNames(KafkaCluster cluster, String connectName) {
+  public Mono<Map<String, ExpandedConnector>> getConnectors(KafkaCluster cluster, String connectName) {
     return api(cluster, connectName)
-        .mono(client -> client.getConnectors(null))
-        .flatMapMany(Flux::fromIterable);
+        .mono(client ->
+            client.getConnectors(null, List.of(ConnectorExpand.INFO, ConnectorExpand.STATUS))
+        );
   }
 
   // returns empty flux if there was an error communicating with Connect
-  public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
-    return getConnectorNames(cluster, connectName).onErrorComplete();
+  public Mono<Map<String, ExpandedConnector>> getConnectorsWithErrorsSuppress(
+      KafkaCluster cluster, String connectName) {
+    return getConnectors(cluster, connectName).onErrorComplete();
   }
 
   public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
@@ -216,8 +189,8 @@ public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectNa
 
   private Mono<Boolean> connectorExists(KafkaCluster cluster, String connectName,
                                         String connectorName) {
-    return getConnectorNames(cluster, connectName)
-        .any(name -> name.equals(connectorName));
+    return getConnectors(cluster, connectName)
+        .map(m -> m.containsKey(connectorName));
   }
 
   public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
@@ -306,8 +279,11 @@ private Mono<Void> restartTasks(KafkaCluster cluster, String connectName,
     return getConnectorTasks(cluster, connectName, connectorName)
         .filter(taskFilter)
         .flatMap(t ->
-            restartConnectorTask(cluster, connectName, connectorName, t.getId().getTask()))
-        .then();
+            restartConnectorTask(
+                cluster, connectName, connectorName,
+                Optional.ofNullable(t.getId()).map(TaskIdDTO::getTask).orElseThrow()
+            )
+        ).then();
   }
 
   public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName, String connectorName) {
@@ -318,8 +294,9 @@ public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName,
             .map(kafkaConnectMapper::fromClient)
             .flatMap(task ->
                 client
-                    .getConnectorTaskStatus(connectorName, task.getId().getTask())
-                    .onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
+                    .getConnectorTaskStatus(connectorName,
+                        Optional.ofNullable(task.getId()).map(TaskIdDTO::getTask).orElseThrow()
+                    ).onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
                     .map(kafkaConnectMapper::fromClient)
                     .map(task::status)
             ));
@@ -372,6 +349,4 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
               .formatted(connectorName, connectName));
         });
   }
-
-  record ConnectCacheKey(KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {}
 }
diff --git a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorInfo.java b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorInfo.java
index 186c4a8a1..eed5aaef8 100644
--- a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorInfo.java
+++ b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorInfo.java
@@ -1,6 +1,9 @@
 package io.kafbat.ui.service.integration.odd;
 
-import io.kafbat.ui.model.ConnectorTypeDTO;
+import static io.kafbat.ui.connect.model.Connector.TypeEnum.SINK;
+import static io.kafbat.ui.connect.model.Connector.TypeEnum.SOURCE;
+
+import io.kafbat.ui.connect.model.Connector;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -19,7 +22,7 @@ record ConnectorInfo(List<String> inputs,
                      List<String> outputs) {
 
   static ConnectorInfo extract(String className,
-                               ConnectorTypeDTO type,
+                               Connector.TypeEnum type,
                                Map<String, Object> config,
                                List<String> topicsFromApi, // can be empty for old Connect API versions
                                UnaryOperator<String> topicOddrnBuilder) {
@@ -40,7 +43,7 @@ static ConnectorInfo extract(String className,
     };
   }
 
-  private static ConnectorInfo extractFileIoConnector(ConnectorTypeDTO type,
+  private static ConnectorInfo extractFileIoConnector(Connector.TypeEnum type,
                                                       List<String> topics,
                                                       Map<String, Object> config,
                                                       UnaryOperator<String> topicOddrnBuilder) {
@@ -50,7 +53,7 @@ private static ConnectorInfo extractFileIoConnector(ConnectorTypeDTO type,
     );
   }
 
-  private static ConnectorInfo extractJdbcSink(ConnectorTypeDTO type,
+  private static ConnectorInfo extractJdbcSink(Connector.TypeEnum type,
                                                List<String> topics,
                                                Map<String, Object> config,
                                                UnaryOperator<String> topicOddrnBuilder) {
@@ -103,7 +106,7 @@ private static ConnectorInfo extractDebeziumMysql(Map<String, Object> config) {
     return new ConnectorInfo(inputs, List.of());
   }
 
-  private static ConnectorInfo extractS3Sink(ConnectorTypeDTO type,
+  private static ConnectorInfo extractS3Sink(Connector.TypeEnum type,
                                              List<String> topics,
                                              Map<String, Object> config,
                                              UnaryOperator<String> topicOrrdnBuilder) {
@@ -119,20 +122,20 @@ private static ConnectorInfo extractS3Sink(ConnectorTypeDTO type,
     );
   }
 
-  private static List<String> extractInputs(ConnectorTypeDTO type,
+  private static List<String> extractInputs(Connector.TypeEnum type,
                                             List<String> topicsFromApi,
                                             Map<String, Object> config,
                                             UnaryOperator<String> topicOrrdnBuilder) {
-    return type == ConnectorTypeDTO.SINK
+    return type == SINK
         ? extractTopicsOddrns(config, topicsFromApi, topicOrrdnBuilder)
         : List.of();
   }
 
-  private static List<String> extractOutputs(ConnectorTypeDTO type,
+  private static List<String> extractOutputs(Connector.TypeEnum type,
                                              List<String> topicsFromApi,
                                              Map<String, Object> config,
                                              UnaryOperator<String> topicOrrdnBuilder) {
-    return type == ConnectorTypeDTO.SOURCE
+    return type == SOURCE
         ? extractTopicsOddrns(config, topicsFromApi, topicOrrdnBuilder)
         : List.of();
   }
diff --git a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java
index a58c4e881..cf3298379 100644
--- a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java
+++ b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java
@@ -1,8 +1,9 @@
 package io.kafbat.ui.service.integration.odd;
 
+import io.kafbat.ui.connect.model.Connector;
 import io.kafbat.ui.connect.model.ConnectorTopics;
+import io.kafbat.ui.connect.model.ExpandedConnector;
 import io.kafbat.ui.model.ConnectDTO;
-import io.kafbat.ui.model.ConnectorDTO;
 import io.kafbat.ui.model.KafkaCluster;
 import io.kafbat.ui.service.KafkaConnectService;
 import java.net.URI;
@@ -25,12 +26,12 @@ class ConnectorsExporter {
 
   Flux<DataEntityList> export(KafkaCluster cluster) {
     return kafkaConnectService.getConnects(cluster, false)
-        .flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
-            .flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
-            .flatMap(connectorDTO ->
-                kafkaConnectService.getConnectorTopics(cluster, connect.getName(), connectorDTO.getName())
-                    .map(topics -> createConnectorDataEntity(cluster, connect, connectorDTO, topics)))
-            .buffer(100)
+        .flatMap(connect -> kafkaConnectService.getConnectorsWithErrorsSuppress(cluster, connect.getName())
+            .flatMapMany(connectors ->
+                Flux.fromIterable(connectors.entrySet()).flatMap(e ->
+                    kafkaConnectService.getConnectorTopics(cluster, connect.getName(), e.getKey())
+                        .map(topics -> createConnectorDataEntity(cluster, connect, e.getValue(), topics)))
+            ).buffer(100)
             .map(connectDataEntities -> {
               String dsOddrn = Oddrn.connectDataSourceOddrn(connect.getAddress());
               return new DataEntityList()
@@ -54,10 +55,11 @@ private static DataSource toDataSource(ConnectDTO connect) {
 
   private static DataEntity createConnectorDataEntity(KafkaCluster cluster,
                                                       ConnectDTO connect,
-                                                      ConnectorDTO connector,
+                                                      ExpandedConnector connector,
                                                       ConnectorTopics connectorTopics) {
+    Connector connectorInfo = connector.getInfo();
     var metadata = new HashMap<>(extractMetadata(connector));
-    metadata.put("type", connector.getType().name());
+    metadata.put("type", connectorInfo.getType().name());
 
     var info = extractConnectorInfo(cluster, connector, connectorTopics);
     DataTransformer transformer = new DataTransformer();
@@ -65,9 +67,9 @@ private static DataEntity createConnectorDataEntity(KafkaCluster cluster,
     transformer.setOutputs(info.outputs());
 
     return new DataEntity()
-        .oddrn(Oddrn.connectorOddrn(connect.getAddress(), connector.getName()))
-        .name(connector.getName())
-        .description("Kafka Connector \"%s\" (%s)".formatted(connector.getName(), connector.getType()))
+        .oddrn(Oddrn.connectorOddrn(connect.getAddress(), connectorInfo.getName()))
+        .name(connectorInfo.getName())
+        .description("Kafka Connector \"%s\" (%s)".formatted(connectorInfo.getName(), connectorInfo.getType()))
         .type(DataEntityType.JOB)
         .dataTransformer(transformer)
         .metadata(List.of(
@@ -76,18 +78,18 @@ private static DataEntity createConnectorDataEntity(KafkaCluster cluster,
             .metadata(metadata)));
   }
 
-  private static Map<String, Object> extractMetadata(ConnectorDTO connector) {
+  private static Map<String, Object> extractMetadata(ExpandedConnector connector) {
     // will be sanitized by KafkaConfigSanitizer (if it's enabled)
-    return connector.getConfig();
+    return connector.getInfo().getConfig();
   }
 
   private static ConnectorInfo extractConnectorInfo(KafkaCluster cluster,
-                                                    ConnectorDTO connector,
+                                                    ExpandedConnector connector,
                                                     ConnectorTopics topics) {
     return ConnectorInfo.extract(
-        (String) connector.getConfig().get("connector.class"),
-        connector.getType(),
-        connector.getConfig(),
+        (String) connector.getInfo().getConfig().get("connector.class"),
+        connector.getInfo().getType(),
+        connector.getInfo().getConfig(),
         topics.getTopics(),
         topic -> Oddrn.topicOddrn(cluster, topic)
     );
diff --git a/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java b/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java
index 2e95f353a..b82d34d20 100644
--- a/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java
+++ b/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java
@@ -4,7 +4,10 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import io.kafbat.ui.connect.model.Connector;
+import io.kafbat.ui.connect.model.ConnectorStatus;
 import io.kafbat.ui.connect.model.ConnectorTopics;
+import io.kafbat.ui.connect.model.ExpandedConnector;
 import io.kafbat.ui.model.ConnectDTO;
 import io.kafbat.ui.model.ConnectorDTO;
 import io.kafbat.ui.model.ConnectorTypeDTO;
@@ -34,11 +37,11 @@ void exportsConnectorsAsDataTransformers() {
     connect.setName("testConnect");
     connect.setAddress("http://kconnect:8083");
 
-    ConnectorDTO sinkConnector = new ConnectorDTO();
-    sinkConnector.setName("testSink");
-    sinkConnector.setType(ConnectorTypeDTO.SINK);
-    sinkConnector.setConnect(connect.getName());
-    sinkConnector.setConfig(
+    ConnectorDTO sinkConnectorDto = new ConnectorDTO();
+    sinkConnectorDto.setName("testSink");
+    sinkConnectorDto.setType(ConnectorTypeDTO.SINK);
+    sinkConnectorDto.setConnect(connect.getName());
+    sinkConnectorDto.setConfig(
         Map.of(
             "connector.class", "FileStreamSink",
             "file", "filePathHere",
@@ -46,11 +49,11 @@ void exportsConnectorsAsDataTransformers() {
         )
     );
 
-    ConnectorDTO sourceConnector = new ConnectorDTO();
-    sourceConnector.setName("testSource");
-    sourceConnector.setConnect(connect.getName());
-    sourceConnector.setType(ConnectorTypeDTO.SOURCE);
-    sourceConnector.setConfig(
+    ConnectorDTO sourceConnectorDto = new ConnectorDTO();
+    sourceConnectorDto.setName("testSource");
+    sourceConnectorDto.setConnect(connect.getName());
+    sourceConnectorDto.setType(ConnectorTypeDTO.SOURCE);
+    sourceConnectorDto.setConfig(
         Map.of(
             "connector.class", "FileStreamSource",
             "file", "filePathHere",
@@ -58,22 +61,49 @@ void exportsConnectorsAsDataTransformers() {
         )
     );
 
+    ExpandedConnector sinkConnector = new ExpandedConnector()
+        .status(
+            new ConnectorStatus()
+                .name(sinkConnectorDto.getName())
+                .tasks(List.of())
+        )
+        .info(
+            new Connector()
+                .name(sinkConnectorDto.getName())
+                .type(Connector.TypeEnum.SINK)
+                .config(sinkConnectorDto.getConfig())
+                .tasks(List.of())
+        );
+
+    ExpandedConnector sourceConnector = new ExpandedConnector()
+        .status(
+            new ConnectorStatus()
+                .name(sourceConnectorDto.getName())
+                .tasks(List.of())
+        )
+        .info(
+            new Connector()
+                .name(sourceConnectorDto.getName())
+                .type(Connector.TypeEnum.SOURCE)
+                .config(sourceConnectorDto.getConfig())
+                .tasks(List.of())
+        );
+
+    Map<String, ExpandedConnector> connectors = Map.of(
+        sinkConnectorDto.getName(), sinkConnector,
+        sourceConnectorDto.getName(), sourceConnector
+    );
+
     when(kafkaConnectService.getConnects(CLUSTER, false))
         .thenReturn(Flux.just(connect));
 
-    when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
-        .thenReturn(Flux.just(sinkConnector.getName(), sourceConnector.getName()));
-
-    when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sinkConnector.getName()))
-        .thenReturn(Mono.just(sinkConnector));
-
-    when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sourceConnector.getName()))
-        .thenReturn(Mono.just(sourceConnector));
+    when(kafkaConnectService.getConnectorsWithErrorsSuppress(CLUSTER, connect.getName()))
+        .thenReturn(Mono.just(connectors));
 
-    when(kafkaConnectService.getConnectorTopics(CLUSTER, connect.getName(), sourceConnector.getName()))
+    when(kafkaConnectService.getConnectorTopics(CLUSTER, connect.getName(), sourceConnectorDto.getName()))
         .thenReturn(Mono.just(new ConnectorTopics().topics(List.of("outputTopic"))));
 
-    when(kafkaConnectService.getConnectorTopics(CLUSTER, connect.getName(), sinkConnector.getName()))
+    when(kafkaConnectService.getConnectorTopics(CLUSTER, connect.getName(), sinkConnectorDto.getName()))
         .thenReturn(Mono.just(new ConnectorTopics().topics(List.of("inputTopic"))));
 
     StepVerifier.create(exporter.export(CLUSTER))
@@ -90,7 +120,7 @@ void exportsConnectorsAsDataTransformers() {
               .satisfies(sink -> {
                 assertThat(sink.getMetadata()).isNotNull();
                 assertThat(sink.getDataTransformer()).isNotNull();
-                assertThat(sink.getMetadata().get(0).getMetadata())
+                assertThat(sink.getMetadata().getFirst().getMetadata())
                     .containsOnlyKeys("type", "connector.class", "file", "topic");
                 assertThat(sink.getDataTransformer().getInputs()).contains(
                     "//kafka/cluster/localhost:9092/topics/inputTopic");
@@ -102,7 +132,7 @@ void exportsConnectorsAsDataTransformers() {
               .satisfies(source -> {
                 assertThat(source.getMetadata()).isNotNull();
                 assertThat(source.getDataTransformer()).isNotNull();
-                assertThat(source.getMetadata().get(0).getMetadata())
+                assertThat(source.getMetadata().getFirst().getMetadata())
                     .containsOnlyKeys("type", "connector.class", "file", "topic");
                 assertThat(source.getDataTransformer().getOutputs()).contains(
                     "//kafka/cluster/localhost:9092/topics/outputTopic");
diff --git a/contract-typespec/api/config.tsp b/contract-typespec/api/config.tsp
index eea72c412..39647a3b1 100644
--- a/contract-typespec/api/config.tsp
+++ b/contract-typespec/api/config.tsp
@@ -138,8 +138,6 @@ model ApplicationConfig {
   cache?: {
     enabled?: boolean;
     @format("duration")
-    connectCacheExpiry?: string;
-    @format("duration")
     connectClusterCacheExpiry?: string;
   };
   clusters?: {
diff --git a/contract/src/main/resources/swagger/kafka-connect-api.yaml b/contract/src/main/resources/swagger/kafka-connect-api.yaml
index 98b60e2c7..292e40095 100644
--- a/contract/src/main/resources/swagger/kafka-connect-api.yaml
+++ b/contract/src/main/resources/swagger/kafka-connect-api.yaml
@@ -26,7 +26,7 @@ paths:
         content:
           application/json:
             schema:
-               $ref: '#/components/schemas/ClusterInfo'
+              $ref: '#/components/schemas/ClusterInfo'
 
   /connectors:
     get:
@@ -40,15 +40,25 @@ paths:
           required: false
           schema:
             type: string
+        - name: expand
+          in: query
+          style: form
+          required: false
+          explode: true
+          schema:
+            type: array
+            items:
+              $ref: '#/components/schemas/ConnectorExpand'
+
       responses:
         200:
           description: OK
          content:
            application/json:
              schema:
-                type: array
-                items:
-                  type: string
+                type: object
+                additionalProperties:
+                  $ref: '#/components/schemas/ExpandedConnector'
     post:
       tags:
         - KafkaConnectClient
@@ -179,7 +189,7 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/ConnectorOffsetsError'
-  
+
     get:
       tags:
         - KafkaConnectClient
@@ -437,12 +447,12 @@ components:
     ClusterInfo:
       type: object
      properties:
-        version: 
+        version:
           type: string
         commit:
           type: string
         kafka_cluster_id:
-          type: string 
+          type: string
 
     ConnectorConfig:
       type: object
@@ -491,6 +501,14 @@ components:
         - source
         - sink
 
+    ExpandedConnector:
+      type: object
+      properties:
+        status:
+          $ref: '#/components/schemas/ConnectorStatus'
+        info:
+          $ref: '#/components/schemas/Connector'
+
     TaskStatus:
       type: object
      properties:
@@ -646,6 +664,11 @@ components:
         items:
           type: string
 
+    ConnectorExpand:
+      type: string
+      enum:
+        - info
+        - status
 
 security:
   - basicAuth: []
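
Note (illustrative, not part of the patch): the diff above replaces the old N+1 flow (list connector names, then fetch info/config/tasks/topics per connector) with Kafka Connect's consolidated endpoint GET /connectors?expand=info&expand=status, which returns a single JSON object keyed by connector name. That is why the client methods now return maps, the contract gains the ExpandedConnector schema, and the Caffeine connector cache with its connectCacheExpiry setting can be dropped. Below is a minimal sketch of what that request looks like against a plain Connect worker, assuming a worker at http://localhost:8083 and using Spring WebClient directly rather than the repository's generated KafkaConnectClientApi; the Info/Status/Expanded records are hypothetical stand-ins for the generated model classes.

import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

public class ExpandedConnectorsSketch {

  // Hypothetical stand-ins mirroring the ExpandedConnector schema; the wire format is
  // { "<connector-name>": { "info": {...}, "status": {...} } }.
  record Info(String name, Map<String, Object> config, String type) {}
  record Status(String name, Map<String, Object> connector) {}
  record Expanded(Info info, Status status) {}

  public static void main(String[] args) {
    WebClient client = WebClient.create("http://localhost:8083"); // hypothetical worker URL

    // One request per Connect cluster returns every connector together with its
    // configuration ("info") and runtime state ("status"), replacing the old
    // name-list call plus several follow-up requests per connector.
    Mono<Map<String, Expanded>> connectors = client.get()
        .uri(uri -> uri.path("/connectors")
            .queryParam("expand", "info", "status") // rendered as expand=info&expand=status
            .build())
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<Map<String, Expanded>>() {});

    connectors.subscribe(m -> m.forEach((name, c) ->
        System.out.printf("%s -> type=%s%n", name, c.info().type())));
  }
}

The repeated expand=info&expand=status form produced by queryParam here matches the style: form / explode: true declaration added to kafka-connect-api.yaml in this diff.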