1616import io .kafbat .ui .util .MetadataVersion ;
1717import io .kafbat .ui .util .annotation .KafkaClientInternalsDependant ;
1818import java .io .Closeable ;
19+ import java .io .PrintWriter ;
20+ import java .io .StringWriter ;
1921import java .time .Duration ;
2022import java .time .temporal .ChronoUnit ;
2123import java .util .ArrayList ;
8890import org .apache .kafka .common .quota .ClientQuotaAlteration ;
8991import org .apache .kafka .common .quota .ClientQuotaEntity ;
9092import org .apache .kafka .common .quota .ClientQuotaFilter ;
91- import org .apache .kafka .common .requests .DescribeLogDirsResponse ;
9293import org .apache .kafka .common .resource .ResourcePatternFilter ;
9394import reactor .core .publisher .Flux ;
9495import reactor .core .publisher .Mono ;
@@ -192,7 +193,7 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
192193
193194 public static Mono <ReactiveAdminClient > create (AdminClient adminClient ) {
194195 Mono <ConfigRelatedInfo > configRelatedInfoMono = ConfigRelatedInfo .extract (adminClient );
195- return configRelatedInfoMono .map (info -> new ReactiveAdminClient (adminClient , configRelatedInfoMono , info ));
196+ return configRelatedInfoMono .map (info -> new ReactiveAdminClient (adminClient , configRelatedInfoMono , info , null ));
196197 }
197198
198199
@@ -230,23 +231,43 @@ public static <T> Mono<T> toMono(KafkaFuture<T> future) {
230231 .publishOn (Schedulers .parallel ());
231232 }
232233
234+ private record ClosedContext (StackTraceElement [] stackTrace ) {
235+ @ Override
236+ public String toString () {
237+ // Convert stack trace to a string for logging
238+ StringWriter sw = new StringWriter ();
239+ PrintWriter pw = new PrintWriter (sw );
240+ for (StackTraceElement element : stackTrace ) {
241+ pw .println ("\t at " + element );
242+ }
243+ return sw .toString ();
244+ }
245+
246+ static ClosedContext capture () {
247+ return new ClosedContext (Thread .currentThread ().getStackTrace ());
248+ }
249+ }
250+
233251 //---------------------------------------------------------------------------------
234252
235253 @ Getter (AccessLevel .PACKAGE ) // visible for testing
236254 private final AdminClient client ;
237255 private final Mono <ConfigRelatedInfo > configRelatedInfoMono ;
238256
239257 private volatile ConfigRelatedInfo configRelatedInfo ;
258+ private volatile ClosedContext closedContext ;
240259
241260 public Set <SupportedFeature > getClusterFeatures () {
242261 return configRelatedInfo .features ();
243262 }
244263
245264 public Mono <Set <String >> listTopics (boolean listInternal ) {
265+ checkClosed ();
246266 return toMono (client .listTopics (new ListTopicsOptions ().listInternal (listInternal )).names ());
247267 }
248268
249269 public Mono <Void > deleteTopic (String topicName ) {
270+ checkClosed ();
250271 return toMono (client .deleteTopics (List .of (topicName )).all ());
251272 }
252273
@@ -287,6 +308,8 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> t
287308 }
288309
289310 private Mono <Map <String , List <ConfigEntry >>> getTopicsConfigImpl (Collection <String > topicNames , boolean includeDoc ) {
311+ checkClosed ();
312+
290313 List <ConfigResource > resources = topicNames .stream ()
291314 .map (topicName -> new ConfigResource (ConfigResource .Type .TOPIC , topicName ))
292315 .collect (toList ());
@@ -337,6 +360,8 @@ private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClie
337360 * Return per-broker configs or empty map if broker's configs retrieval not supported.
338361 */
339362 public Mono <Map <Integer , List <ConfigEntry >>> loadBrokersConfig (List <Integer > brokerIds ) {
363+ checkClosed ();
364+
340365 return loadBrokersConfig (client , brokerIds );
341366 }
342367
@@ -355,6 +380,8 @@ public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> top
355380 }
356381
357382 private Mono <Map <String , TopicDescription >> describeTopicsImpl (Collection <String > topics ) {
383+ checkClosed ();
384+
358385 return toMonoWithExceptionFilter (
359386 client .describeTopics (topics ).topicNameValues (),
360387 UnknownTopicOrPartitionException .class ,
@@ -411,6 +438,8 @@ static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> v
411438
412439 public Mono <Map <Integer , Map <String , LogDirDescription >>> describeLogDirs (
413440 Collection <Integer > brokerIds ) {
441+ checkClosed ();
442+
414443 return toMono (client .describeLogDirs (brokerIds ).allDescriptions ())
415444 .onErrorResume (UnsupportedVersionException .class , th -> Mono .just (Map .of ()))
416445 .onErrorResume (ClusterAuthorizationException .class , th -> Mono .just (Map .of ()))
@@ -421,6 +450,8 @@ public Mono<Map<Integer, Map<String, LogDirDescription>>> describeLogDirs(
421450 }
422451
423452 public Mono <ClusterDescription > describeCluster () {
453+ checkClosed ();
454+
424455 return describeClusterImpl (client , getClusterFeatures ());
425456 }
426457
@@ -444,6 +475,8 @@ private static Mono<ClusterDescription> describeClusterImpl(AdminClient client,
444475 }
445476
446477 public Mono <Void > deleteConsumerGroups (Collection <String > groupIds ) {
478+ checkClosed ();
479+
447480 return toMono (client .deleteConsumerGroups (groupIds ).all ())
448481 .onErrorResume (GroupIdNotFoundException .class ,
449482 th -> Mono .error (new NotFoundException ("The group id does not exist" )))
@@ -452,6 +485,8 @@ public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
452485 }
453486
454487 public Mono <Void > deleteConsumerGroupOffsets (String groupId , String topicName ) {
488+ checkClosed ();
489+
455490 return listConsumerGroupOffsets (List .of (groupId ), null )
456491 .flatMap (table -> {
457492 // filter TopicPartitions by topicName
@@ -476,6 +511,8 @@ public Mono<Void> createTopic(String name,
476511 int numPartitions ,
477512 @ Nullable Integer replicationFactor ,
478513 Map <String , String > configs ) {
514+ checkClosed ();
515+
479516 var newTopic = new NewTopic (
480517 name ,
481518 Optional .of (numPartitions ),
@@ -486,10 +523,14 @@ public Mono<Void> createTopic(String name,
486523
487524 public Mono <Void > alterPartitionReassignments (
488525 Map <TopicPartition , Optional <NewPartitionReassignment >> reassignments ) {
526+ checkClosed ();
527+
489528 return toMono (client .alterPartitionReassignments (reassignments ).all ());
490529 }
491530
492531 public Mono <Void > createPartitions (Map <String , NewPartitions > newPartitionsMap ) {
532+ checkClosed ();
533+
493534 return toMono (client .createPartitions (newPartitionsMap ).all ());
494535 }
495536
@@ -511,10 +552,14 @@ public Mono<List<String>> listConsumerGroupNames() {
511552 }
512553
513554 public Mono <Collection <ConsumerGroupListing >> listConsumerGroups () {
555+ checkClosed ();
556+
514557 return toMono (client .listConsumerGroups ().all ());
515558 }
516559
517560 public Mono <Map <String , ConsumerGroupDescription >> describeConsumerGroups (Collection <String > groupIds ) {
561+ checkClosed ();
562+
518563 return partitionCalls (
519564 groupIds ,
520565 25 ,
@@ -529,6 +574,8 @@ public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collec
529574 public Mono <Table <String , TopicPartition , Long >> listConsumerGroupOffsets (List <String > consumerGroups ,
530575 // all partitions if null passed
531576 @ Nullable List <TopicPartition > partitions ) {
577+ checkClosed ();
578+
532579 Function <Collection <String >, Mono <Map <String , Map <TopicPartition , OffsetAndMetadata >>>> call =
533580 groups -> toMono (
534581 client .listConsumerGroupOffsets (
@@ -560,6 +607,8 @@ public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<S
560607 }
561608
562609 public Mono <Void > alterConsumerGroupOffsets (String groupId , Map <TopicPartition , Long > offsets ) {
610+ checkClosed ();
611+
563612 return toMono (client .alterConsumerGroupOffsets (
564613 groupId ,
565614 offsets .entrySet ().stream ()
@@ -646,6 +695,8 @@ static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescr
646695 @ KafkaClientInternalsDependant
647696 @ VisibleForTesting
648697 Mono <Map <TopicPartition , Long >> listOffsetsUnsafe (Collection <TopicPartition > partitions , OffsetSpec offsetSpec ) {
698+ checkClosed ();
699+
649700 if (partitions .isEmpty ()) {
650701 return Mono .just (Map .of ());
651702 }
@@ -672,49 +723,60 @@ Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> par
672723 }
673724
674725 public Mono <Collection <AclBinding >> listAcls (ResourcePatternFilter filter ) {
726+ checkClosed ();
727+
675728 Preconditions .checkArgument (getClusterFeatures ().contains (SupportedFeature .AUTHORIZED_SECURITY_ENABLED ));
676729 return toMono (client .describeAcls (new AclBindingFilter (filter , AccessControlEntryFilter .ANY )).values ());
677730 }
678731
679732 public Mono <Void > createAcls (Collection <AclBinding > aclBindings ) {
733+ checkClosed ();
734+
680735 Preconditions .checkArgument (getClusterFeatures ().contains (SupportedFeature .AUTHORIZED_SECURITY_ENABLED ));
681736 return toMono (client .createAcls (aclBindings ).all ());
682737 }
683738
684739 public Mono <Void > deleteAcls (Collection <AclBinding > aclBindings ) {
740+ checkClosed ();
685741 Preconditions .checkArgument (getClusterFeatures ().contains (SupportedFeature .AUTHORIZED_SECURITY_ENABLED ));
686742 var filters = aclBindings .stream ().map (AclBinding ::toFilter ).collect (Collectors .toSet ());
687743 return toMono (client .deleteAcls (filters ).all ()).then ();
688744 }
689745
690746 public Mono <Void > updateBrokerConfigByName (Integer brokerId , String name , String value ) {
747+ checkClosed ();
691748 ConfigResource cr = new ConfigResource (ConfigResource .Type .BROKER , String .valueOf (brokerId ));
692749 AlterConfigOp op = new AlterConfigOp (new ConfigEntry (name , value ), AlterConfigOp .OpType .SET );
693750 return toMono (client .incrementalAlterConfigs (Map .of (cr , List .of (op ))).all ());
694751 }
695752
696753 public Mono <Void > deleteRecords (Map <TopicPartition , Long > offsets ) {
754+ checkClosed ();
697755 var records = offsets .entrySet ().stream ()
698756 .map (entry -> Map .entry (entry .getKey (), RecordsToDelete .beforeOffset (entry .getValue ())))
699757 .collect (toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
700758 return toMono (client .deleteRecords (records ).all ());
701759 }
702760
703761 public Mono <Void > alterReplicaLogDirs (Map <TopicPartitionReplica , String > replicaAssignment ) {
762+ checkClosed ();
704763 return toMono (client .alterReplicaLogDirs (replicaAssignment ).all ());
705764 }
706765
707766 public Mono <Map <ClientQuotaEntity , Map <String , Double >>> getClientQuotas (ClientQuotaFilter filter ) {
767+ checkClosed ();
708768 return toMono (client .describeClientQuotas (filter ).entities ());
709769 }
710770
711771 public Mono <Void > alterClientQuota (ClientQuotaAlteration alteration ) {
772+ checkClosed ();
712773 return toMono (client .alterClientQuotas (List .of (alteration )).all ());
713774 }
714775
715776
716777 // returns tp -> list of active producer's states (if any)
717778 public Mono <Map <TopicPartition , List <ProducerState >>> getActiveProducersState (String topic ) {
779+ checkClosed ();
718780 return describeTopic (topic )
719781 .map (td -> client .describeProducers (
720782 IntStream .range (0 , td .partitions ().size ())
@@ -731,6 +793,7 @@ public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(St
731793 private Mono <Void > incrementalAlterConfig (String topicName ,
732794 List <ConfigEntry > currentConfigs ,
733795 Map <String , String > newConfigs ) {
796+ checkClosed ();
734797 var configsToDelete = currentConfigs .stream ()
735798 .filter (e -> e .source () == ConfigEntry .ConfigSource .DYNAMIC_TOPIC_CONFIG ) //manually set configs only
736799 .filter (e -> !newConfigs .containsKey (e .name ()))
@@ -748,6 +811,7 @@ private Mono<Void> incrementalAlterConfig(String topicName,
748811
749812 @ SuppressWarnings ("deprecation" )
750813 private Mono <Void > alterConfig (String topicName , Map <String , String > configs ) {
814+ checkClosed ();
751815 List <ConfigEntry > configEntries = configs .entrySet ().stream ()
752816 .flatMap (cfg -> Stream .of (new ConfigEntry (cfg .getKey (), cfg .getValue ())))
753817 .collect (toList ());
@@ -800,8 +864,15 @@ private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
800864 };
801865 }
802866
867+ private void checkClosed () {
868+ if (this .closedContext != null ) {
869+ log .error ("AdminClient is already closed at:\n {}" , closedContext );
870+ }
871+ }
872+
803873 @ Override
804874 public void close () {
875+ this .closedContext = ClosedContext .capture ();
805876 client .close ();
806877 }
807878}
0 commit comments