1919# Example Admin clients.
2020#
2121
22- from confluent_kafka .admin import AdminClient , NewTopic , NewPartitions , ConfigResource , ConfigSource
22+ from confluent_kafka .admin import (AdminClient , NewTopic , NewPartitions , ConfigResource , ConfigSource ,
23+ AclBinding , AclBindingFilter , ResourceType , ResourcePatternType ,
24+ AclOperation , AclPermissionType )
2325from confluent_kafka import KafkaException
2426import sys
2527import threading
2830logging .basicConfig ()
2931
3032
33+ def parse_nullable_string (s ):
34+ if s == "None" :
35+ return None
36+ else :
37+ return s
38+
39+
3140def example_create_topics (a , topics ):
3241 """ Create topics """
3342
@@ -117,6 +126,133 @@ def example_describe_configs(a, args):
117126 raise
118127
119128
129+ def example_create_acls (a , args ):
130+ """ create acls """
131+
132+ acl_bindings = [
133+ AclBinding (
134+ ResourceType [restype ],
135+ parse_nullable_string (resname ),
136+ ResourcePatternType [resource_pattern_type ],
137+ parse_nullable_string (principal ),
138+ parse_nullable_string (host ),
139+ AclOperation [operation ],
140+ AclPermissionType [permission_type ]
141+ )
142+ for restype , resname , resource_pattern_type ,
143+ principal , host , operation , permission_type
144+ in zip (
145+ args [0 ::7 ],
146+ args [1 ::7 ],
147+ args [2 ::7 ],
148+ args [3 ::7 ],
149+ args [4 ::7 ],
150+ args [5 ::7 ],
151+ args [6 ::7 ],
152+ )
153+ ]
154+
155+ fs = a .create_acls (acl_bindings , request_timeout = 10 )
156+
157+ # Wait for operation to finish.
158+ for res , f in fs .items ():
159+ try :
160+ result = f .result ()
161+ if result is None :
162+ print ("Created {}" .format (res ))
163+
164+ except KafkaException as e :
165+ print ("Failed to create ACL {}: {}" .format (res , e ))
166+ except Exception :
167+ raise
168+
169+
170+ def example_describe_acls (a , args ):
171+ """ describe acls """
172+
173+ acl_binding_filters = [
174+ AclBindingFilter (
175+ ResourceType [restype ],
176+ parse_nullable_string (resname ),
177+ ResourcePatternType [resource_pattern_type ],
178+ parse_nullable_string (principal ),
179+ parse_nullable_string (host ),
180+ AclOperation [operation ],
181+ AclPermissionType [permission_type ]
182+ )
183+ for restype , resname , resource_pattern_type ,
184+ principal , host , operation , permission_type
185+ in zip (
186+ args [0 ::7 ],
187+ args [1 ::7 ],
188+ args [2 ::7 ],
189+ args [3 ::7 ],
190+ args [4 ::7 ],
191+ args [5 ::7 ],
192+ args [6 ::7 ],
193+ )
194+ ]
195+
196+ fs = [
197+ a .describe_acls (acl_binding_filter , request_timeout = 10 )
198+ for acl_binding_filter in acl_binding_filters
199+ ]
200+ # Wait for operations to finish.
201+ for acl_binding_filter , f in zip (acl_binding_filters , fs ):
202+ try :
203+ print ("Acls matching filter: {}" .format (acl_binding_filter ))
204+ acl_bindings = f .result ()
205+ for acl_binding in acl_bindings :
206+ print (acl_binding )
207+
208+ except KafkaException as e :
209+ print ("Failed to describe {}: {}" .format (acl_binding_filter , e ))
210+ except Exception :
211+ raise
212+
213+
214+ def example_delete_acls (a , args ):
215+ """ delete acls """
216+
217+ acl_binding_filters = [
218+ AclBindingFilter (
219+ ResourceType [restype ],
220+ parse_nullable_string (resname ),
221+ ResourcePatternType [resource_pattern_type ],
222+ parse_nullable_string (principal ),
223+ parse_nullable_string (host ),
224+ AclOperation [operation ],
225+ AclPermissionType [permission_type ]
226+ )
227+ for restype , resname , resource_pattern_type ,
228+ principal , host , operation , permission_type
229+ in zip (
230+ args [0 ::7 ],
231+ args [1 ::7 ],
232+ args [2 ::7 ],
233+ args [3 ::7 ],
234+ args [4 ::7 ],
235+ args [5 ::7 ],
236+ args [6 ::7 ],
237+ )
238+ ]
239+
240+ fs = a .delete_acls (acl_binding_filters , request_timeout = 10 )
241+
242+ # Wait for operation to finish.
243+ for res , f in fs .items ():
244+ try :
245+ acl_bindings = f .result ()
246+ print ("Deleted acls matching filter: {}" .format (res ))
247+ for acl_binding in acl_bindings :
248+ print (" " , acl_binding )
249+
250+ except KafkaException as e :
251+ print ("Failed to delete {}: {}" .format (res , e ))
252+ except Exception :
253+ raise
254+
255+
120256def example_alter_configs (a , args ):
121257 """ Alter configs atomically, replacing non-specified
122258 configuration properties with their default values.
@@ -300,6 +436,12 @@ def example_list(a, args):
300436 '<config=val,config2=val2> <resource_type2> <resource_name2> <config..> ..\n ' )
301437 sys .stderr .write (' delta_alter_configs <resource_type1> <resource_name1> ' +
302438 '<config=val,config2=val2> <resource_type2> <resource_name2> <config..> ..\n ' )
439+ sys .stderr .write (' create_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
440+ '<principal1> <host1> <operation1> <permission_type1> ..\n ' )
441+ sys .stderr .write (' describe_acls <resource_type1 <resource_name1> <resource_patter_type1> ' +
442+ '<principal1> <host1> <operation1> <permission_type1> ..\n ' )
443+ sys .stderr .write (' delete_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
444+ '<principal1> <host1> <operation1> <permission_type1> ..\n ' )
303445 sys .stderr .write (' list [<all|topics|brokers|groups>]\n ' )
304446 sys .exit (1 )
305447
@@ -316,6 +458,9 @@ def example_list(a, args):
316458 'describe_configs' : example_describe_configs ,
317459 'alter_configs' : example_alter_configs ,
318460 'delta_alter_configs' : example_delta_alter_configs ,
461+ 'create_acls' : example_create_acls ,
462+ 'describe_acls' : example_describe_acls ,
463+ 'delete_acls' : example_delete_acls ,
319464 'list' : example_list }
320465
321466 if operation not in opsmap :
0 commit comments