From ed0b77dddada9ac4632aacab04c80391bb831bbd Mon Sep 17 00:00:00 2001 From: German Osin Date: Mon, 12 May 2025 13:24:50 +0200 Subject: [PATCH 1/3] Support version getter from metadata.version --- .../ui/service/ReactiveAdminClient.java | 16 ++++++- .../io/kafbat/ui/util/MetadataVersion.java | 48 +++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 api/src/main/java/io/kafbat/ui/util/MetadataVersion.java diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 6aea290c3..f79f51d43 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -13,6 +13,7 @@ import io.kafbat.ui.exception.NotFoundException; import io.kafbat.ui.exception.ValidationException; import io.kafbat.ui.util.KafkaVersion; +import io.kafbat.ui.util.MetadataVersion; import io.kafbat.ui.util.annotation.KafkaClientInternalsDependant; import java.io.Closeable; import java.time.Duration; @@ -49,6 +50,8 @@ import org.apache.kafka.clients.admin.DescribeClusterOptions; import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.clients.admin.DescribeConfigsOptions; +import org.apache.kafka.clients.admin.FeatureMetadata; +import org.apache.kafka.clients.admin.FinalizedVersionRange; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListTopicsOptions; @@ -96,6 +99,7 @@ @Slf4j @AllArgsConstructor public class ReactiveAdminClient implements Closeable { + private final static String DEFAULT_UNKNOWN_VERSION = "Unknown"; public enum SupportedFeature { INCREMENTAL_ALTER_CONFIGS(2.3f), @@ -150,8 +154,11 @@ private static Mono extract(AdminClient ac) { .orElse(desc.getNodes().iterator().next().id()); return loadBrokersConfig(ac, List.of(targetNodeId)) .map(map -> map.isEmpty() ? List.of() : map.get(targetNodeId)) - .flatMap(configs -> { - String version = "1.0-UNKNOWN"; + .zipWith(toMono(ac.describeFeatures().featureMetadata())) + .flatMap(tuple -> { + List configs = tuple.getT1(); + FeatureMetadata featureMetadata = tuple.getT2(); + String version = DEFAULT_UNKNOWN_VERSION; boolean topicDeletionEnabled = true; for (ConfigEntry entry : configs) { if (entry.name().contains("inter.broker.protocol.version")) { @@ -161,6 +168,11 @@ private static Mono extract(AdminClient ac) { topicDeletionEnabled = Boolean.parseBoolean(entry.value()); } } + FinalizedVersionRange metadataVersion = + featureMetadata.finalizedFeatures().get("metadata.version"); + if (metadataVersion != null) { + version = MetadataVersion.findVersion(metadataVersion.maxVersionLevel(), version); + } final String finalVersion = version; final boolean finalTopicDeletionEnabled = topicDeletionEnabled; return SupportedFeature.forVersion(ac, version) diff --git a/api/src/main/java/io/kafbat/ui/util/MetadataVersion.java b/api/src/main/java/io/kafbat/ui/util/MetadataVersion.java new file mode 100644 index 000000000..f5b83a9e1 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/MetadataVersion.java @@ -0,0 +1,48 @@ +package io.kafbat.ui.util; + +import java.util.Arrays; + +public enum MetadataVersion { + IBP_3_0_IV1(1, "3.0-IV1"), + IBP_3_1_IV0(2, "3.1-IV0"), + IBP_3_2_IV0(3, "3.2-IV0"), + IBP_3_3_IV0(4, "3.3-IV0"), + IBP_3_3_IV1(5, "3.3-IV1"), + IBP_3_3_IV2(6, "3.3-IV2"), + IBP_3_3_IV3(7, "3.3-IV3"), + IBP_3_4_IV0(8, "3.4-IV0"), + IBP_3_5_IV0(9, "3.5-IV0"), + IBP_3_5_IV1(10, "3.5-IV1"), + IBP_3_5_IV2(11, "3.5-IV2"), + IBP_3_6_IV0(12, "3.6-IV0"), + IBP_3_6_IV1(13, "3.6-IV1"), + IBP_3_6_IV2(14, "3.6-IV2"), + IBP_3_7_IV0(15, "3.7-IV0"), + IBP_3_7_IV1(16, "3.7-IV1"), + IBP_3_7_IV2(17, "3.7-IV2"), + IBP_3_7_IV3(18, "3.7-IV3"), + IBP_3_7_IV4(19, "3.7-IV4"), + IBP_3_8_IV0(20, "3.8-IV0"), + IBP_3_9_IV0(21, "3.9-IV0"), + IBP_4_0_IV0(22, "4.0-IV0"), + IBP_4_0_IV1(23, "4.0-IV1"), + IBP_4_0_IV2(24, "4.0-IV2"), + IBP_4_0_IV3(25, "4.0-IV3"), + IBP_4_1_IV0(26, "4.1-IV0"); + + private final int featureLevel; + private final String release; + + MetadataVersion(int featureLevel, String release) { + this.featureLevel = featureLevel; + this.release = release; + } + + public static String findVersion(int featureLevel, String defaultValue) { + return Arrays.stream(values()) + .filter(v -> v.featureLevel == featureLevel) + .findFirst().map(v -> v.release) + .orElse(defaultValue); + } + +} From c3640a4d6c4dfc61411a9e1fe498b9ea2f5cdae0 Mon Sep 17 00:00:00 2001 From: German Osin Date: Mon, 12 May 2025 13:28:54 +0200 Subject: [PATCH 2/3] fixed checkstyle --- api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index f79f51d43..04d25fa91 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -99,7 +99,7 @@ @Slf4j @AllArgsConstructor public class ReactiveAdminClient implements Closeable { - private final static String DEFAULT_UNKNOWN_VERSION = "Unknown"; + private static final String DEFAULT_UNKNOWN_VERSION = "Unknown"; public enum SupportedFeature { INCREMENTAL_ALTER_CONFIGS(2.3f), From c0134dce42c91633429714a50620ba710ddf4fb4 Mon Sep 17 00:00:00 2001 From: German Osin Date: Mon, 12 May 2025 14:08:10 +0200 Subject: [PATCH 3/3] small refactoring --- .../ui/service/ReactiveAdminClient.java | 20 ++++++++++--------- .../io/kafbat/ui/util/MetadataVersion.java | 6 +++--- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 04d25fa91..2503711c6 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -118,8 +118,8 @@ public enum SupportedFeature { this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion); } - static Mono> forVersion(AdminClient ac, String kafkaVersionStr) { - @Nullable Float kafkaVersion = KafkaVersion.parse(kafkaVersionStr).orElse(null); + static Mono> forVersion(AdminClient ac, Optional kafkaVersionStr) { + @Nullable Float kafkaVersion = kafkaVersionStr.flatMap(KafkaVersion::parse).orElse(null); return Flux.fromArray(SupportedFeature.values()) .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled))) .filter(Tuple2::getT2) @@ -158,22 +158,24 @@ private static Mono extract(AdminClient ac) { .flatMap(tuple -> { List configs = tuple.getT1(); FeatureMetadata featureMetadata = tuple.getT2(); - String version = DEFAULT_UNKNOWN_VERSION; + Optional version = Optional.empty(); boolean topicDeletionEnabled = true; for (ConfigEntry entry : configs) { if (entry.name().contains("inter.broker.protocol.version")) { - version = entry.value(); + version = Optional.of(entry.value()); } if (entry.name().equals("delete.topic.enable")) { topicDeletionEnabled = Boolean.parseBoolean(entry.value()); } } - FinalizedVersionRange metadataVersion = - featureMetadata.finalizedFeatures().get("metadata.version"); - if (metadataVersion != null) { - version = MetadataVersion.findVersion(metadataVersion.maxVersionLevel(), version); + if (version.isEmpty()) { + FinalizedVersionRange metadataVersion = + featureMetadata.finalizedFeatures().get("metadata.version"); + if (metadataVersion != null) { + version = MetadataVersion.findVersion(metadataVersion.maxVersionLevel()); + } } - final String finalVersion = version; + final String finalVersion = version.orElse(DEFAULT_UNKNOWN_VERSION); final boolean finalTopicDeletionEnabled = topicDeletionEnabled; return SupportedFeature.forVersion(ac, version) .map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled)); diff --git a/api/src/main/java/io/kafbat/ui/util/MetadataVersion.java b/api/src/main/java/io/kafbat/ui/util/MetadataVersion.java index f5b83a9e1..c68813050 100644 --- a/api/src/main/java/io/kafbat/ui/util/MetadataVersion.java +++ b/api/src/main/java/io/kafbat/ui/util/MetadataVersion.java @@ -1,6 +1,7 @@ package io.kafbat.ui.util; import java.util.Arrays; +import java.util.Optional; public enum MetadataVersion { IBP_3_0_IV1(1, "3.0-IV1"), @@ -38,11 +39,10 @@ public enum MetadataVersion { this.release = release; } - public static String findVersion(int featureLevel, String defaultValue) { + public static Optional findVersion(int featureLevel) { return Arrays.stream(values()) .filter(v -> v.featureLevel == featureLevel) - .findFirst().map(v -> v.release) - .orElse(defaultValue); + .findFirst().map(v -> v.release); } }