1818# Example use of AdminClient operations.
1919
2020from confluent_kafka import (KafkaException , ConsumerGroupTopicPartitions ,
21- TopicPartition , ConsumerGroupState )
21+ TopicPartition , ConsumerGroupState , TopicCollection )
2222from confluent_kafka .admin import (AdminClient , NewTopic , NewPartitions , ConfigResource ,
2323 ConfigEntry , ConfigSource , AclBinding ,
2424 AclBindingFilter , ResourceType , ResourcePatternType ,
@@ -492,8 +492,9 @@ def example_describe_consumer_groups(a, args):
492492 """
493493 Describe Consumer Groups
494494 """
495-
496- futureMap = a .describe_consumer_groups (args , request_timeout = 10 )
495+ include_auth_ops = bool (int (args [0 ]))
496+ args = args [1 :]
497+ futureMap = a .describe_consumer_groups (args , include_authorized_operations = include_auth_ops , request_timeout = 10 )
497498
498499 for group_id , future in futureMap .items ():
499500 try :
@@ -502,7 +503,8 @@ def example_describe_consumer_groups(a, args):
502503 print (" Is Simple : {}" .format (g .is_simple_consumer_group ))
503504 print (" State : {}" .format (g .state ))
504505 print (" Partition Assignor : {}" .format (g .partition_assignor ))
505- print (" Coordinator : ({}) {}:{}" .format (g .coordinator .id , g .coordinator .host , g .coordinator .port ))
506+ print (
507+ f" Coordinator : { g .coordinator } " )
506508 print (" Members: " )
507509 for member in g .members :
508510 print (" Id : {}" .format (member .member_id ))
@@ -513,12 +515,93 @@ def example_describe_consumer_groups(a, args):
513515 print (" Assignments :" )
514516 for toppar in member .assignment .topic_partitions :
515517 print (" {} [{}]" .format (toppar .topic , toppar .partition ))
518+ if (include_auth_ops ):
519+ print (" Authorized operations: " )
520+ op_string = ""
521+ for acl_op in g .authorized_operations :
522+ op_string += acl_op .name + " "
523+ print (" {}" .format (op_string ))
516524 except KafkaException as e :
517525 print ("Error while describing group id '{}': {}" .format (group_id , e ))
518526 except Exception :
519527 raise
520528
521529
530+ def example_describe_topics (a , args ):
531+ """
532+ Describe Topics
533+ """
534+ include_auth_ops = bool (int (args [0 ]))
535+ args = args [1 :]
536+ topics = TopicCollection (topic_names = args )
537+ futureMap = a .describe_topics (topics , request_timeout = 10 , include_authorized_operations = include_auth_ops )
538+
539+ for topic_name , future in futureMap .items ():
540+ try :
541+ t = future .result ()
542+ print ("Topic name : {}" .format (t .name ))
543+ if (t .is_internal ):
544+ print ("Topic is Internal" )
545+
546+ if (include_auth_ops ):
547+ print ("Authorized operations : " )
548+ op_string = ""
549+ for acl_op in t .authorized_operations :
550+ op_string += acl_op .name + " "
551+ print (" {}" .format (op_string ))
552+
553+ print ("Partition Information" )
554+ for partition in t .partitions :
555+ print (" Id : {}" .format (partition .id ))
556+ leader = partition .leader
557+ print (f" Leader : { leader } " )
558+ print (" Replicas : {}" .format (len (partition .replicas )))
559+ for replica in partition .replicas :
560+ print (f" Replica : { replica } " )
561+ print (" In-Sync Replicas : {}" .format (len (partition .isr )))
562+ for isr in partition .isr :
563+ print (f" In-Sync Replica : { isr } " )
564+ print ("" )
565+ print ("" )
566+
567+ except KafkaException as e :
568+ print ("Error while describing topic '{}': {}" .format (topic_name , e ))
569+ except Exception :
570+ raise
571+
572+
573+ def example_describe_cluster (a , args ):
574+ """
575+ Describe Cluster
576+ """
577+ include_auth_ops = bool (int (args [0 ]))
578+ args = args [1 :]
579+ future = a .describe_cluster (request_timeout = 10 , include_authorized_operations = include_auth_ops )
580+ try :
581+ c = future .result ()
582+ print ("Cluster_id : {}" .format (c .cluster_id ))
583+
584+ if (c .controller ):
585+ print (f"Controller: { c .controller } " )
586+ else :
587+ print ("No Controller Information Available" )
588+
589+ print ("Nodes :" )
590+ for node in c .nodes :
591+ print (f" Node: { node } " )
592+
593+ if (include_auth_ops ):
594+ print ("Authorized operations: " )
595+ op_string = ""
596+ for acl_op in c .authorized_operations :
597+ op_string += acl_op .name + " "
598+ print (" {}" .format (op_string ))
599+ except KafkaException as e :
600+ print ("Error while describing cluster: {}" .format (e ))
601+ except Exception :
602+ raise
603+
604+
522605def example_delete_consumer_groups (a , args ):
523606 """
524607 Delete Consumer Groups
@@ -704,7 +787,9 @@ def example_alter_user_scram_credentials(a, args):
704787 '<principal1> <host1> <operation1> <permission_type1> ..\n ' )
705788 sys .stderr .write (' list [<all|topics|brokers|groups>]\n ' )
706789 sys .stderr .write (' list_consumer_groups [<state1> <state2> ..]\n ' )
707- sys .stderr .write (' describe_consumer_groups <group1> <group2> ..\n ' )
790+ sys .stderr .write (' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n ' )
791+ sys .stderr .write (' describe_topics <include_authorized_operations> <topic1> <topic2> ..\n ' )
792+ sys .stderr .write (' describe_cluster <include_authorized_operations>\n ' )
708793 sys .stderr .write (' delete_consumer_groups <group1> <group2> ..\n ' )
709794 sys .stderr .write (' list_consumer_group_offsets <group> [<topic1> <partition1> <topic2> <partition2> ..]\n ' )
710795 sys .stderr .write (
@@ -737,6 +822,8 @@ def example_alter_user_scram_credentials(a, args):
737822 'list' : example_list ,
738823 'list_consumer_groups' : example_list_consumer_groups ,
739824 'describe_consumer_groups' : example_describe_consumer_groups ,
825+ 'describe_topics' : example_describe_topics ,
826+ 'describe_cluster' : example_describe_cluster ,
740827 'delete_consumer_groups' : example_delete_consumer_groups ,
741828 'list_consumer_group_offsets' : example_list_consumer_group_offsets ,
742829 'alter_consumer_group_offsets' : example_alter_consumer_group_offsets ,
0 commit comments