1313import  io .kafbat .ui .exception .NotFoundException ;
1414import  io .kafbat .ui .exception .ValidationException ;
1515import  io .kafbat .ui .util .KafkaVersion ;
16+ import  io .kafbat .ui .util .MetadataVersion ;
1617import  io .kafbat .ui .util .annotation .KafkaClientInternalsDependant ;
1718import  java .io .Closeable ;
1819import  java .time .Duration ;
4950import  org .apache .kafka .clients .admin .DescribeClusterOptions ;
5051import  org .apache .kafka .clients .admin .DescribeClusterResult ;
5152import  org .apache .kafka .clients .admin .DescribeConfigsOptions ;
53+ import  org .apache .kafka .clients .admin .FeatureMetadata ;
54+ import  org .apache .kafka .clients .admin .FinalizedVersionRange ;
5255import  org .apache .kafka .clients .admin .ListConsumerGroupOffsetsSpec ;
5356import  org .apache .kafka .clients .admin .ListOffsetsResult ;
5457import  org .apache .kafka .clients .admin .ListTopicsOptions ;
9699@ Slf4j 
97100@ AllArgsConstructor 
98101public  class  ReactiveAdminClient  implements  Closeable  {
102+   private  static  final  String  DEFAULT_UNKNOWN_VERSION  = "Unknown" ;
99103
100104  public  enum  SupportedFeature  {
101105    INCREMENTAL_ALTER_CONFIGS (2.3f ),
@@ -114,8 +118,8 @@ public enum SupportedFeature {
114118      this .predicate  = (admin , ver ) -> Mono .just (ver  != null  && ver  >= fromVersion );
115119    }
116120
117-     static  Mono <Set <SupportedFeature >> forVersion (AdminClient  ac , String  kafkaVersionStr ) {
118-       @ Nullable  Float  kafkaVersion  = KafkaVersion . parse ( kafkaVersionStr ).orElse (null );
121+     static  Mono <Set <SupportedFeature >> forVersion (AdminClient  ac , Optional < String >  kafkaVersionStr ) {
122+       @ Nullable  Float  kafkaVersion  = kafkaVersionStr . flatMap ( KafkaVersion :: parse ).orElse (null );
119123      return  Flux .fromArray (SupportedFeature .values ())
120124          .flatMap (f  -> f .predicate .apply (ac , kafkaVersion ).map (enabled  -> Tuples .of (f , enabled )))
121125          .filter (Tuple2 ::getT2 )
@@ -150,18 +154,28 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
150154                .orElse (desc .getNodes ().iterator ().next ().id ());
151155            return  loadBrokersConfig (ac , List .of (targetNodeId ))
152156                .map (map  -> map .isEmpty () ? List .<ConfigEntry >of () : map .get (targetNodeId ))
153-                 .flatMap (configs  -> {
154-                   String  version  = "1.0-UNKNOWN" ;
157+                 .zipWith (toMono (ac .describeFeatures ().featureMetadata ()))
158+                 .flatMap (tuple  -> {
159+                   List <ConfigEntry > configs  = tuple .getT1 ();
160+                   FeatureMetadata  featureMetadata  = tuple .getT2 ();
161+                   Optional <String > version  = Optional .empty ();
155162                  boolean  topicDeletionEnabled  = true ;
156163                  for  (ConfigEntry  entry  : configs ) {
157164                    if  (entry .name ().contains ("inter.broker.protocol.version" )) {
158-                       version  = entry .value ();
165+                       version  = Optional . of ( entry .value () );
159166                    }
160167                    if  (entry .name ().equals ("delete.topic.enable" )) {
161168                      topicDeletionEnabled  = Boolean .parseBoolean (entry .value ());
162169                    }
163170                  }
164-                   final  String  finalVersion  = version ;
171+                   if  (version .isEmpty ()) {
172+                     FinalizedVersionRange  metadataVersion  =
173+                         featureMetadata .finalizedFeatures ().get ("metadata.version" );
174+                     if  (metadataVersion  != null ) {
175+                       version  = MetadataVersion .findVersion (metadataVersion .maxVersionLevel ());
176+                     }
177+                   }
178+                   final  String  finalVersion  = version .orElse (DEFAULT_UNKNOWN_VERSION );
165179                  final  boolean  finalTopicDeletionEnabled  = topicDeletionEnabled ;
166180                  return  SupportedFeature .forVersion (ac , version )
167181                      .map (features  -> new  ConfigRelatedInfo (finalVersion , features , finalTopicDeletionEnabled ));
0 commit comments