diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index fb5c839ee..b58ef1543 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -192,6 +192,7 @@ public enum LogLevel { public static class CacheProperties { boolean enabled = true; Duration connectCacheExpiry = Duration.ofMinutes(1); + Duration connectClusterCacheExpiry = Duration.ofHours(24); } @PostConstruct diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java index 1a4073191..5f17724fd 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java @@ -1,6 +1,7 @@ package io.kafbat.ui.mapper; import io.kafbat.ui.config.ClustersProperties; +import io.kafbat.ui.connect.model.ClusterInfo; import io.kafbat.ui.connect.model.ConnectorStatusConnector; import io.kafbat.ui.connect.model.ConnectorTask; import io.kafbat.ui.connect.model.NewConnector; @@ -45,6 +46,7 @@ ConnectorPluginConfigValidationResponseDTO fromClient( default ConnectDTO toKafkaConnect( ClustersProperties.ConnectCluster connect, List connectors, + ClusterInfo clusterInfo, boolean withStats) { Integer connectorCount = null; Integer failedConnectors = null; @@ -66,12 +68,17 @@ default ConnectDTO toKafkaConnect( .filter(ConnectorStateDTO.FAILED::equals) .map(s -> 1).orElse(0); - tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0); - - for (TaskDTO task : connector.getTasks()) { - if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) { - failedTasksCount += tasksCount; - } + tasksCount += internalConnector.map(ConnectorDTO::getTasks).map(List::size).orElse(0); + + if (connector.getTasks() != null) { + failedTasksCount += (int) connector.getTasks().stream() + .filter(t -> + Optional.ofNullable(t) + .map(TaskDTO::getStatus) + .map(TaskStatusDTO::getState) + .map(ConnectorTaskStatusDTO.FAILED::equals) + .orElse(false) + ).count(); } } @@ -83,7 +90,10 @@ default ConnectDTO toKafkaConnect( .connectorsCount(connectorCount) .failedConnectorsCount(failedConnectors) .tasksCount(tasksCount) - .failedTasksCount(failedTasksCount); + .failedTasksCount(failedTasksCount) + .version(clusterInfo.getVersion()) + .commit(clusterInfo.getCommit()) + .clusterId(clusterInfo.getKafkaClusterId()); } default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) { diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index 797e463ce..c1aefcd4f 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.connect.api.KafkaConnectClientApi; +import io.kafbat.ui.connect.model.ClusterInfo; import io.kafbat.ui.connect.model.ConnectorStatus; import io.kafbat.ui.connect.model.ConnectorStatusConnector; import io.kafbat.ui.connect.model.ConnectorTopics; @@ -12,7 +13,6 @@ import io.kafbat.ui.exception.ConnectorOffsetsResetException; import io.kafbat.ui.exception.NotFoundException; import io.kafbat.ui.exception.ValidationException; -import io.kafbat.ui.mapper.ClusterMapper; import io.kafbat.ui.mapper.KafkaConnectMapper; import io.kafbat.ui.model.ConnectDTO; import io.kafbat.ui.model.ConnectorActionDTO; @@ -40,49 +40,59 @@ import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; @Service @Slf4j public class KafkaConnectService { - private final ClusterMapper clusterMapper; private final KafkaConnectMapper kafkaConnectMapper; private final KafkaConfigSanitizer kafkaConfigSanitizer; private final ClustersProperties clustersProperties; private final AsyncCache> cachedConnectors; + private final AsyncCache cacheClusterInfo; - public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper, + public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper, KafkaConfigSanitizer kafkaConfigSanitizer, ClustersProperties clustersProperties) { - this.clusterMapper = clusterMapper; this.kafkaConnectMapper = kafkaConnectMapper; this.kafkaConfigSanitizer = kafkaConfigSanitizer; this.clustersProperties = clustersProperties; this.cachedConnectors = Caffeine.newBuilder() .expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry()) .buildAsync(); + this.cacheClusterInfo = Caffeine.newBuilder() + .expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry()) + .buildAsync(); } public Flux getConnects(KafkaCluster cluster, boolean withStats) { Optional> connectClusters = Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()); + if (withStats) { return connectClusters.map(connects -> - Flux.fromIterable(connects).flatMap(connect -> ( - getConnectConnectorsFromCache(new ConnectCacheKey(cluster, connect), withStats).map( - connectors -> kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats) - ) + Flux.fromIterable(connects).flatMap(c -> + getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci)) + ).flatMap(tuple -> ( + getConnectConnectorsFromCache(new ConnectCacheKey(cluster, tuple.getT1())) + .map(connectors -> + kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), withStats) + ) ) ) ).orElse(Flux.fromIterable(List.of())); } else { - return Flux.fromIterable(connectClusters.map(connects -> - connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList() - ).orElse(List.of())); + return Flux.fromIterable(connectClusters.orElse(List.of())) + .flatMap(c -> + getClusterInfo(cluster, c.getName()).map(info -> + kafkaConnectMapper.toKafkaConnect(c, List.of(), info, withStats) + ) + ); } } - private Mono> getConnectConnectorsFromCache(ConnectCacheKey key, boolean withStats) { + private Mono> getConnectConnectorsFromCache(ConnectCacheKey key) { if (clustersProperties.getCache().isEnabled()) { return Mono.fromFuture( cachedConnectors.get(key, (t, e) -> @@ -94,6 +104,16 @@ private Mono> getConnectConnectorsFromCache(ConnectC } } + private Mono getClusterInfo(KafkaCluster cluster, String connectName) { + return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) -> + api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo) + .onErrorResume(th -> { + log.error("Error on collecting cluster info" + th.getMessage(), th); + return Mono.just(new ClusterInfo()); + }).toFuture() + )); + } + private Flux getConnectConnectors( KafkaCluster cluster, ClustersProperties.ConnectCluster connect) { @@ -177,12 +197,13 @@ public Mono createConnector(KafkaCluster cluster, String connectNa .mono(client -> connector .flatMap(c -> connectorExists(cluster, connectName, c.getName()) - .map(exists -> { + .flatMap(exists -> { if (Boolean.TRUE.equals(exists)) { - throw new ValidationException( - String.format("Connector with name %s already exists", c.getName())); + return Mono.error(new ValidationException( + String.format("Connector with name %s already exists", c.getName()))); + } else { + return Mono.just(c); } - return c; })) .map(kafkaConnectMapper::toClient) .flatMap(client::createConnector) diff --git a/api/src/test/java/io/kafbat/ui/mapper/KafkaConnectMapperTest.java b/api/src/test/java/io/kafbat/ui/mapper/KafkaConnectMapperTest.java new file mode 100644 index 000000000..ab404046f --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/mapper/KafkaConnectMapperTest.java @@ -0,0 +1,115 @@ +package io.kafbat.ui.mapper; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.kafbat.ui.config.ClustersProperties; +import io.kafbat.ui.connect.model.ClusterInfo; +import io.kafbat.ui.model.ConnectDTO; +import io.kafbat.ui.model.ConnectorDTO; +import io.kafbat.ui.model.ConnectorStateDTO; +import io.kafbat.ui.model.ConnectorStatusDTO; +import io.kafbat.ui.model.ConnectorTaskStatusDTO; +import io.kafbat.ui.model.TaskDTO; +import io.kafbat.ui.model.TaskIdDTO; +import io.kafbat.ui.model.TaskStatusDTO; +import io.kafbat.ui.model.connect.InternalConnectorInfo; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.junit.jupiter.api.Test; +import org.openapitools.jackson.nullable.JsonNullable; + +class KafkaConnectMapperTest { + + @Test + void toKafkaConnect() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + List connectors = new ArrayList<>(); + int failedConnectors = 0; + int failedTasks = 0; + int tasksPerConnector = random.nextInt(1, 10); + + for (int i = 0; i < 10; i++) { + ConnectorStateDTO connectorState; + if (random.nextBoolean()) { + connectorState = ConnectorStateDTO.FAILED; + failedConnectors++; + } else { + connectorState = ConnectorStateDTO.RUNNING; + } + + ConnectorDTO connectorDto = new ConnectorDTO(); + connectorDto.setName(UUID.randomUUID().toString()); + connectorDto.setStatus( + new ConnectorStatusDTO(connectorState, UUID.randomUUID().toString()) + ); + + List tasks = new ArrayList<>(); + List taskIds = new ArrayList<>(); + + for (int j = 0; j < tasksPerConnector; j++) { + TaskDTO task = new TaskDTO(); + TaskIdDTO taskId = new TaskIdDTO(UUID.randomUUID().toString(), j); + task.setId(taskId); + + ConnectorTaskStatusDTO state; + if (random.nextBoolean()) { + state = ConnectorTaskStatusDTO.FAILED; + failedTasks++; + } else { + state = ConnectorTaskStatusDTO.RUNNING; + } + + TaskStatusDTO status = new TaskStatusDTO(); + status.setState(state); + task.setStatus(status); + tasks.add(task); + taskIds.add(taskId); + } + + connectorDto.setTasks(taskIds); + InternalConnectorInfo connector = InternalConnectorInfo.builder() + .connector(connectorDto) + .tasks(tasks) + .build(); + + connectors.add(connector); + } + + ClusterInfo clusterInfo = new ClusterInfo(); + clusterInfo.setVersion(UUID.randomUUID().toString()); + clusterInfo.setCommit(UUID.randomUUID().toString()); + clusterInfo.setKafkaClusterId(UUID.randomUUID().toString()); + + ClustersProperties.ConnectCluster connectCluster = ClustersProperties.ConnectCluster.builder() + .name(UUID.randomUUID().toString()) + .address("http://localhost:" + random.nextInt(1000, 5000)) + .username(UUID.randomUUID().toString()) + .password(UUID.randomUUID().toString()).build(); + + ConnectDTO connectDto = new ConnectDTO(); + connectDto.setName(connectCluster.getName()); + connectDto.setAddress(connectCluster.getAddress()); + connectDto.setVersion(JsonNullable.of(clusterInfo.getVersion())); + connectDto.setCommit(JsonNullable.of(clusterInfo.getCommit())); + connectDto.setClusterId(JsonNullable.of(clusterInfo.getKafkaClusterId())); + connectDto.setConnectorsCount(JsonNullable.of(connectors.size())); + connectDto.setFailedConnectorsCount(JsonNullable.of(failedConnectors)); + connectDto.setTasksCount(JsonNullable.of(connectors.size() * tasksPerConnector)); + connectDto.setFailedTasksCount(JsonNullable.of(failedTasks)); + + KafkaConnectMapper mapper = new KafkaConnectMapperImpl(); + ConnectDTO kafkaConnect = mapper.toKafkaConnect( + connectCluster, + connectors, + clusterInfo, + true + ); + + assertThat(kafkaConnect).isNotNull(); + assertThat(kafkaConnect).isEqualTo(connectDto); + + } +} diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index ba54bc7c5..eabf136c2 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -3449,6 +3449,16 @@ components: failedTasksCount: type: integer nullable: true + version: + type: string + nullable: true + commit: + type: string + nullable: true + clusterId: + type: string + nullable: true + required: - name diff --git a/contract/src/main/resources/swagger/kafka-connect-api.yaml b/contract/src/main/resources/swagger/kafka-connect-api.yaml index 5fa8dc230..98b60e2c7 100644 --- a/contract/src/main/resources/swagger/kafka-connect-api.yaml +++ b/contract/src/main/resources/swagger/kafka-connect-api.yaml @@ -14,6 +14,20 @@ servers: - url: /localhost paths: + /: + get: + tags: + - KafkaConnectClient + summary: get kafka connect info + operationId: getClusterInfo + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ClusterInfo' + /connectors: get: tags: @@ -419,6 +433,17 @@ components: type: http scheme: basic schemas: + + ClusterInfo: + type: object + properties: + version: + type: string + commit: + type: string + kafka_cluster_id: + type: string + ConnectorConfig: type: object additionalProperties: