3232import java .util .stream .Stream ;
3333import javax .annotation .Nullable ;
3434import lombok .AccessLevel ;
35+ import lombok .AllArgsConstructor ;
36+ import lombok .Builder ;
3537import lombok .Getter ;
36- import lombok .RequiredArgsConstructor ;
3738import lombok .Value ;
3839import lombok .extern .slf4j .Slf4j ;
3940import org .apache .kafka .clients .admin .AdminClient ;
7576import org .apache .kafka .common .errors .UnknownTopicOrPartitionException ;
7677import org .apache .kafka .common .errors .UnsupportedVersionException ;
7778import org .apache .kafka .common .requests .DescribeLogDirsResponse ;
78- import org .apache .kafka .common .resource .ResourcePattern ;
7979import org .apache .kafka .common .resource .ResourcePatternFilter ;
8080import reactor .core .publisher .Flux ;
8181import reactor .core .publisher .Mono ;
8585
8686
8787@ Slf4j
88- @ RequiredArgsConstructor
88+ @ AllArgsConstructor
8989public class ReactiveAdminClient implements Closeable {
9090
9191 public enum SupportedFeature {
@@ -104,7 +104,8 @@ public enum SupportedFeature {
104104 this .predicate = (admin , ver ) -> Mono .just (ver != null && ver >= fromVersion );
105105 }
106106
107- static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , @ Nullable Float kafkaVersion ) {
107+ static Mono <Set <SupportedFeature >> forVersion (AdminClient ac , String kafkaVersionStr ) {
108+ @ Nullable Float kafkaVersion = KafkaVersion .parse (kafkaVersionStr ).orElse (null );
108109 return Flux .fromArray (SupportedFeature .values ())
109110 .flatMap (f -> f .predicate .apply (ac , kafkaVersion ).map (enabled -> Tuples .of (f , enabled )))
110111 .filter (Tuple2 ::getT2 )
@@ -123,19 +124,46 @@ public static class ClusterDescription {
123124 Set <AclOperation > authorizedOperations ;
124125 }
125126
126- public static Mono <ReactiveAdminClient > create (AdminClient adminClient ) {
127- return getClusterVersion (adminClient )
128- .flatMap (ver ->
129- getSupportedUpdateFeaturesForVersion (adminClient , ver )
130- .map (features ->
131- new ReactiveAdminClient (adminClient , ver , features )));
127+ @ Builder
128+ private record ConfigRelatedInfo (String version ,
129+ Set <SupportedFeature > features ,
130+ boolean topicDeletionIsAllowed ) {
131+
132+ private static Mono <ConfigRelatedInfo > extract (AdminClient ac , int controllerId ) {
133+ return loadBrokersConfig (ac , List .of (controllerId ))
134+ .map (map -> map .isEmpty () ? List .<ConfigEntry >of () : map .get (controllerId ))
135+ .flatMap (configs -> {
136+ String version = "1.0-UNKNOWN" ;
137+ boolean topicDeletionEnabled = true ;
138+ for (ConfigEntry entry : configs ) {
139+ if (entry .name ().contains ("inter.broker.protocol.version" )) {
140+ version = entry .value ();
141+ }
142+ if (entry .name ().equals ("delete.topic.enable" )) {
143+ topicDeletionEnabled = Boolean .parseBoolean (entry .value ());
144+ }
145+ }
146+ var builder = ConfigRelatedInfo .builder ()
147+ .version (version )
148+ .topicDeletionIsAllowed (topicDeletionEnabled );
149+ return SupportedFeature .forVersion (ac , version )
150+ .map (features -> builder .features (features ).build ());
151+ });
152+ }
132153 }
133154
134- private static Mono <Set <SupportedFeature >> getSupportedUpdateFeaturesForVersion (AdminClient ac , String versionStr ) {
135- @ Nullable Float kafkaVersion = KafkaVersion .parse (versionStr ).orElse (null );
136- return SupportedFeature .forVersion (ac , kafkaVersion );
155+ public static Mono <ReactiveAdminClient > create (AdminClient adminClient ) {
156+ return describeClusterImpl (adminClient , Set .of ())
157+ // choosing node from which we will get configs (starting with controller)
158+ .flatMap (descr -> descr .controller != null
159+ ? Mono .just (descr .controller )
160+ : Mono .justOrEmpty (descr .nodes .stream ().findFirst ())
161+ )
162+ .flatMap (node -> ConfigRelatedInfo .extract (adminClient , node .id ()))
163+ .map (info -> new ReactiveAdminClient (adminClient , info ));
137164 }
138165
166+
139167 private static Mono <Boolean > isAuthorizedSecurityEnabled (AdminClient ac , @ Nullable Float kafkaVersion ) {
140168 return toMono (ac .describeAcls (AclBindingFilter .ANY ).values ())
141169 .thenReturn (true )
@@ -174,11 +202,10 @@ public static <T> Mono<T> toMono(KafkaFuture<T> future) {
174202
175203 @ Getter (AccessLevel .PACKAGE ) // visible for testing
176204 private final AdminClient client ;
177- private final String version ;
178- private final Set <SupportedFeature > features ;
205+ private volatile ConfigRelatedInfo configRelatedInfo ;
179206
180207 public Set <SupportedFeature > getClusterFeatures () {
181- return features ;
208+ return configRelatedInfo . features () ;
182209 }
183210
184211 public Mono <Set <String >> listTopics (boolean listInternal ) {
@@ -190,7 +217,20 @@ public Mono<Void> deleteTopic(String topicName) {
190217 }
191218
192219 public String getVersion () {
193- return version ;
220+ return configRelatedInfo .version ();
221+ }
222+
223+ public boolean isTopicDeletionEnabled () {
224+ return configRelatedInfo .topicDeletionIsAllowed ();
225+ }
226+
227+ public Mono <Void > updateInternalStats (@ Nullable Node controller ) {
228+ if (controller == null ) {
229+ return Mono .empty ();
230+ }
231+ return ConfigRelatedInfo .extract (client , controller .id ())
232+ .doOnNext (info -> this .configRelatedInfo = info )
233+ .then ();
194234 }
195235
196236 public Mono <Map <String , List <ConfigEntry >>> getTopicsConfig () {
@@ -200,7 +240,7 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
200240 //NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
201241 //and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
202242 public Mono <Map <String , List <ConfigEntry >>> getTopicsConfig (Collection <String > topicNames , boolean includeDoc ) {
203- var includeDocFixed = features .contains (SupportedFeature .CONFIG_DOCUMENTATION_RETRIEVAL ) && includeDoc ;
243+ var includeDocFixed = includeDoc && getClusterFeatures () .contains (SupportedFeature .CONFIG_DOCUMENTATION_RETRIEVAL );
204244 // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
205245 return partitionCalls (
206246 topicNames ,
@@ -349,7 +389,7 @@ public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> descr
349389 }
350390
351391 public Mono <ClusterDescription > describeCluster () {
352- return describeClusterImpl (client , features );
392+ return describeClusterImpl (client , getClusterFeatures () );
353393 }
354394
355395 private static Mono <ClusterDescription > describeClusterImpl (AdminClient client , Set <SupportedFeature > features ) {
@@ -371,23 +411,6 @@ private static Mono<ClusterDescription> describeClusterImpl(AdminClient client,
371411 );
372412 }
373413
374- private static Mono <String > getClusterVersion (AdminClient client ) {
375- return describeClusterImpl (client , Set .of ())
376- // choosing node from which we will get configs (starting with controller)
377- .flatMap (descr -> descr .controller != null
378- ? Mono .just (descr .controller )
379- : Mono .justOrEmpty (descr .nodes .stream ().findFirst ())
380- )
381- .flatMap (node -> loadBrokersConfig (client , List .of (node .id ())))
382- .flatMap (configs -> configs .values ().stream ()
383- .flatMap (Collection ::stream )
384- .filter (entry -> entry .name ().contains ("inter.broker.protocol.version" ))
385- .findFirst ()
386- .map (configEntry -> Mono .just (configEntry .value ()))
387- .orElse (Mono .empty ()))
388- .switchIfEmpty (Mono .just ("1.0-UNKNOWN" ));
389- }
390-
391414 public Mono <Void > deleteConsumerGroups (Collection <String > groupIds ) {
392415 return toMono (client .deleteConsumerGroups (groupIds ).all ())
393416 .onErrorResume (GroupIdNotFoundException .class ,
@@ -421,7 +444,7 @@ public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap)
421444 // NOTE: places whole current topic config with new one. Entries that were present in old config,
422445 // but missed in new will be set to default
423446 public Mono <Void > updateTopicConfig (String topicName , Map <String , String > configs ) {
424- if (features .contains (SupportedFeature .INCREMENTAL_ALTER_CONFIGS )) {
447+ if (getClusterFeatures () .contains (SupportedFeature .INCREMENTAL_ALTER_CONFIGS )) {
425448 return getTopicsConfigImpl (List .of (topicName ), false )
426449 .map (conf -> conf .getOrDefault (topicName , List .of ()))
427450 .flatMap (currentConfigs -> incrementalAlterConfig (topicName , currentConfigs , configs ));
@@ -596,17 +619,17 @@ Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> par
596619 }
597620
598621 public Mono <Collection <AclBinding >> listAcls (ResourcePatternFilter filter ) {
599- Preconditions .checkArgument (features .contains (SupportedFeature .AUTHORIZED_SECURITY_ENABLED ));
622+ Preconditions .checkArgument (getClusterFeatures () .contains (SupportedFeature .AUTHORIZED_SECURITY_ENABLED ));
600623 return toMono (client .describeAcls (new AclBindingFilter (filter , AccessControlEntryFilter .ANY )).values ());
601624 }
602625
603626 public Mono <Void > createAcls (Collection <AclBinding > aclBindings ) {
604- Preconditions .checkArgument (features .contains (SupportedFeature .AUTHORIZED_SECURITY_ENABLED ));
627+ Preconditions .checkArgument (getClusterFeatures () .contains (SupportedFeature .AUTHORIZED_SECURITY_ENABLED ));
605628 return toMono (client .createAcls (aclBindings ).all ());
606629 }
607630
608631 public Mono <Void > deleteAcls (Collection <AclBinding > aclBindings ) {
609- Preconditions .checkArgument (features .contains (SupportedFeature .AUTHORIZED_SECURITY_ENABLED ));
632+ Preconditions .checkArgument (getClusterFeatures () .contains (SupportedFeature .AUTHORIZED_SECURITY_ENABLED ));
610633 var filters = aclBindings .stream ().map (AclBinding ::toFilter ).collect (Collectors .toSet ());
611634 return toMono (client .deleteAcls (filters ).all ()).then ();
612635 }
0 commit comments