Skip to content

Commit fb56f26

Browse files
mensfeldvgnanasekaranVenkatesh Gnanasekaran
authored
Added Create and Delete ACL Feature To Admin Functions (#244) (#347)
* Added Create and Delete ACL Feature To Admin Functions (#244) * Added support for createAcls * use named variables and binding changes * Added support for delete acls * handled deleteAcl matching acls response array * removed special handling for matching_acls failure * Renamed variables in acl handler and report classes as the success and failure responses are stored in them * Added DescribeAcl feature and fixed code review comments * fixed describeAcl handle and report comments * fixed callback result parsing for delete and describe acl * fixed describe acl and delete acl response array processing * Added unit tests for create acl handle and report * Added unit tests for describe acl handle and report * Added unit tests for delete acl handle and report * handled acl pointers created for describe and delete acl unit tests * Structured the acl unit test code * Added non existing resource test for create acls * Added resource literal any * fixed describe_acl to support null resource name principal and host * fixed segment fault error in fetching multiple acls * fixed deletion of multiple acls matching the request filters --------- Co-authored-by: Venkatesh Gnanasekaran <[email protected]> Co-authored-by: Maciej Mensfeld <[email protected]> * align admin api * enable acls --------- Co-authored-by: vgnanasekaran <[email protected]> Co-authored-by: Venkatesh Gnanasekaran <[email protected]>
1 parent a67e30d commit fb56f26

19 files changed

+1220
-0
lines changed

docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,5 @@ services:
2323
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
2424
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
2525
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
26+
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
27+
KAFKA_AUTHORIZER_CLASS_NAME: org.apache.kafka.metadata.authorizer.StandardAuthorizer

lib/rdkafka.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@
1313
require "rdkafka/admin/create_topic_report"
1414
require "rdkafka/admin/delete_topic_handle"
1515
require "rdkafka/admin/delete_topic_report"
16+
require "rdkafka/admin/create_acl_handle"
17+
require "rdkafka/admin/create_acl_report"
18+
require "rdkafka/admin/delete_acl_handle"
19+
require "rdkafka/admin/delete_acl_report"
20+
require "rdkafka/admin/describe_acl_handle"
21+
require "rdkafka/admin/describe_acl_report"
22+
require "rdkafka/admin/acl_binding_result"
1623
require "rdkafka/bindings"
1724
require "rdkafka/callbacks"
1825
require "rdkafka/config"

lib/rdkafka/admin.rb

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,316 @@ def delete_topic(topic_name)
163163
delete_topic_handle
164164
end
165165

166+
# Create acl
167+
# @param resource_type - values of type rd_kafka_ResourceType_t
168+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
169+
# valid values are:
170+
# RD_KAFKA_RESOURCE_TOPIC = 2
171+
# RD_KAFKA_RESOURCE_GROUP = 3
172+
# RD_KAFKA_RESOURCE_BROKER = 4
173+
# @param resource_pattern_type - values of type rd_kafka_ResourcePatternType_t
174+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7320
175+
# valid values are:
176+
# RD_KAFKA_RESOURCE_PATTERN_MATCH = 2
177+
# RD_KAFKA_RESOURCE_PATTERN_LITERAL = 3
178+
# RD_KAFKA_RESOURCE_PATTERN_PREFIXED = 4
179+
# @param operation - values of type rd_kafka_AclOperation_t
180+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8403
181+
# valid values are:
182+
# RD_KAFKA_ACL_OPERATION_ALL = 2
183+
# RD_KAFKA_ACL_OPERATION_READ = 3
184+
# RD_KAFKA_ACL_OPERATION_WRITE = 4
185+
# RD_KAFKA_ACL_OPERATION_CREATE = 5
186+
# RD_KAFKA_ACL_OPERATION_DELETE = 6
187+
# RD_KAFKA_ACL_OPERATION_ALTER = 7
188+
# RD_KAFKA_ACL_OPERATION_DESCRIBE = 8
189+
# RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION = 9
190+
# RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS = 10
191+
# RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS = 11
192+
# RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE = 12
193+
# @param permission_type - values of type rd_kafka_AclPermissionType_t
194+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8435
195+
# valid values are:
196+
# RD_KAFKA_ACL_PERMISSION_TYPE_DENY = 2
197+
# RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW = 3
198+
# @raise [RdkafkaError]
199+
#
200+
# @return [CreateAclHandle] Create acl handle that can be used to wait for the result of creating the acl
201+
def create_acl(resource_type:, resource_name:, resource_pattern_type:, principal:, host:, operation:, permission_type:)
202+
closed_admin_check(__method__)
203+
204+
# Create a rd_kafka_AclBinding_t representing the new acl
205+
error_buffer = FFI::MemoryPointer.from_string(" " * 256)
206+
new_acl_ptr = Rdkafka::Bindings.rd_kafka_AclBinding_new(
207+
resource_type,
208+
FFI::MemoryPointer.from_string(resource_name),
209+
resource_pattern_type,
210+
FFI::MemoryPointer.from_string(principal),
211+
FFI::MemoryPointer.from_string(host),
212+
operation,
213+
permission_type,
214+
error_buffer,
215+
256
216+
)
217+
if new_acl_ptr.null?
218+
raise Rdkafka::Config::ConfigError.new(error_buffer.read_string)
219+
end
220+
221+
# Note that rd_kafka_CreateAcls can create more than one acl at a time
222+
pointer_array = [new_acl_ptr]
223+
acls_array_ptr = FFI::MemoryPointer.new(:pointer)
224+
acls_array_ptr.write_array_of_pointer(pointer_array)
225+
226+
# Get a pointer to the queue that our request will be enqueued on
227+
queue_ptr = @native_kafka.with_inner do |inner|
228+
Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
229+
end
230+
231+
if queue_ptr.null?
232+
Rdkafka::Bindings.rd_kafka_AclBinding_destroy(new_acl_ptr)
233+
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
234+
end
235+
236+
# Create and register the handle that we will return to the caller
237+
create_acl_handle = CreateAclHandle.new
238+
create_acl_handle[:pending] = true
239+
create_acl_handle[:response] = -1
240+
CreateAclHandle.register(create_acl_handle)
241+
242+
admin_options_ptr = @native_kafka.with_inner do |inner|
243+
Rdkafka::Bindings.rd_kafka_AdminOptions_new(inner, Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEACLS)
244+
end
245+
246+
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, create_acl_handle.to_ptr)
247+
248+
begin
249+
@native_kafka.with_inner do |inner|
250+
Rdkafka::Bindings.rd_kafka_CreateAcls(
251+
inner,
252+
acls_array_ptr,
253+
1,
254+
admin_options_ptr,
255+
queue_ptr
256+
)
257+
end
258+
rescue Exception
259+
CreateAclHandle.remove(create_acl_handle.to_ptr.address)
260+
raise
261+
ensure
262+
Rdkafka::Bindings.rd_kafka_AdminOptions_destroy(admin_options_ptr)
263+
Rdkafka::Bindings.rd_kafka_queue_destroy(queue_ptr)
264+
Rdkafka::Bindings.rd_kafka_AclBinding_destroy(new_acl_ptr)
265+
end
266+
267+
create_acl_handle
268+
end
269+
270+
# Delete acl
271+
#
272+
# @param resource_type - values of type rd_kafka_ResourceType_t
273+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
274+
# valid values are:
275+
# RD_KAFKA_RESOURCE_TOPIC = 2
276+
# RD_KAFKA_RESOURCE_GROUP = 3
277+
# RD_KAFKA_RESOURCE_BROKER = 4
278+
# @param resource_pattern_type - values of type rd_kafka_ResourcePatternType_t
279+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7320
280+
# valid values are:
281+
# RD_KAFKA_RESOURCE_PATTERN_MATCH = 2
282+
# RD_KAFKA_RESOURCE_PATTERN_LITERAL = 3
283+
# RD_KAFKA_RESOURCE_PATTERN_PREFIXED = 4
284+
# @param operation - values of type rd_kafka_AclOperation_t
285+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8403
286+
# valid values are:
287+
# RD_KAFKA_ACL_OPERATION_ALL = 2
288+
# RD_KAFKA_ACL_OPERATION_READ = 3
289+
# RD_KAFKA_ACL_OPERATION_WRITE = 4
290+
# RD_KAFKA_ACL_OPERATION_CREATE = 5
291+
# RD_KAFKA_ACL_OPERATION_DELETE = 6
292+
# RD_KAFKA_ACL_OPERATION_ALTER = 7
293+
# RD_KAFKA_ACL_OPERATION_DESCRIBE = 8
294+
# RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION = 9
295+
# RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS = 10
296+
# RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS = 11
297+
# RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE = 12
298+
# @param permission_type - values of type rd_kafka_AclPermissionType_t
299+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8435
300+
# valid values are:
301+
# RD_KAFKA_ACL_PERMISSION_TYPE_DENY = 2
302+
# RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW = 3
303+
# @raise [RdkafkaError]
304+
#
305+
# @return [DeleteAclHandle] Delete acl handle that can be used to wait for the result of deleting the acl
306+
def delete_acl(resource_type:, resource_name:, resource_pattern_type:, principal:, host:, operation:, permission_type:)
307+
closed_admin_check(__method__)
308+
309+
# Create a rd_kafka_AclBinding_t representing the acl to be deleted
310+
error_buffer = FFI::MemoryPointer.from_string(" " * 256)
311+
312+
delete_acl_ptr = Rdkafka::Bindings.rd_kafka_AclBindingFilter_new(
313+
resource_type,
314+
resource_name ? FFI::MemoryPointer.from_string(resource_name) : nil,
315+
resource_pattern_type,
316+
principal ? FFI::MemoryPointer.from_string(principal) : nil,
317+
host ? FFI::MemoryPointer.from_string(host) : nil,
318+
operation,
319+
permission_type,
320+
error_buffer,
321+
256
322+
)
323+
324+
if delete_acl_ptr.null?
325+
raise Rdkafka::Config::ConfigError.new(error_buffer.read_string)
326+
end
327+
328+
# Note that rd_kafka_DeleteAcls can delete more than one acl at a time
329+
pointer_array = [delete_acl_ptr]
330+
acls_array_ptr = FFI::MemoryPointer.new(:pointer)
331+
acls_array_ptr.write_array_of_pointer(pointer_array)
332+
333+
# Get a pointer to the queue that our request will be enqueued on
334+
queue_ptr = @native_kafka.with_inner do |inner|
335+
Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
336+
end
337+
338+
if queue_ptr.null?
339+
Rdkafka::Bindings.rd_kafka_AclBinding_destroy(new_acl_ptr)
340+
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
341+
end
342+
343+
# Create and register the handle that we will return to the caller
344+
delete_acl_handle = DeleteAclHandle.new
345+
delete_acl_handle[:pending] = true
346+
delete_acl_handle[:response] = -1
347+
DeleteAclHandle.register(delete_acl_handle)
348+
349+
admin_options_ptr = @native_kafka.with_inner do |inner|
350+
Rdkafka::Bindings.rd_kafka_AdminOptions_new(inner, Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_DELETEACLS)
351+
end
352+
353+
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, delete_acl_handle.to_ptr)
354+
355+
begin
356+
@native_kafka.with_inner do |inner|
357+
Rdkafka::Bindings.rd_kafka_DeleteAcls(
358+
inner,
359+
acls_array_ptr,
360+
1,
361+
admin_options_ptr,
362+
queue_ptr
363+
)
364+
end
365+
rescue Exception
366+
DeleteAclHandle.remove(delete_acl_handle.to_ptr.address)
367+
raise
368+
ensure
369+
Rdkafka::Bindings.rd_kafka_AdminOptions_destroy(admin_options_ptr)
370+
Rdkafka::Bindings.rd_kafka_queue_destroy(queue_ptr)
371+
Rdkafka::Bindings.rd_kafka_AclBinding_destroy(delete_acl_ptr)
372+
end
373+
374+
delete_acl_handle
375+
end
376+
377+
# Describe acls
378+
#
379+
# @param resource_type - values of type rd_kafka_ResourceType_t
380+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
381+
# valid values are:
382+
# RD_KAFKA_RESOURCE_TOPIC = 2
383+
# RD_KAFKA_RESOURCE_GROUP = 3
384+
# RD_KAFKA_RESOURCE_BROKER = 4
385+
# @param resource_pattern_type - values of type rd_kafka_ResourcePatternType_t
386+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7320
387+
# valid values are:
388+
# RD_KAFKA_RESOURCE_PATTERN_MATCH = 2
389+
# RD_KAFKA_RESOURCE_PATTERN_LITERAL = 3
390+
# RD_KAFKA_RESOURCE_PATTERN_PREFIXED = 4
391+
# @param operation - values of type rd_kafka_AclOperation_t
392+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8403
393+
# valid values are:
394+
# RD_KAFKA_ACL_OPERATION_ALL = 2
395+
# RD_KAFKA_ACL_OPERATION_READ = 3
396+
# RD_KAFKA_ACL_OPERATION_WRITE = 4
397+
# RD_KAFKA_ACL_OPERATION_CREATE = 5
398+
# RD_KAFKA_ACL_OPERATION_DELETE = 6
399+
# RD_KAFKA_ACL_OPERATION_ALTER = 7
400+
# RD_KAFKA_ACL_OPERATION_DESCRIBE = 8
401+
# RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION = 9
402+
# RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS = 10
403+
# RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS = 11
404+
# RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE = 12
405+
# @param permission_type - values of type rd_kafka_AclPermissionType_t
406+
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L8435
407+
# valid values are:
408+
# RD_KAFKA_ACL_PERMISSION_TYPE_DENY = 2
409+
# RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW = 3
410+
# @raise [RdkafkaError]
411+
#
412+
# @return [DescribeAclHandle] Describe acl handle that can be used to wait for the result of fetching acls
413+
def describe_acl(resource_type:, resource_name:, resource_pattern_type:, principal:, host:, operation:, permission_type:)
414+
closed_admin_check(__method__)
415+
416+
# Create a rd_kafka_AclBinding_t with the filters to fetch existing acls
417+
error_buffer = FFI::MemoryPointer.from_string(" " * 256)
418+
describe_acl_ptr = Rdkafka::Bindings.rd_kafka_AclBindingFilter_new(
419+
resource_type,
420+
resource_name ? FFI::MemoryPointer.from_string(resource_name) : nil,
421+
resource_pattern_type,
422+
principal ? FFI::MemoryPointer.from_string(principal) : nil,
423+
host ? FFI::MemoryPointer.from_string(host) : nil,
424+
operation,
425+
permission_type,
426+
error_buffer,
427+
256
428+
)
429+
if describe_acl_ptr.null?
430+
raise Rdkafka::Config::ConfigError.new(error_buffer.read_string)
431+
end
432+
433+
# Get a pointer to the queue that our request will be enqueued on
434+
queue_ptr = @native_kafka.with_inner do |inner|
435+
Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
436+
end
437+
438+
if queue_ptr.null?
439+
Rdkafka::Bindings.rd_kafka_AclBinding_destroy(new_acl_ptr)
440+
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
441+
end
442+
443+
# Create and register the handle that we will return to the caller
444+
describe_acl_handle = DescribeAclHandle.new
445+
describe_acl_handle[:pending] = true
446+
describe_acl_handle[:response] = -1
447+
DescribeAclHandle.register(describe_acl_handle)
448+
449+
admin_options_ptr = @native_kafka.with_inner do |inner|
450+
Rdkafka::Bindings.rd_kafka_AdminOptions_new(inner, Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_DESCRIBEACLS)
451+
end
452+
453+
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, describe_acl_handle.to_ptr)
454+
455+
begin
456+
@native_kafka.with_inner do |inner|
457+
Rdkafka::Bindings.rd_kafka_DescribeAcls(
458+
inner,
459+
describe_acl_ptr,
460+
admin_options_ptr,
461+
queue_ptr
462+
)
463+
end
464+
rescue Exception
465+
DescribeAclHandle.remove(describe_acl_handle.to_ptr.address)
466+
raise
467+
ensure
468+
Rdkafka::Bindings.rd_kafka_AdminOptions_destroy(admin_options_ptr)
469+
Rdkafka::Bindings.rd_kafka_queue_destroy(queue_ptr)
470+
Rdkafka::Bindings.rd_kafka_AclBinding_destroy(describe_acl_ptr)
471+
end
472+
473+
describe_acl_handle
474+
end
475+
166476
private
167477
def closed_admin_check(method)
168478
raise Rdkafka::ClosedAdminError.new(method) if closed?
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
6+
# Extracts attributes of rd_kafka_AclBinding_t
7+
#
8+
class AclBindingResult
9+
attr_reader :result_error, :error_string, :matching_acl_resource_type, :matching_acl_resource_name, :matching_acl_pattern_type, :matching_acl_principal, :matching_acl_host, :matching_acl_operation, :matching_acl_permission_type
10+
11+
def initialize(matching_acl)
12+
rd_kafka_error_pointer = Rdkafka::Bindings.rd_kafka_AclBinding_error(matching_acl)
13+
@result_error = Rdkafka::Bindings.rd_kafka_error_code(rd_kafka_error_pointer)
14+
error_string = Rdkafka::Bindings.rd_kafka_error_string(rd_kafka_error_pointer)
15+
if error_string != FFI::Pointer::NULL
16+
@error_string = error_string.read_string
17+
end
18+
@matching_acl_resource_type = Rdkafka::Bindings.rd_kafka_AclBinding_restype(matching_acl)
19+
matching_acl_resource_name = Rdkafka::Bindings.rd_kafka_AclBinding_name(matching_acl)
20+
if matching_acl_resource_name != FFI::Pointer::NULL
21+
@matching_acl_resource_name = matching_acl_resource_name.read_string
22+
end
23+
@matching_acl_pattern_type = Rdkafka::Bindings.rd_kafka_AclBinding_resource_pattern_type(matching_acl)
24+
matching_acl_principal = Rdkafka::Bindings.rd_kafka_AclBinding_principal(matching_acl)
25+
if matching_acl_principal != FFI::Pointer::NULL
26+
@matching_acl_principal = matching_acl_principal.read_string
27+
end
28+
matching_acl_host = Rdkafka::Bindings.rd_kafka_AclBinding_host(matching_acl)
29+
if matching_acl_host != FFI::Pointer::NULL
30+
@matching_acl_host = matching_acl_host.read_string
31+
end
32+
@matching_acl_operation = Rdkafka::Bindings.rd_kafka_AclBinding_operation(matching_acl)
33+
@matching_acl_permission_type = Rdkafka::Bindings.rd_kafka_AclBinding_permission_type(matching_acl)
34+
end
35+
end
36+
end
37+
end

0 commit comments

Comments
 (0)