@@ -25,7 +25,8 @@ public class FeatureService {
2525
2626 private final AdminClientService adminClientService ;
2727
28- public Mono <List <ClusterFeature >> getAvailableFeatures (KafkaCluster cluster , @ Nullable Node controller ) {
28+ public Mono <List <ClusterFeature >> getAvailableFeatures (KafkaCluster cluster ,
29+ ReactiveAdminClient .ClusterDescription clusterDescription ) {
2930 List <Mono <ClusterFeature >> features = new ArrayList <>();
3031
3132 if (Optional .ofNullable (cluster .getConnectsClients ())
@@ -42,17 +43,15 @@ public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster, @Nu
4243 features .add (Mono .just (ClusterFeature .SCHEMA_REGISTRY ));
4344 }
4445
45- if (controller != null ) {
46- features .add (
47- isTopicDeletionEnabled (cluster , controller )
48- .flatMap (r -> Boolean .TRUE .equals (r ) ? Mono .just (ClusterFeature .TOPIC_DELETION ) : Mono .empty ())
49- );
50- }
46+ features .add (topicDeletionEnabled (cluster , clusterDescription .getController ()));
5147
5248 return Flux .fromIterable (features ).flatMap (m -> m ).collectList ();
5349 }
5450
55- private Mono <Boolean > isTopicDeletionEnabled (KafkaCluster cluster , Node controller ) {
51+ private Mono <ClusterFeature > topicDeletionEnabled (KafkaCluster cluster , @ Nullable Node controller ) {
52+ if (controller == null ) {
53+ return Mono .just (ClusterFeature .TOPIC_DELETION ); // assuming it is enabled by default
54+ }
5655 return adminClientService .get (cluster )
5756 .flatMap (ac -> ac .loadBrokersConfig (List .of (controller .id ())))
5857 .map (config ->
@@ -61,6 +60,9 @@ private Mono<Boolean> isTopicDeletionEnabled(KafkaCluster cluster, Node controll
6160 .filter (e -> e .name ().equals (DELETE_TOPIC_ENABLED_SERVER_PROPERTY ))
6261 .map (e -> Boolean .parseBoolean (e .value ()))
6362 .findFirst ()
64- .orElse (true ));
63+ .orElse (true ))
64+ .flatMap (enabled -> enabled
65+ ? Mono .just (ClusterFeature .TOPIC_DELETION )
66+ : Mono .empty ());
6567 }
6668}
0 commit comments