From 253b303dcea9a809470c6dbc3020093586e575fe Mon Sep 17 00:00:00 2001 From: German Osin Date: Sun, 27 Jul 2025 17:12:06 +0300 Subject: [PATCH 1/5] BE: Fixes #445 Added connect stats --- BE__Fixes_#445_be_connect_info.patch | 214 ++++++++++++++++++ ...ctInfo.java => InternalConnectorInfo.java} | 0 2 files changed, 214 insertions(+) create mode 100644 BE__Fixes_#445_be_connect_info.patch rename api/src/main/java/io/kafbat/ui/model/connect/{InternalConnectInfo.java => InternalConnectorInfo.java} (100%) diff --git a/BE__Fixes_#445_be_connect_info.patch b/BE__Fixes_#445_be_connect_info.patch new file mode 100644 index 000000000..b29259faf --- /dev/null +++ b/BE__Fixes_#445_be_connect_info.patch @@ -0,0 +1,214 @@ +Subject: [PATCH] BE: Fixes #445 be connect info +--- +Index: api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java +--- a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) ++++ b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java (revision abb46f1075df173e25157e5add12606ccbc7547e) +@@ -15,6 +15,10 @@ + import io.kafbat.ui.model.ConfigSourceDTO; + import io.kafbat.ui.model.ConfigSynonymDTO; + import io.kafbat.ui.model.ConnectDTO; ++import io.kafbat.ui.model.ConnectorDTO; ++import io.kafbat.ui.model.ConnectorStateDTO; ++import io.kafbat.ui.model.ConnectorStatusDTO; ++import io.kafbat.ui.model.ConnectorTaskStatusDTO; + import io.kafbat.ui.model.InternalBroker; + import io.kafbat.ui.model.InternalBrokerConfig; + import io.kafbat.ui.model.InternalClusterState; +@@ -29,16 +33,19 @@ + import io.kafbat.ui.model.Metrics; + import io.kafbat.ui.model.PartitionDTO; + import io.kafbat.ui.model.ReplicaDTO; ++import 
io.kafbat.ui.model.TaskDTO; + import io.kafbat.ui.model.TopicConfigDTO; + import io.kafbat.ui.model.TopicDTO; + import io.kafbat.ui.model.TopicDetailsDTO; + import io.kafbat.ui.model.TopicProducerStateDTO; ++import io.kafbat.ui.model.connect.InternalConnectorInfo; + import io.kafbat.ui.service.metrics.SummarizedMetrics; + import io.prometheus.metrics.model.snapshots.Label; + import io.prometheus.metrics.model.snapshots.MetricSnapshot; + import java.math.BigDecimal; + import java.util.List; + import java.util.Map; ++import java.util.Optional; + import java.util.stream.Stream; + import org.apache.kafka.clients.admin.ConfigEntry; + import org.apache.kafka.clients.admin.ProducerState; +@@ -118,7 +125,38 @@ + + ReplicaDTO toReplica(InternalReplica replica); + +- ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect); ++ default ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect, List connectors) { ++ int connectorCount = connectors.size(); ++ int failedConnectors = 0; ++ int tasksCount = 0; ++ int failedTasksCount = 0; ++ ++ for (InternalConnectorInfo connector : connectors) { ++ Optional internalConnector = Optional.ofNullable(connector.getConnector()); ++ ++ failedConnectors += internalConnector ++ .map(ConnectorDTO::getStatus) ++ .map(ConnectorStatusDTO::getState) ++ .filter(ConnectorStateDTO.FAILED::equals) ++ .map(s -> 1).orElse(0); ++ ++ tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0); ++ ++ for (TaskDTO task : connector.getTasks()) { ++ if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) { ++ failedTasksCount += tasksCount; ++ } ++ } ++ } ++ ++ return new ConnectDTO() ++ .address(connect.getAddress()) ++ .name(connect.getName()) ++ .connectorsCount(connectorCount) ++ .failedConnectorsCount(failedConnectors) ++ .tasksCount(tasksCount) ++ .failedTasksCount(failedTasksCount); ++ } + + List toFeaturesEnum(List features); + +Index: 
api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java +--- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) ++++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java (revision abb46f1075df173e25157e5add12606ccbc7547e) +@@ -11,7 +11,7 @@ + import io.kafbat.ui.model.FullConnectorInfoDTO; + import io.kafbat.ui.model.TaskDTO; + import io.kafbat.ui.model.TaskStatusDTO; +-import io.kafbat.ui.model.connect.InternalConnectInfo; ++import io.kafbat.ui.model.connect.InternalConnectorInfo; + import java.util.List; + import org.mapstruct.Mapper; + import org.mapstruct.Mapping; +@@ -38,7 +38,7 @@ + io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse + connectorPluginConfigValidationResponse); + +- default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) { ++ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) { + ConnectorDTO connector = connectInfo.getConnector(); + List tasks = connectInfo.getTasks(); + int failedTasksCount = (int) tasks.stream() +Index: api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java +rename from api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java +rename to api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java +--- 
a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) ++++ b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java (revision abb46f1075df173e25157e5add12606ccbc7547e) +@@ -9,7 +9,7 @@ + + @Data + @Builder(toBuilder = true) +-public class InternalConnectInfo { ++public class InternalConnectorInfo { + private final ConnectorDTO connector; + private final Map config; + private final List tasks; +Index: api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +--- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) ++++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java (revision abb46f1075df173e25157e5add12606ccbc7547e) +@@ -23,7 +23,8 @@ + import io.kafbat.ui.model.KafkaCluster; + import io.kafbat.ui.model.NewConnectorDTO; + import io.kafbat.ui.model.TaskDTO; +-import io.kafbat.ui.model.connect.InternalConnectInfo; ++import io.kafbat.ui.model.TaskStatusDTO; ++import io.kafbat.ui.model.connect.InternalConnectorInfo; + import io.kafbat.ui.util.ReactiveFailover; + import java.util.List; + import java.util.Map; +@@ -33,7 +34,6 @@ + import javax.annotation.Nullable; + import lombok.RequiredArgsConstructor; + import lombok.extern.slf4j.Slf4j; +-import org.apache.commons.lang3.StringUtils; + import org.springframework.stereotype.Service; + import org.springframework.web.reactive.function.client.WebClientResponseException; + import reactor.core.publisher.Flux; +@@ -48,11 +48,24 @@ + private final KafkaConfigSanitizer kafkaConfigSanitizer; + + public Flux getConnects(KafkaCluster cluster) { +- return 
Flux.fromIterable( +- Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()) +- .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList()) +- .orElse(List.of()) +- ); ++ return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()) ++ .map(connects -> Flux.fromIterable(connects).flatMap(connect -> ++ getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName -> ++ Mono.zip( ++ getConnector(cluster, connect.getName(), connectorName), ++ getConnectorTasks(cluster, connect.getName(), connectorName).collectList() ++ ).map(tuple -> ++ InternalConnectorInfo.builder() ++ .connector(tuple.getT1()) ++ .config(null) ++ .tasks(tuple.getT2()) ++ .topics(null) ++ .build() ++ ) ++ ).collectList().map(connectors -> ++ clusterMapper.toKafkaConnect(connect, connectors) ++ ) ++ )).orElse(Flux.fromIterable(List.of())); + } + + public Flux getAllConnectors(final KafkaCluster cluster, +@@ -67,7 +80,7 @@ + getConnectorTasks(cluster, connect.getName(), connectorName).collectList(), + getConnectorTopics(cluster, connect.getName(), connectorName) + ).map(tuple -> +- InternalConnectInfo.builder() ++ InternalConnectorInfo.builder() + .connector(tuple.getT1()) + .config(tuple.getT2()) + .tasks(tuple.getT3()) +Index: contract/src/main/resources/swagger/kafbat-ui-api.yaml +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml +--- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) ++++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml (revision abb46f1075df173e25157e5add12606ccbc7547e) +@@ -3494,6 +3494,14 @@ + type: string + address: + type: string ++ connectors_count: ++ type: integer ++ failed_connectors_count: ++ type: integer ++ 
tasks_count: ++ type: integer ++ failed_tasks_count: ++ type: integer + required: + - name + diff --git a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java similarity index 100% rename from api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java rename to api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java From d3f2f3aa4e17bf46d83562ecae1ab9c9632f4e77 Mon Sep 17 00:00:00 2001 From: German Osin Date: Sun, 27 Jul 2025 17:13:50 +0300 Subject: [PATCH 2/5] Removed patch --- BE__Fixes_#445_be_connect_info.patch | 214 --------------------------- 1 file changed, 214 deletions(-) delete mode 100644 BE__Fixes_#445_be_connect_info.patch diff --git a/BE__Fixes_#445_be_connect_info.patch b/BE__Fixes_#445_be_connect_info.patch deleted file mode 100644 index b29259faf..000000000 --- a/BE__Fixes_#445_be_connect_info.patch +++ /dev/null @@ -1,214 +0,0 @@ -Subject: [PATCH] BE: Fixes #445 be connect info ---- -Index: api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java ---- a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) -+++ b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java (revision abb46f1075df173e25157e5add12606ccbc7547e) -@@ -15,6 +15,10 @@ - import io.kafbat.ui.model.ConfigSourceDTO; - import io.kafbat.ui.model.ConfigSynonymDTO; - import io.kafbat.ui.model.ConnectDTO; -+import io.kafbat.ui.model.ConnectorDTO; -+import io.kafbat.ui.model.ConnectorStateDTO; -+import io.kafbat.ui.model.ConnectorStatusDTO; -+import io.kafbat.ui.model.ConnectorTaskStatusDTO; - import 
io.kafbat.ui.model.InternalBroker; - import io.kafbat.ui.model.InternalBrokerConfig; - import io.kafbat.ui.model.InternalClusterState; -@@ -29,16 +33,19 @@ - import io.kafbat.ui.model.Metrics; - import io.kafbat.ui.model.PartitionDTO; - import io.kafbat.ui.model.ReplicaDTO; -+import io.kafbat.ui.model.TaskDTO; - import io.kafbat.ui.model.TopicConfigDTO; - import io.kafbat.ui.model.TopicDTO; - import io.kafbat.ui.model.TopicDetailsDTO; - import io.kafbat.ui.model.TopicProducerStateDTO; -+import io.kafbat.ui.model.connect.InternalConnectorInfo; - import io.kafbat.ui.service.metrics.SummarizedMetrics; - import io.prometheus.metrics.model.snapshots.Label; - import io.prometheus.metrics.model.snapshots.MetricSnapshot; - import java.math.BigDecimal; - import java.util.List; - import java.util.Map; -+import java.util.Optional; - import java.util.stream.Stream; - import org.apache.kafka.clients.admin.ConfigEntry; - import org.apache.kafka.clients.admin.ProducerState; -@@ -118,7 +125,38 @@ - - ReplicaDTO toReplica(InternalReplica replica); - -- ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect); -+ default ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect, List connectors) { -+ int connectorCount = connectors.size(); -+ int failedConnectors = 0; -+ int tasksCount = 0; -+ int failedTasksCount = 0; -+ -+ for (InternalConnectorInfo connector : connectors) { -+ Optional internalConnector = Optional.ofNullable(connector.getConnector()); -+ -+ failedConnectors += internalConnector -+ .map(ConnectorDTO::getStatus) -+ .map(ConnectorStatusDTO::getState) -+ .filter(ConnectorStateDTO.FAILED::equals) -+ .map(s -> 1).orElse(0); -+ -+ tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0); -+ -+ for (TaskDTO task : connector.getTasks()) { -+ if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) { -+ failedTasksCount += tasksCount; -+ } -+ } -+ } -+ -+ return new ConnectDTO() -+ 
.address(connect.getAddress()) -+ .name(connect.getName()) -+ .connectorsCount(connectorCount) -+ .failedConnectorsCount(failedConnectors) -+ .tasksCount(tasksCount) -+ .failedTasksCount(failedTasksCount); -+ } - - List toFeaturesEnum(List features); - -Index: api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java ---- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) -+++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java (revision abb46f1075df173e25157e5add12606ccbc7547e) -@@ -11,7 +11,7 @@ - import io.kafbat.ui.model.FullConnectorInfoDTO; - import io.kafbat.ui.model.TaskDTO; - import io.kafbat.ui.model.TaskStatusDTO; --import io.kafbat.ui.model.connect.InternalConnectInfo; -+import io.kafbat.ui.model.connect.InternalConnectorInfo; - import java.util.List; - import org.mapstruct.Mapper; - import org.mapstruct.Mapping; -@@ -38,7 +38,7 @@ - io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse - connectorPluginConfigValidationResponse); - -- default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) { -+ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) { - ConnectorDTO connector = connectInfo.getConnector(); - List tasks = connectInfo.getTasks(); - int failedTasksCount = (int) tasks.stream() -Index: api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java 
b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java -rename from api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java -rename to api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java ---- a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectInfo.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) -+++ b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java (revision abb46f1075df173e25157e5add12606ccbc7547e) -@@ -9,7 +9,7 @@ - - @Data - @Builder(toBuilder = true) --public class InternalConnectInfo { -+public class InternalConnectorInfo { - private final ConnectorDTO connector; - private final Map config; - private final List tasks; -Index: api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java ---- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) -+++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java (revision abb46f1075df173e25157e5add12606ccbc7547e) -@@ -23,7 +23,8 @@ - import io.kafbat.ui.model.KafkaCluster; - import io.kafbat.ui.model.NewConnectorDTO; - import io.kafbat.ui.model.TaskDTO; --import io.kafbat.ui.model.connect.InternalConnectInfo; -+import io.kafbat.ui.model.TaskStatusDTO; -+import io.kafbat.ui.model.connect.InternalConnectorInfo; - import io.kafbat.ui.util.ReactiveFailover; - import java.util.List; - import java.util.Map; -@@ -33,7 +34,6 @@ - import javax.annotation.Nullable; - import lombok.RequiredArgsConstructor; - import lombok.extern.slf4j.Slf4j; --import org.apache.commons.lang3.StringUtils; - import org.springframework.stereotype.Service; - import 
org.springframework.web.reactive.function.client.WebClientResponseException; - import reactor.core.publisher.Flux; -@@ -48,11 +48,24 @@ - private final KafkaConfigSanitizer kafkaConfigSanitizer; - - public Flux getConnects(KafkaCluster cluster) { -- return Flux.fromIterable( -- Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()) -- .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList()) -- .orElse(List.of()) -- ); -+ return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()) -+ .map(connects -> Flux.fromIterable(connects).flatMap(connect -> -+ getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName -> -+ Mono.zip( -+ getConnector(cluster, connect.getName(), connectorName), -+ getConnectorTasks(cluster, connect.getName(), connectorName).collectList() -+ ).map(tuple -> -+ InternalConnectorInfo.builder() -+ .connector(tuple.getT1()) -+ .config(null) -+ .tasks(tuple.getT2()) -+ .topics(null) -+ .build() -+ ) -+ ).collectList().map(connectors -> -+ clusterMapper.toKafkaConnect(connect, connectors) -+ ) -+ )).orElse(Flux.fromIterable(List.of())); - } - - public Flux getAllConnectors(final KafkaCluster cluster, -@@ -67,7 +80,7 @@ - getConnectorTasks(cluster, connect.getName(), connectorName).collectList(), - getConnectorTopics(cluster, connect.getName(), connectorName) - ).map(tuple -> -- InternalConnectInfo.builder() -+ InternalConnectorInfo.builder() - .connector(tuple.getT1()) - .config(tuple.getT2()) - .tasks(tuple.getT3()) -Index: contract/src/main/resources/swagger/kafbat-ui-api.yaml -IDEA additional info: -Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP -<+>UTF-8 -=================================================================== -diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml ---- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml (revision 9ea69c7868652a04a7af949eff383ee09e8cc243) -+++ 
b/contract/src/main/resources/swagger/kafbat-ui-api.yaml (revision abb46f1075df173e25157e5add12606ccbc7547e) -@@ -3494,6 +3494,14 @@ - type: string - address: - type: string -+ connectors_count: -+ type: integer -+ failed_connectors_count: -+ type: integer -+ tasks_count: -+ type: integer -+ failed_tasks_count: -+ type: integer - required: - - name - From e398596ad64d366a5873f0932a33b574bfd7656e Mon Sep 17 00:00:00 2001 From: German Osin Date: Sun, 27 Jul 2025 17:14:35 +0300 Subject: [PATCH 3/5] BE: Fixes #445 Added connect stats --- .../io/kafbat/ui/mapper/ClusterMapper.java | 40 ++++++++++++++++++- .../kafbat/ui/mapper/KafkaConnectMapper.java | 4 +- .../model/connect/InternalConnectorInfo.java | 2 +- .../ui/service/KafkaConnectService.java | 27 +++++++++---- .../main/resources/swagger/kafbat-ui-api.yaml | 8 ++++ 5 files changed, 70 insertions(+), 11 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java index f67b4aad3..41211cd3c 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java @@ -12,6 +12,10 @@ import io.kafbat.ui.model.ConfigSourceDTO; import io.kafbat.ui.model.ConfigSynonymDTO; import io.kafbat.ui.model.ConnectDTO; +import io.kafbat.ui.model.ConnectorDTO; +import io.kafbat.ui.model.ConnectorStateDTO; +import io.kafbat.ui.model.ConnectorStatusDTO; +import io.kafbat.ui.model.ConnectorTaskStatusDTO; import io.kafbat.ui.model.InternalBroker; import io.kafbat.ui.model.InternalBrokerConfig; import io.kafbat.ui.model.InternalBrokerDiskUsage; @@ -27,13 +31,16 @@ import io.kafbat.ui.model.Metrics; import io.kafbat.ui.model.PartitionDTO; import io.kafbat.ui.model.ReplicaDTO; +import io.kafbat.ui.model.TaskDTO; import io.kafbat.ui.model.TopicConfigDTO; import io.kafbat.ui.model.TopicDTO; import io.kafbat.ui.model.TopicDetailsDTO; import io.kafbat.ui.model.TopicProducerStateDTO; +import 
io.kafbat.ui.model.connect.InternalConnectorInfo; import io.kafbat.ui.service.metrics.RawMetric; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ProducerState; import org.apache.kafka.common.acl.AccessControlEntry; @@ -107,7 +114,38 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) { ReplicaDTO toReplica(InternalReplica replica); - ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect); + default ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect, List connectors) { + int connectorCount = connectors.size(); + int failedConnectors = 0; + int tasksCount = 0; + int failedTasksCount = 0; + + for (InternalConnectorInfo connector : connectors) { + Optional internalConnector = Optional.ofNullable(connector.getConnector()); + + failedConnectors += internalConnector + .map(ConnectorDTO::getStatus) + .map(ConnectorStatusDTO::getState) + .filter(ConnectorStateDTO.FAILED::equals) + .map(s -> 1).orElse(0); + + tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0); + + for (TaskDTO task : connector.getTasks()) { + if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) { + failedTasksCount += tasksCount; + } + } + } + + return new ConnectDTO() + .address(connect.getAddress()) + .name(connect.getName()) + .connectorsCount(connectorCount) + .failedConnectorsCount(failedConnectors) + .tasksCount(tasksCount) + .failedTasksCount(failedTasksCount); + } List toFeaturesEnum(List features); diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java index 2bef74b51..897479650 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java @@ -11,7 +11,7 @@ import io.kafbat.ui.model.FullConnectorInfoDTO; 
import io.kafbat.ui.model.TaskDTO; import io.kafbat.ui.model.TaskStatusDTO; -import io.kafbat.ui.model.connect.InternalConnectInfo; +import io.kafbat.ui.model.connect.InternalConnectorInfo; import java.util.List; import org.mapstruct.Mapper; import org.mapstruct.Mapping; @@ -38,7 +38,7 @@ ConnectorPluginConfigValidationResponseDTO fromClient( io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse); - default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) { + default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) { ConnectorDTO connector = connectInfo.getConnector(); List tasks = connectInfo.getTasks(); int failedTasksCount = (int) tasks.stream() diff --git a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java index ba8306cde..6c884316a 100644 --- a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java +++ b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java @@ -9,7 +9,7 @@ @Data @Builder(toBuilder = true) -public class InternalConnectInfo { +public class InternalConnectorInfo { private final ConnectorDTO connector; private final Map config; private final List tasks; diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index 31f552885..46da6c179 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -21,7 +21,7 @@ import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.NewConnectorDTO; import io.kafbat.ui.model.TaskDTO; -import io.kafbat.ui.model.connect.InternalConnectInfo; +import io.kafbat.ui.model.connect.InternalConnectorInfo; import io.kafbat.ui.util.ReactiveFailover; import java.util.List; import java.util.Map; @@ -46,11 +46,24 @@ 
public class KafkaConnectService { private final KafkaConfigSanitizer kafkaConfigSanitizer; public Flux getConnects(KafkaCluster cluster) { - return Flux.fromIterable( - Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()) - .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList()) - .orElse(List.of()) - ); + return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()) + .map(connects -> Flux.fromIterable(connects).flatMap(connect -> + getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName -> + Mono.zip( + getConnector(cluster, connect.getName(), connectorName), + getConnectorTasks(cluster, connect.getName(), connectorName).collectList() + ).map(tuple -> + InternalConnectorInfo.builder() + .connector(tuple.getT1()) + .config(null) + .tasks(tuple.getT2()) + .topics(null) + .build() + ) + ).collectList().map(connectors -> + clusterMapper.toKafkaConnect(connect, connectors) + ) + )).orElse(Flux.fromIterable(List.of())); } public Flux getAllConnectors(final KafkaCluster cluster, @@ -65,7 +78,7 @@ public Flux getAllConnectors(final KafkaCluster cluster, getConnectorTasks(cluster, connect.getName(), connectorName).collectList(), getConnectorTopics(cluster, connect.getName(), connectorName) ).map(tuple -> - InternalConnectInfo.builder() + InternalConnectorInfo.builder() .connector(tuple.getT1()) .config(tuple.getT2()) .tasks(tuple.getT3()) diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 77614253e..b2de5091c 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -3434,6 +3434,14 @@ components: type: string address: type: string + connectors_count: + type: integer + failed_connectors_count: + type: integer + tasks_count: + type: integer + failed_tasks_count: + type: integer required: - name From a27ff0299bcaba7cb863deb0f2743f84bfac0218 Mon 
Sep 17 00:00:00 2001 From: German Osin Date: Mon, 28 Jul 2025 11:01:00 +0300 Subject: [PATCH 4/5] Added configs, stats and cache --- api/build.gradle | 2 + .../kafbat/ui/config/ClustersProperties.java | 11 +++ .../ui/controller/KafkaConnectController.java | 6 +- .../io/kafbat/ui/mapper/ClusterMapper.java | 42 ---------- .../kafbat/ui/mapper/KafkaConnectMapper.java | 48 +++++++++++ .../ui/service/KafkaConnectService.java | 81 ++++++++++++++----- .../integration/odd/ConnectorsExporter.java | 4 +- .../ui/util/DynamicConfigOperations.java | 15 +--- .../ui/util/YamlNullSkipRepresenter.java | 28 +++++++ .../odd/ConnectorsExporterTest.java | 2 +- .../main/resources/swagger/kafbat-ui-api.yaml | 9 +++ .../io/kafbat/ui/services/ApiService.java | 2 +- gradle/libs.versions.toml | 1 + 13 files changed, 167 insertions(+), 84 deletions(-) create mode 100644 api/src/main/java/io/kafbat/ui/util/YamlNullSkipRepresenter.java diff --git a/api/build.gradle b/api/build.gradle index 8dfeea404..7ac4b9744 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -25,6 +25,7 @@ dependencies { implementation libs.spring.starter.oauth2.client implementation libs.spring.security.oauth2.resource.server implementation libs.spring.boot.actuator + compileOnly libs.spring.boot.devtools implementation libs.spring.security.ldap @@ -48,6 +49,7 @@ dependencies { implementation libs.jackson.databind.nullable implementation libs.cel + implementation libs.caffeine antlr libs.antlr implementation libs.antlr.runtime diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 23be86369..fb5c839ee 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -5,6 +5,7 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; +import java.time.Duration; import 
java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -36,6 +37,8 @@ public class ClustersProperties { PollingProperties polling = new PollingProperties(); + CacheProperties cache = new CacheProperties(); + @Data public static class Cluster { @NotBlank(message = "field name for for cluster could not be blank") @@ -183,6 +186,14 @@ public enum LogLevel { } } + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class CacheProperties { + boolean enabled = true; + Duration connectCacheExpiry = Duration.ofMinutes(1); + } + @PostConstruct public void validateAndSetDefaults() { if (clusters != null) { diff --git a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java index 945d77f92..d7c0f4560 100644 --- a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java +++ b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java @@ -45,10 +45,12 @@ public class KafkaConnectController extends AbstractController implements KafkaC @Override public Mono>> getConnects(String clusterName, + Boolean withStats, ServerWebExchange exchange) { - Flux availableConnects = kafkaConnectService.getConnects(getCluster(clusterName)) - .filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName)); + Flux availableConnects = kafkaConnectService.getConnects( + getCluster(clusterName), withStats != null ? 
withStats : false + ).filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName)); return Mono.just(ResponseEntity.ok(availableConnects)); } diff --git a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java index 41211cd3c..53e624528 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java @@ -1,6 +1,5 @@ package io.kafbat.ui.mapper; -import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.model.BrokerConfigDTO; import io.kafbat.ui.model.BrokerDTO; import io.kafbat.ui.model.BrokerDiskUsageDTO; @@ -11,11 +10,6 @@ import io.kafbat.ui.model.ClusterStatsDTO; import io.kafbat.ui.model.ConfigSourceDTO; import io.kafbat.ui.model.ConfigSynonymDTO; -import io.kafbat.ui.model.ConnectDTO; -import io.kafbat.ui.model.ConnectorDTO; -import io.kafbat.ui.model.ConnectorStateDTO; -import io.kafbat.ui.model.ConnectorStatusDTO; -import io.kafbat.ui.model.ConnectorTaskStatusDTO; import io.kafbat.ui.model.InternalBroker; import io.kafbat.ui.model.InternalBrokerConfig; import io.kafbat.ui.model.InternalBrokerDiskUsage; @@ -31,16 +25,13 @@ import io.kafbat.ui.model.Metrics; import io.kafbat.ui.model.PartitionDTO; import io.kafbat.ui.model.ReplicaDTO; -import io.kafbat.ui.model.TaskDTO; import io.kafbat.ui.model.TopicConfigDTO; import io.kafbat.ui.model.TopicDTO; import io.kafbat.ui.model.TopicDetailsDTO; import io.kafbat.ui.model.TopicProducerStateDTO; -import io.kafbat.ui.model.connect.InternalConnectorInfo; import io.kafbat.ui.service.metrics.RawMetric; import java.util.List; import java.util.Map; -import java.util.Optional; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ProducerState; import org.apache.kafka.common.acl.AccessControlEntry; @@ -114,39 +105,6 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) { ReplicaDTO toReplica(InternalReplica 
replica); - default ConnectDTO toKafkaConnect(ClustersProperties.ConnectCluster connect, List connectors) { - int connectorCount = connectors.size(); - int failedConnectors = 0; - int tasksCount = 0; - int failedTasksCount = 0; - - for (InternalConnectorInfo connector : connectors) { - Optional internalConnector = Optional.ofNullable(connector.getConnector()); - - failedConnectors += internalConnector - .map(ConnectorDTO::getStatus) - .map(ConnectorStatusDTO::getState) - .filter(ConnectorStateDTO.FAILED::equals) - .map(s -> 1).orElse(0); - - tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0); - - for (TaskDTO task : connector.getTasks()) { - if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) { - failedTasksCount += tasksCount; - } - } - } - - return new ConnectDTO() - .address(connect.getAddress()) - .name(connect.getName()) - .connectorsCount(connectorCount) - .failedConnectorsCount(failedConnectors) - .tasksCount(tasksCount) - .failedTasksCount(failedTasksCount); - } - List toFeaturesEnum(List features); default List map(Map map) { diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java index 897479650..1a4073191 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java @@ -1,11 +1,14 @@ package io.kafbat.ui.mapper; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.connect.model.ConnectorStatusConnector; import io.kafbat.ui.connect.model.ConnectorTask; import io.kafbat.ui.connect.model.NewConnector; +import io.kafbat.ui.model.ConnectDTO; import io.kafbat.ui.model.ConnectorDTO; import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO; import io.kafbat.ui.model.ConnectorPluginDTO; +import io.kafbat.ui.model.ConnectorStateDTO; import io.kafbat.ui.model.ConnectorStatusDTO; import 
io.kafbat.ui.model.ConnectorTaskStatusDTO; import io.kafbat.ui.model.FullConnectorInfoDTO; @@ -13,6 +16,7 @@ import io.kafbat.ui.model.TaskStatusDTO; import io.kafbat.ui.model.connect.InternalConnectorInfo; import java.util.List; +import java.util.Optional; import org.mapstruct.Mapper; import org.mapstruct.Mapping; @@ -38,6 +42,50 @@ ConnectorPluginConfigValidationResponseDTO fromClient( io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse); + default ConnectDTO toKafkaConnect( + ClustersProperties.ConnectCluster connect, + List connectors, + boolean withStats) { + Integer connectorCount = null; + Integer failedConnectors = null; + Integer tasksCount = null; + Integer failedTasksCount = null; + + if (withStats) { + connectorCount = connectors.size(); + failedConnectors = 0; + tasksCount = 0; + failedTasksCount = 0; + + for (InternalConnectorInfo connector : connectors) { + Optional internalConnector = Optional.ofNullable(connector.getConnector()); + + failedConnectors += internalConnector + .map(ConnectorDTO::getStatus) + .map(ConnectorStatusDTO::getState) + .filter(ConnectorStateDTO.FAILED::equals) + .map(s -> 1).orElse(0); + + tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0); + + for (TaskDTO task : connector.getTasks()) { + if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) { + failedTasksCount += 1; + } + } + } + + } + + return new ConnectDTO() + .address(connect.getAddress()) + .name(connect.getName()) + .connectorsCount(connectorCount) + .failedConnectorsCount(failedConnectors) + .tasksCount(tasksCount) + .failedTasksCount(failedTasksCount); + } + default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) { ConnectorDTO connector = connectInfo.getConnector(); List tasks = connectInfo.getTasks(); diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java 
b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index 46da6c179..4f96a212e 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -1,5 +1,9 @@ package io.kafbat.ui.service; + +import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.connect.api.KafkaConnectClientApi; import io.kafbat.ui.connect.model.ConnectorStatus; import io.kafbat.ui.connect.model.ConnectorStatusConnector; @@ -23,13 +27,13 @@ import io.kafbat.ui.model.TaskDTO; import io.kafbat.ui.model.connect.InternalConnectorInfo; import io.kafbat.ui.util.ReactiveFailover; +import jakarta.validation.Valid; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Stream; import javax.annotation.Nullable; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; @@ -39,36 +43,67 @@ @Service @Slf4j -@RequiredArgsConstructor public class KafkaConnectService { private final ClusterMapper clusterMapper; private final KafkaConnectMapper kafkaConnectMapper; private final KafkaConfigSanitizer kafkaConfigSanitizer; + private final ClustersProperties clustersProperties; - public Flux getConnects(KafkaCluster cluster) { - return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()) - .map(connects -> Flux.fromIterable(connects).flatMap(connect -> - getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName -> - Mono.zip( - getConnector(cluster, connect.getName(), connectorName), - getConnectorTasks(cluster, connect.getName(), connectorName).collectList() - ).map(tuple -> - InternalConnectorInfo.builder() - .connector(tuple.getT1()) - .config(null) - .tasks(tuple.getT2()) 
- .topics(null) - .build() - ) - ).collectList().map(connectors -> - clusterMapper.toKafkaConnect(connect, connectors) - ) - )).orElse(Flux.fromIterable(List.of())); + private final AsyncCache> cachedConnectors; + + public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper, + KafkaConfigSanitizer kafkaConfigSanitizer, + ClustersProperties clustersProperties) { + this.clusterMapper = clusterMapper; + this.kafkaConnectMapper = kafkaConnectMapper; + this.kafkaConfigSanitizer = kafkaConfigSanitizer; + this.clustersProperties = clustersProperties; + this.cachedConnectors = Caffeine.newBuilder() + .expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry()) + .buildAsync(); + } + + public Flux getConnects(KafkaCluster cluster, boolean withStats) { + Optional> connectClusters = + Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()); + if (withStats) { + return connectClusters.map(connects -> Flux.fromIterable(connects).flatMap(connect -> ( + clustersProperties.getCache().isEnabled() ? 
Mono.fromFuture( + cachedConnectors.get(new ConnectCacheKey(cluster, connect), (t, e) -> + getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture()) + ) : getConnectConnectors(cluster, connect).collectList()).map(connectors -> + kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats) + ) + ) + ).orElse(Flux.fromIterable(List.of())); + } else { + return Flux.fromIterable(connectClusters.map(connects -> + connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList() + ).orElse(List.of())); + } + } + + private Flux getConnectConnectors( + KafkaCluster cluster, + ClustersProperties.ConnectCluster connect) { + return getConnectorNamesWithErrorsSuppress(cluster, connect.getName()).flatMap(connectorName -> + Mono.zip( + getConnector(cluster, connect.getName(), connectorName), + getConnectorTasks(cluster, connect.getName(), connectorName).collectList() + ).map(tuple -> + InternalConnectorInfo.builder() + .connector(tuple.getT1()) + .config(null) + .tasks(tuple.getT2()) + .topics(null) + .build() + ) + ); } public Flux getAllConnectors(final KafkaCluster cluster, @Nullable final String search) { - return getConnects(cluster) + return getConnects(cluster, false) .flatMap(connect -> getConnectorNamesWithErrorsSuppress(cluster, connect.getName()) .flatMap(connectorName -> @@ -302,4 +337,6 @@ public Mono resetConnectorOffsets(KafkaCluster cluster, String connectName .formatted(connectorName, connectName)); }); } + + record ConnectCacheKey(KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {} } diff --git a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java index 2e7c7ca8b..a58c4e881 100644 --- a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java +++ b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorsExporter.java @@ -24,7 +24,7 @@ class ConnectorsExporter 
{ private final KafkaConnectService kafkaConnectService; Flux export(KafkaCluster cluster) { - return kafkaConnectService.getConnects(cluster) + return kafkaConnectService.getConnects(cluster, false) .flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName()) .flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName)) .flatMap(connectorDTO -> @@ -41,7 +41,7 @@ Flux export(KafkaCluster cluster) { } Flux getConnectDataSources(KafkaCluster cluster) { - return kafkaConnectService.getConnects(cluster) + return kafkaConnectService.getConnects(cluster, false) .map(ConnectorsExporter::toDataSource); } diff --git a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java index 0686de2c4..df0ac5426 100644 --- a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java +++ b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java @@ -187,20 +187,7 @@ private void writeYamlToFile(String yaml, Path path) { } private String serializeToYaml(PropertiesStructure props) { - //representer, that skips fields with null values - Representer representer = new Representer(new DumperOptions()) { - @Override - protected NodeTuple representJavaBeanProperty(Object javaBean, - Property property, - Object propertyValue, - Tag customTag) { - if (propertyValue == null) { - return null; // if value of property is null, ignore it. 
- } else { - return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag); - } - } - }; + Representer representer = new YamlNullSkipRepresenter(new DumperOptions()); var propertyUtils = new PropertyUtils(); propertyUtils.setBeanAccess(BeanAccess.FIELD); representer.setPropertyUtils(propertyUtils); diff --git a/api/src/main/java/io/kafbat/ui/util/YamlNullSkipRepresenter.java b/api/src/main/java/io/kafbat/ui/util/YamlNullSkipRepresenter.java new file mode 100644 index 000000000..c1879a6fc --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/YamlNullSkipRepresenter.java @@ -0,0 +1,28 @@ +package io.kafbat.ui.util; + +import java.time.Duration; +import org.yaml.snakeyaml.DumperOptions; +import org.yaml.snakeyaml.introspector.Property; +import org.yaml.snakeyaml.nodes.NodeTuple; +import org.yaml.snakeyaml.nodes.Tag; +import org.yaml.snakeyaml.representer.Representer; + +// representer, that skips fields with null values +public class YamlNullSkipRepresenter extends Representer { + public YamlNullSkipRepresenter(DumperOptions options) { + super(options); + this.representers.put(Duration.class, data -> this.representScalar(Tag.STR, data.toString())); + } + + @Override + protected NodeTuple representJavaBeanProperty(Object javaBean, + Property property, + Object propertyValue, + Tag customTag) { + if (propertyValue == null) { + return null; // if value of property is null, ignore it. 
+ } else { + return super.representJavaBeanProperty(javaBean, property, propertyValue, customTag); + } + } +} diff --git a/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java b/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java index fb80338e7..2e95f353a 100644 --- a/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java +++ b/api/src/test/java/io/kafbat/ui/service/integration/odd/ConnectorsExporterTest.java @@ -58,7 +58,7 @@ void exportsConnectorsAsDataTransformers() { ) ); - when(kafkaConnectService.getConnects(CLUSTER)) + when(kafkaConnectService.getConnects(CLUSTER, false)) .thenReturn(Flux.just(connect)); when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName())) diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index b2de5091c..b8a7ae81b 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -1401,6 +1401,11 @@ paths: required: true schema: type: string + - name: withStats + in: query + required: false + schema: + type: boolean responses: 200: description: OK @@ -3436,12 +3441,16 @@ components: type: string connectors_count: type: integer + nullable: true failed_connectors_count: type: integer + nullable: true tasks_count: type: integer + nullable: true failed_tasks_count: type: integer + nullable: true required: - name diff --git a/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java b/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java index 2f480b8ca..231f79cd7 100644 --- a/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java +++ b/e2e-tests/src/main/java/io/kafbat/ui/services/ApiService.java @@ -177,7 +177,7 @@ public ApiService createConnector(Connector connector) { @Step public String getFirstConnectName(String clusterName) { - return 
Objects.requireNonNull(connectorApi().getConnects(clusterName).blockFirst()).getName(); + return Objects.requireNonNull(connectorApi().getConnects(clusterName, false).blockFirst()).getName(); } @SneakyThrows diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 10ab35a76..dfddc8eb0 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -98,6 +98,7 @@ micrometer-registry-prometheus = { module = 'io.micrometer:micrometer-registry-p antlr = { module = 'org.antlr:antlr4', version.ref = 'antlr' } antlr-runtime = { module = 'org.antlr:antlr4-runtime', version.ref = 'antlr' } cel = { module = 'dev.cel:cel', version.ref = 'cel' } +caffeine = { module = 'com.github.ben-manes.caffeine:caffeine', version = '3.2.2'} testcontainers = { module = 'org.testcontainers:testcontainers', version.ref = 'testcontainers' } testcontainers-kafka = { module = 'org.testcontainers:kafka', version.ref = 'testcontainers' } From 5e4c55fc14bf7f54c180affd077711a3b8c6bb0d Mon Sep 17 00:00:00 2001 From: German Osin Date: Mon, 28 Jul 2025 14:59:55 +0300 Subject: [PATCH 5/5] Simplified connect method --- .../ui/service/KafkaConnectService.java | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index 4f96a212e..797e463ce 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -67,12 +67,11 @@ public Flux getConnects(KafkaCluster cluster, boolean withStats) { Optional> connectClusters = Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect()); if (withStats) { - return connectClusters.map(connects -> Flux.fromIterable(connects).flatMap(connect -> ( - clustersProperties.getCache().isEnabled() ? 
Mono.fromFuture( - cachedConnectors.get(new ConnectCacheKey(cluster, connect), (t, e) -> - getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture()) - ) : getConnectConnectors(cluster, connect).collectList()).map(connectors -> - kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats) + return connectClusters.map(connects -> + Flux.fromIterable(connects).flatMap(connect -> ( + getConnectConnectorsFromCache(new ConnectCacheKey(cluster, connect), withStats).map( + connectors -> kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats) + ) ) ) ).orElse(Flux.fromIterable(List.of())); @@ -83,6 +82,18 @@ public Flux getConnects(KafkaCluster cluster, boolean withStats) { } } + private Mono> getConnectConnectorsFromCache(ConnectCacheKey key, boolean withStats) { + if (clustersProperties.getCache().isEnabled()) { + return Mono.fromFuture( + cachedConnectors.get(key, (t, e) -> + getConnectConnectors(t.cluster(), t.connect()).collectList().toFuture() + ) + ); + } else { + return getConnectConnectors(key.cluster(), key.connect()).collectList(); + } + } + private Flux getConnectConnectors( KafkaCluster cluster, ClustersProperties.ConnectCluster connect) {