diff --git a/api/build.gradle b/api/build.gradle
index 8dfeea404..7ac4b9744 100644
--- a/api/build.gradle
+++ b/api/build.gradle
@@ -25,6 +25,7 @@ dependencies {
     implementation libs.spring.starter.oauth2.client
     implementation libs.spring.security.oauth2.resource.server
    implementation libs.spring.boot.actuator
+    compileOnly libs.spring.boot.devtools
     implementation libs.spring.security.ldap
 
@@ -48,6 +49,7 @@ dependencies {
     implementation libs.jackson.databind.nullable
     implementation libs.cel
+    implementation libs.caffeine
 
     antlr libs.antlr
     implementation libs.antlr.runtime
diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
index 23be86369..fb5c839ee 100644
--- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
+++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
@@ -5,6 +5,7 @@
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotBlank;
 import jakarta.validation.constraints.NotNull;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,6 +37,8 @@ public class ClustersProperties {
 
   PollingProperties polling = new PollingProperties();
 
+  CacheProperties cache = new CacheProperties();
+
   @Data
   public static class Cluster {
     @NotBlank(message = "field name for cluster could not be blank")
@@ -183,6 +186,14 @@ public enum LogLevel {
     }
   }
 
+  @Data
+  @NoArgsConstructor
+  @AllArgsConstructor
+  public static class CacheProperties {
+    boolean enabled = true;
+    Duration connectCacheExpiry = Duration.ofMinutes(1);
+  }
+
   @PostConstruct
   public void validateAndSetDefaults() {
     if (clusters != null) {
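Note: `ClustersProperties` binds to the `kafka` configuration prefix, so the new `CacheProperties` block should surface as `kafka.cache.*`. A minimal sketch of the resulting application config, assuming that prefix and Spring Boot's standard `Duration` conversion:

```yaml
kafka:
  cache:
    enabled: true             # set to false to bypass the connector cache
    connectCacheExpiry: 1m    # bound to java.time.Duration; defaults to 1 minute
```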
diff --git a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
index 945d77f92..d7c0f4560 100644
--- a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
@@ -45,10 +45,12 @@ public class KafkaConnectController extends AbstractController implements KafkaC
 
   @Override
   public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
+                                                            Boolean withStats,
                                                             ServerWebExchange exchange) {
-    Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
-        .filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
+    Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(
+        getCluster(clusterName), withStats != null ? withStats : false
+    ).filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
 
     return Mono.just(ResponseEntity.ok(availableConnects));
   }
diff --git a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java
index f67b4aad3..53e624528 100644
--- a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java
+++ b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java
@@ -1,6 +1,5 @@
 package io.kafbat.ui.mapper;
 
-import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.model.BrokerConfigDTO;
 import io.kafbat.ui.model.BrokerDTO;
 import io.kafbat.ui.model.BrokerDiskUsageDTO;
@@ -11,7 +10,6 @@
 import io.kafbat.ui.model.ClusterStatsDTO;
 import io.kafbat.ui.model.ConfigSourceDTO;
 import io.kafbat.ui.model.ConfigSynonymDTO;
-import io.kafbat.ui.model.ConnectDTO;
 import io.kafbat.ui.model.InternalBroker;
 import io.kafbat.ui.model.InternalBrokerConfig;
 import io.kafbat.ui.model.InternalBrokerDiskUsage;
@@ -107,8 +105,6 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {
 
   ReplicaDTO toReplica(InternalReplica replica);
 
-  ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect);
-
   List<ClusterFeatureDTO> toFeaturesEnum(List<ClusterFeature> features);
 
   default List<BrokerDiskUsageDTO> map(Map<Integer, InternalBrokerDiskUsage> map) {
diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
index 2bef74b51..1a4073191 100644
--- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
+++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
@@ -1,18 +1,22 @@
 package io.kafbat.ui.mapper;
 
+import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.connect.model.ConnectorStatusConnector;
 import io.kafbat.ui.connect.model.ConnectorTask;
 import io.kafbat.ui.connect.model.NewConnector;
+import io.kafbat.ui.model.ConnectDTO;
 import io.kafbat.ui.model.ConnectorDTO;
 import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
 import io.kafbat.ui.model.ConnectorPluginDTO;
+import io.kafbat.ui.model.ConnectorStateDTO;
 import io.kafbat.ui.model.ConnectorStatusDTO;
 import io.kafbat.ui.model.ConnectorTaskStatusDTO;
 import io.kafbat.ui.model.FullConnectorInfoDTO;
 import io.kafbat.ui.model.TaskDTO;
 import io.kafbat.ui.model.TaskStatusDTO;
-import io.kafbat.ui.model.connect.InternalConnectInfo;
+import io.kafbat.ui.model.connect.InternalConnectorInfo;
 import java.util.List;
+import java.util.Optional;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
@@ -38,7 +42,51 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
       io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse);
 
-  default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
+  default ConnectDTO toKafkaConnect(
+      ClustersProperties.ConnectCluster connect,
+      List<InternalConnectorInfo> connectors,
+      boolean withStats) {
+    Integer connectorCount = null;
+    Integer failedConnectors = null;
+    Integer tasksCount = null;
+    Integer failedTasksCount = null;
+
+    if (withStats) {
+      connectorCount = connectors.size();
+      failedConnectors = 0;
+      tasksCount = 0;
+      failedTasksCount = 0;
+
+      for (InternalConnectorInfo connector : connectors) {
+        Optional<ConnectorDTO> internalConnector = Optional.ofNullable(connector.getConnector());
+
+        failedConnectors += internalConnector
+            .map(ConnectorDTO::getStatus)
+            .map(ConnectorStatusDTO::getState)
+            .filter(ConnectorStateDTO.FAILED::equals)
+            .map(s -> 1).orElse(0);
+
+        tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
+
+        for (TaskDTO task : connector.getTasks()) {
+          if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
+            failedTasksCount += 1;
+          }
+        }
+      }
+
+    }
+
+    return new ConnectDTO()
+        .address(connect.getAddress())
+        .name(connect.getName())
+        .connectorsCount(connectorCount)
+        .failedConnectorsCount(failedConnectors)
+        .tasksCount(tasksCount)
+        .failedTasksCount(failedTasksCount);
+  }
+
+  default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
     ConnectorDTO connector = connectInfo.getConnector();
     List<TaskDTO> tasks = connectInfo.getTasks();
     int failedTasksCount = (int) tasks.stream()
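With the controller change above (and the matching `withStats` query parameter plus snake_case schema fields added to `kafbat-ui-api.yaml` further down), a stats-enabled listing looks roughly like this; the cluster name, connect name, and counter values are invented for illustration:

```
GET /api/clusters/local/connects?withStats=true

[
  {
    "name": "first",
    "address": "http://kafka-connect0:8083",
    "connectors_count": 2,
    "failed_connectors_count": 0,
    "tasks_count": 4,
    "failed_tasks_count": 1
  }
]
```

Without `withStats` (or with it `false`), the four count fields stay unset, which is why the schema marks them `nullable: true`.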
diff --git a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java
similarity index 91%
rename from api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java
rename to api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java
index ba8306cde..6c884316a 100644
--- a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java
+++ b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java
@@ -9,7 +9,7 @@
 
 @Data
 @Builder(toBuilder = true)
-public class InternalConnectInfo {
+public class InternalConnectorInfo {
   private final ConnectorDTO connector;
   private final Map<String, Object> config;
   private final List<TaskDTO> tasks;
diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
index 31f552885..797e463ce 100644
--- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
+++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -1,5 +1,9 @@
 package io.kafbat.ui.service;
 
+
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.connect.api.KafkaConnectClientApi;
 import io.kafbat.ui.connect.model.ConnectorStatus;
 import io.kafbat.ui.connect.model.ConnectorStatusConnector;
@@ -21,15 +25,15 @@
 import io.kafbat.ui.model.KafkaCluster;
 import io.kafbat.ui.model.NewConnectorDTO;
 import io.kafbat.ui.model.TaskDTO;
-import io.kafbat.ui.model.connect.InternalConnectInfo;
+import io.kafbat.ui.model.connect.InternalConnectorInfo;
 import io.kafbat.ui.util.ReactiveFailover;
+import jakarta.validation.Valid;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
@@ -39,23 +43,78 @@
 
 @Service
 @Slf4j
-@RequiredArgsConstructor
 public class KafkaConnectService {
   private final ClusterMapper clusterMapper;
   private final KafkaConnectMapper kafkaConnectMapper;
   private final KafkaConfigSanitizer kafkaConfigSanitizer;
+  private final ClustersProperties clustersProperties;
+
+  private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
+
+  public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper,
+                             KafkaConfigSanitizer kafkaConfigSanitizer,
+                             ClustersProperties clustersProperties) {
+    this.clusterMapper = clusterMapper;
+    this.kafkaConnectMapper = kafkaConnectMapper;
+    this.kafkaConfigSanitizer = kafkaConfigSanitizer;
+    this.clustersProperties = clustersProperties;
+    this.cachedConnectors = Caffeine.newBuilder()
+        .expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
+        .buildAsync();
+  }
 
-  public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
-    return Flux.fromIterable(
-        Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
-            .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
-            .orElse(List.of())
+  public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
+    Optional<List<ClustersProperties.ConnectCluster>> connectClusters =
+        Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
+    if (withStats) {
+      return connectClusters.map(connects ->
+          Flux.fromIterable(connects).flatMap(connect ->
+              getConnectConnectorsFromCache(new ConnectCacheKey(cluster, connect), withStats)
+                  .map(connectors -> kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats))
+          )
+      ).orElse(Flux.fromIterable(List.of()));
+    } else {
+      return Flux.fromIterable(connectClusters.map(connects ->
+          connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList()
+      ).orElse(List.of()));
+    }
+  }
+
+  private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key, boolean withStats) {
+    if (clustersProperties.getCache().isEnabled()) {
+      return Mono.fromFuture(
+          cachedConnectors.get(key, (t, e) ->
+              getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture()
+          )
+      );
+    } else {
+      return getConnectConnectors(key.cluster(), key.connect()).collectList();
+    }
+  }
+
+  private Flux<InternalConnectorInfo> getConnectConnectors(
+      KafkaCluster cluster,
+      ClustersProperties.ConnectCluster connect) {
+    return getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName ->
+        Mono.zip(
+            getConnector(cluster, connect.getName(), connectorName),
+            getConnectorTasks(cluster, connect.getName(), connectorName).collectList()
+        ).map(tuple ->
+            InternalConnectorInfo.builder()
+                .connector(tuple.getT1())
+                .config(null)
+                .tasks(tuple.getT2())
+                .topics(null)
+                .build()
+        )
     );
   }
 
   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
                                                      @Nullable final String search) {
-    return getConnects(cluster)
+    return getConnects(cluster, false)
         .flatMap(connect -> getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
             .flatMap(connectorName ->
@@ -65,7 +124,7 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
                 getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
                 getConnectorTopics(cluster, connect.getName(), connectorName)
             ).map(tuple ->
-                InternalConnectInfo.builder()
+                InternalConnectorInfo.builder()
                     .connector(tuple.getT1())
                     .config(tuple.getT2())
                     .tasks(tuple.getT3())
@@ -289,4 +348,6 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
               .formatted(connectorName, connectName));
         });
   }
+
+  record ConnectCacheKey(KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {}
 }
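The service bridges Caffeine's `AsyncCache` and Reactor by converting the cached `CompletableFuture` into a `Mono`, so concurrent subscribers for the same key share one in-flight load. A self-contained sketch of the same pattern (class and method names, and the loader body, are illustrative rather than project code):

```java
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ConnectorCacheSketch {
  private final AsyncCache<String, List<String>> cache = Caffeine.newBuilder()
      .expireAfterWrite(Duration.ofMinutes(1)) // mirrors the connectCacheExpiry default
      .buildAsync();

  Mono<List<String>> connectors(String connectName) {
    // The mapping function runs only on a cache miss; Caffeine stores the
    // CompletableFuture itself, so concurrent callers share a single load.
    return Mono.fromFuture(
        cache.get(connectName, (key, executor) ->
            fetchConnectors(key).collectList().toFuture()));
  }

  // Stand-in for the per-connect REST lookup done in getConnectConnectors().
  private Flux<String> fetchConnectors(String connectName) {
    return Flux.just(connectName + "-source", connectName + "-sink");
  }
}
```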
diff --git a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java
index 2e7c7ca8b..a58c4e881 100644
--- a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java
+++ b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java
@@ -24,7 +24,7 @@ class ConnectorsExporter {
   private final KafkaConnectService kafkaConnectService;
 
   Flux<DataEntityList> export(KafkaCluster cluster) {
-    return kafkaConnectService.getConnects(cluster)
+    return kafkaConnectService.getConnects(cluster, false)
        .flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
             .flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
             .flatMap(connectorDTO ->
@@ -41,7 +41,7 @@ Flux<DataEntityList> export(KafkaCluster cluster) {
   }
 
   Flux<DataSource> getConnectDataSources(KafkaCluster cluster) {
-    return kafkaConnectService.getConnects(cluster)
+    return kafkaConnectService.getConnects(cluster, false)
         .map(ConnectorsExporter::toDataSource);
   }
diff --git a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java
index 0686de2c4..df0ac5426 100644
--- a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java
+++ b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java
@@ -187,20 +187,7 @@ private void writeYamlToFile(String yaml, Path path) {
   }
 
   private String serializeToYaml(PropertiesStructure props) {
-    //representer, that skips fields with null values
-    Representer representer = new Representer(new DumperOptions()) {
-      @Override
-      protected NodeTuple representJavaBeanProperty(Object javaBean,
-                                                    Property property,
-                                                    Object propertyValue,
-                                                    Tag customTag) {
-        if (propertyValue == null) {
-          return null; // if value of property is null, ignore it.
-        } else {
-          return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag);
-        }
-      }
-    };
+    Representer representer = new YamlNullSkipRepresenter(new DumperOptions());
     var propertyUtils = new PropertyUtils();
     propertyUtils.setBeanAccess(BeanAccess.FIELD);
     representer.setPropertyUtils(propertyUtils);
diff --git a/api/src/main/java/io/kafbat/ui/util/YamlNullSkipRepresenter.java b/api/src/main/java/io/kafbat/ui/util/YamlNullSkipRepresenter.java
new file mode 100644
index 000000000..c1879a6fc
--- /dev/null
+++ b/api/src/main/java/io/kafbat/ui/util/YamlNullSkipRepresenter.java
@@ -0,0 +1,28 @@
+package io.kafbat.ui.util;
+
+import java.time.Duration;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.introspector.Property;
+import org.yaml.snakeyaml.nodes.NodeTuple;
+import org.yaml.snakeyaml.nodes.Tag;
+import org.yaml.snakeyaml.representer.Representer;
+
+// representer that skips fields with null values
+public class YamlNullSkipRepresenter extends Representer {
+  public YamlNullSkipRepresenter(DumperOptions options) {
+    super(options);
+    this.representers.put(Duration.class, data -> this.representScalar(Tag.STR, data.toString()));
+  }
+
+  @Override
+  protected NodeTuple representJavaBeanProperty(Object javaBean,
+                                                Property property,
+                                                Object propertyValue,
+                                                Tag customTag) {
+    if (propertyValue == null) {
+      return null; // if the value of a property is null, ignore it
+    } else {
+      return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag);
+    }
+  }
+}
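A short usage sketch of the extracted representer; the `Config` bean here is hypothetical. Null properties are dropped from the dump, and `Duration` values come out as plain ISO-8601 scalars (e.g. `PT1M`) instead of SnakeYAML attempting bean introspection on them:

```java
import io.kafbat.ui.util.YamlNullSkipRepresenter;
import java.time.Duration;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;

public class YamlNullSkipDemo {
  // Hypothetical bean; public fields keep the demo short.
  public static class Config {
    public String name = "local";
    public String address = null;                   // skipped entirely in the dump
    public Duration expiry = Duration.ofMinutes(1); // dumped as the scalar PT1M
  }

  public static void main(String[] args) {
    DumperOptions options = new DumperOptions();
    Yaml yaml = new Yaml(new YamlNullSkipRepresenter(options), options);
    // Output contains "name: local" and "expiry: PT1M" but no "address" key.
    System.out.println(yaml.dump(new Config()));
  }
}
```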
diff --git a/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java b/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java
index fb80338e7..2e95f353a 100644
--- a/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java
+++ b/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java
@@ -58,7 +58,7 @@ void exportsConnectorsAsDataTransformers() {
         )
     );
 
-    when(kafkaConnectService.getConnects(CLUSTER))
+    when(kafkaConnectService.getConnects(CLUSTER, false))
         .thenReturn(Flux.just(connect));
 
     when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml
index 77614253e..b8a7ae81b 100644
--- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml
+++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml
@@ -1401,6 +1401,11 @@ paths:
           required: true
           schema:
             type: string
+        - name: withStats
+          in: query
+          required: false
+          schema:
+            type: boolean
       responses:
         200:
           description: OK
@@ -3434,6 +3439,18 @@ components:
           type: string
         address:
           type: string
+        connectors_count:
+          type: integer
+          nullable: true
+        failed_connectors_count:
+          type: integer
+          nullable: true
+        tasks_count:
+          type: integer
+          nullable: true
+        failed_tasks_count:
+          type: integer
+          nullable: true
       required:
         - name
diff --git a/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java b/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java
index 2f480b8ca..231f79cd7 100644
--- a/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java
+++ b/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java
@@ -177,7 +177,7 @@ public ApiService createConnector(Connector connector) {
 
   @Step
   public String getFirstConnectName(String clusterName) {
-    return Objects.requireNonNull(connectorApi().getConnects(clusterName).blockFirst()).getName();
+    return Objects.requireNonNull(connectorApi().getConnects(clusterName, false).blockFirst()).getName();
   }
 
   @SneakyThrows
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 10ab35a76..dfddc8eb0 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -98,6 +98,7 @@ micrometer-registry-prometheus = { module = 'io.micrometer:micrometer-registry-p
 antlr = { module = 'org.antlr:antlr4', version.ref = 'antlr' }
 antlr-runtime = { module = 'org.antlr:antlr4-runtime', version.ref = 'antlr' }
 cel = { module = 'dev.cel:cel', version.ref = 'cel' }
+caffeine = { module = 'com.github.ben-manes.caffeine:caffeine', version = '3.2.2' }
 
 testcontainers = { module = 'org.testcontainers:testcontainers', version.ref = 'testcontainers' }
 testcontainers-kafka = { module = 'org.testcontainers:kafka', version.ref = 'testcontainers' }