@@ -118,8 +118,8 @@ public enum SupportedFeature {
118118 this .predicate = (admin , ver ) -> Mono .just (ver != null && ver >= fromVersion );
119119 }
120120
121- static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , String kafkaVersionStr ) {
122- @ Nullable Float kafkaVersion = KafkaVersion . parse ( kafkaVersionStr ).orElse (null );
121+ static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , Optional < String > kafkaVersionStr ) {
122+ @ Nullable Float kafkaVersion = kafkaVersionStr . flatMap ( KafkaVersion :: parse ).orElse (null );
123123 return Flux .fromArray (SupportedFeature .values ())
124124 .flatMap (f -> f .predicate .apply (ac , kafkaVersion ).map (enabled -> Tuples .of (f , enabled )))
125125 .filter (Tuple2 ::getT2 )
@@ -158,22 +158,24 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
158158 .flatMap (tuple -> {
159159 List <ConfigEntry > configs = tuple .getT1 ();
160160 FeatureMetadata featureMetadata = tuple .getT2 ();
161- String version = DEFAULT_UNKNOWN_VERSION ;
161+ Optional < String > version = Optional . empty () ;
162162 boolean topicDeletionEnabled = true ;
163163 for (ConfigEntry entry : configs ) {
164164 if (entry .name ().contains ("inter.broker.protocol.version" )) {
165- version = entry .value ();
165+ version = Optional . of ( entry .value () );
166166 }
167167 if (entry .name ().equals ("delete.topic.enable" )) {
168168 topicDeletionEnabled = Boolean .parseBoolean (entry .value ());
169169 }
170170 }
171- FinalizedVersionRange metadataVersion =
172- featureMetadata .finalizedFeatures ().get ("metadata.version" );
173- if (metadataVersion != null ) {
174- version = MetadataVersion .findVersion (metadataVersion .maxVersionLevel (), version );
171+ if (version .isEmpty ()) {
172+ FinalizedVersionRange metadataVersion =
173+ featureMetadata .finalizedFeatures ().get ("metadata.version" );
174+ if (metadataVersion != null ) {
175+ version = MetadataVersion .findVersion (metadataVersion .maxVersionLevel ());
176+ }
175177 }
176- final String finalVersion = version ;
178+ final String finalVersion = version . orElse ( DEFAULT_UNKNOWN_VERSION ) ;
177179 final boolean finalTopicDeletionEnabled = topicDeletionEnabled ;
178180 return SupportedFeature .forVersion (ac , version )
179181 .map (features -> new ConfigRelatedInfo (finalVersion , features , finalTopicDeletionEnabled ));
0 commit comments