Skip to content

Commit 321de2e

Browse files
authored
Add admin#delete_group utility (#353)
* Add admin#delete_group utility * change test order
1 parent 555a384 commit 321de2e

File tree

7 files changed

+230
-0
lines changed

7 files changed

+230
-0
lines changed

lib/rdkafka.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
require "rdkafka/admin"
1212
require "rdkafka/admin/create_topic_handle"
1313
require "rdkafka/admin/create_topic_report"
14+
require "rdkafka/admin/delete_groups_handle"
15+
require "rdkafka/admin/delete_groups_report"
1416
require "rdkafka/admin/delete_topic_handle"
1517
require "rdkafka/admin/delete_topic_report"
1618
require "rdkafka/admin/create_acl_handle"

lib/rdkafka/admin.rb

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,59 @@ def create_topic(topic_name, partition_count, replication_factor, topic_config={
106106
create_topic_handle
107107
end
108108

109+
def delete_group(group_id)
110+
closed_admin_check(__method__)
111+
112+
# Create a rd_kafka_DeleteGroup_t representing the new topic
113+
delete_groups_ptr = Rdkafka::Bindings.rd_kafka_DeleteGroup_new(
114+
FFI::MemoryPointer.from_string(group_id)
115+
)
116+
117+
pointer_array = [delete_groups_ptr]
118+
groups_array_ptr = FFI::MemoryPointer.new(:pointer)
119+
groups_array_ptr.write_array_of_pointer(pointer_array)
120+
121+
# Get a pointer to the queue that our request will be enqueued on
122+
queue_ptr = @native_kafka.with_inner do |inner|
123+
Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
124+
end
125+
if queue_ptr.null?
126+
Rdkafka::Bindings.rd_kafka_DeleteTopic_destroy(delete_topic_ptr)
127+
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
128+
end
129+
130+
# Create and register the handle we will return to the caller
131+
delete_groups_handle = DeleteGroupsHandle.new
132+
delete_groups_handle[:pending] = true
133+
delete_groups_handle[:response] = -1
134+
DeleteGroupsHandle.register(delete_groups_handle)
135+
admin_options_ptr = @native_kafka.with_inner do |inner|
136+
Rdkafka::Bindings.rd_kafka_AdminOptions_new(inner, Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_DELETETOPICS)
137+
end
138+
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, delete_groups_handle.to_ptr)
139+
140+
begin
141+
@native_kafka.with_inner do |inner|
142+
Rdkafka::Bindings.rd_kafka_DeleteGroups(
143+
inner,
144+
groups_array_ptr,
145+
1,
146+
admin_options_ptr,
147+
queue_ptr
148+
)
149+
end
150+
rescue Exception
151+
DeleteGroupsHandle.remove(delete_groups_handle.to_ptr.address)
152+
raise
153+
ensure
154+
Rdkafka::Bindings.rd_kafka_AdminOptions_destroy(admin_options_ptr)
155+
Rdkafka::Bindings.rd_kafka_queue_destroy(queue_ptr)
156+
Rdkafka::Bindings.rd_kafka_DeleteGroup_destroy(delete_groups_ptr)
157+
end
158+
159+
delete_groups_handle
160+
end
161+
109162
# Deletes the named topic
110163
#
111164
# @return [DeleteTopicHandle] Delete topic handle that can be used to wait for the result of
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
class DeleteGroupsHandle < AbstractHandle
6+
layout :pending, :bool, # TODO: ???
7+
:response, :int,
8+
:error_string, :pointer,
9+
:result_name, :pointer
10+
11+
# @return [String] the name of the operation
12+
def operation_name
13+
"delete groups"
14+
end
15+
16+
def create_result
17+
DeleteGroupsReport.new(self[:error_string], self[:result_name])
18+
end
19+
20+
def raise_error
21+
raise RdkafkaError.new(
22+
self[:response],
23+
broker_message: create_result.error_string
24+
)
25+
end
26+
end
27+
end
28+
end
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# frozen_string_literal: true
2+
3+
module Rdkafka
4+
class Admin
5+
class DeleteGroupsReport
6+
# Any error message generated from the DeleteTopic
7+
# @return [String]
8+
attr_reader :error_string
9+
10+
# The name of the topic deleted
11+
# @return [String]
12+
attr_reader :result_name
13+
14+
def initialize(error_string, result_name)
15+
if error_string != FFI::Pointer::NULL
16+
@error_string = error_string.read_string
17+
end
18+
if result_name != FFI::Pointer::NULL
19+
@result_name = @result_name = result_name.read_string
20+
end
21+
end
22+
end
23+
end
24+
end

lib/rdkafka/bindings.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,17 @@ def self.partitioner(str, partition_count, partitioner_name = "consistent_random
302302
attach_function :rd_kafka_event_DeleteTopics_result, [:pointer], :pointer, blocking: true
303303
attach_function :rd_kafka_DeleteTopics_result_topics, [:pointer, :pointer], :pointer, blocking: true
304304

305+
# Delete Group
306+
307+
RD_KAFKA_ADMIN_OP_DELETEGROUPS = 7 # rd_kafka_admin_op_t
308+
RD_KAFKA_EVENT_DELETEGROUPS_RESULT = 106 # rd_kafka_event_type_t
309+
310+
attach_function :rd_kafka_DeleteGroups, [:pointer, :pointer, :size_t, :pointer, :pointer], :void, blocking: true
311+
attach_function :rd_kafka_DeleteGroup_new, [:pointer], :pointer, blocking: true
312+
attach_function :rd_kafka_DeleteGroup_destroy, [:pointer], :void, blocking: true
313+
attach_function :rd_kafka_event_DeleteGroups_result, [:pointer], :pointer, blocking: true # rd_kafka_event_t* => rd_kafka_DeleteGroups_result_t*
314+
attach_function :rd_kafka_DeleteGroups_result_groups, [:pointer, :pointer], :pointer, blocking: true # rd_kafka_DeleteGroups_result_t*, size_t* => rd_kafka_group_result_t**
315+
305316
# Background Queue and Callback
306317

307318
attach_function :rd_kafka_queue_get_background, [:pointer], :pointer
@@ -408,5 +419,18 @@ def self.partitioner(str, partition_count, partitioner_name = "consistent_random
408419
attach_function :rd_kafka_event_error, [:pointer], :int32
409420
attach_function :rd_kafka_event_error_string, [:pointer], :pointer
410421
attach_function :rd_kafka_AclBinding_error, [:pointer], :pointer
422+
423+
424+
# Extracting data from group results
425+
class NativeError < FFI::Struct # rd_kafka_error_t
426+
layout :code, :int32,
427+
:errstr, :pointer,
428+
:fatal, :uint8_t,
429+
:retriable, :uint8_t,
430+
:txn_requires_abort, :uint8_t
431+
end
432+
433+
attach_function :rd_kafka_group_result_error, [:pointer], NativeError.by_ref # rd_kafka_group_result_t* => rd_kafka_error_t*
434+
attach_function :rd_kafka_group_result_name, [:pointer], :pointer
411435
end
412436
end

lib/rdkafka/callbacks.rb

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,29 @@ def self.create_topic_results_from_array(count, array_pointer)
2323
end
2424
end
2525

26+
class GroupResult
27+
attr_reader :result_error, :error_string, :result_name
28+
def initialize(group_result_pointer)
29+
native_error = Rdkafka::Bindings.rd_kafka_group_result_error(group_result_pointer)
30+
31+
if native_error.null?
32+
@result_error = 0
33+
@error_string = FFI::Pointer::NULL
34+
else
35+
@result_error = native_error[:code]
36+
@error_string = native_error[:errstr]
37+
end
38+
39+
@result_name = Rdkafka::Bindings.rd_kafka_group_result_name(group_result_pointer)
40+
end
41+
def self.create_group_results_from_array(count, array_pointer)
42+
(1..count).map do |index|
43+
result_pointer = (array_pointer + (index - 1)).read_pointer
44+
new(result_pointer)
45+
end
46+
end
47+
end
48+
2649
# Extracts attributes of rd_kafka_acl_result_t
2750
#
2851
# @private
@@ -111,6 +134,8 @@ def self.call(_, event_ptr, _)
111134
process_delete_acl(event_ptr)
112135
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT
113136
process_describe_acl(event_ptr)
137+
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT
138+
process_delete_groups(event_ptr)
114139
end
115140
end
116141

@@ -133,6 +158,23 @@ def self.process_create_topic(event_ptr)
133158
end
134159
end
135160

161+
def self.process_delete_groups(event_ptr)
162+
delete_groups_result = Rdkafka::Bindings.rd_kafka_event_DeleteGroups_result(event_ptr)
163+
164+
# Get the number of delete group results
165+
pointer_to_size_t = FFI::MemoryPointer.new(:size_t)
166+
delete_group_result_array = Rdkafka::Bindings.rd_kafka_DeleteGroups_result_groups(delete_groups_result, pointer_to_size_t)
167+
delete_group_results = GroupResult.create_group_results_from_array(pointer_to_size_t.read_int, delete_group_result_array) # TODO fix this
168+
delete_group_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
169+
170+
if (delete_group_handle = Rdkafka::Admin::DeleteGroupsHandle.remove(delete_group_handle_ptr.address))
171+
delete_group_handle[:response] = delete_group_results[0].result_error
172+
delete_group_handle[:error_string] = delete_group_results[0].error_string
173+
delete_group_handle[:result_name] = delete_group_results[0].result_name
174+
delete_group_handle[:pending] = false
175+
end
176+
end
177+
136178
def self.process_delete_topic(event_ptr)
137179
delete_topics_result = Rdkafka::Bindings.rd_kafka_event_DeleteTopics_result(event_ptr)
138180

spec/rdkafka/admin_spec.rb

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
let(:topic_replication_factor) { 1 }
2121
let(:topic_config) { {"cleanup.policy" => "compact", "min.cleanable.dirty.ratio" => 0.8} }
2222
let(:invalid_topic_config) { {"cleeeeenup.policee" => "campact"} }
23+
let(:group_name) { "test-group-#{Random.new.rand(0..1_000_000)}" }
2324

2425
let(:resource_name) {"acl-test-topic"}
2526
let(:resource_type) {Rdkafka::Bindings::RD_KAFKA_RESOURCE_TOPIC}
@@ -306,7 +307,63 @@
306307
delete_acl_report = delete_acl_handle.wait(max_wait_timeout: 15.0)
307308
expect(delete_acl_handle[:response]).to eq(0)
308309
expect(delete_acl_report.deleted_acls.length).to eq(2)
310+
311+
end
312+
end
313+
end
314+
315+
describe('Group tests') do
316+
describe "#delete_group" do
317+
describe("with an existing group") do
318+
let(:consumer_config) { rdkafka_consumer_config('group.id': group_name) }
319+
let(:producer_config) { rdkafka_producer_config }
320+
let(:producer) { producer_config.producer }
321+
let(:consumer) { consumer_config.consumer }
322+
323+
before do
324+
# Create a topic, post a message to it, consume it and commit offsets, this will create a group that we can then delete.
325+
admin.create_topic(topic_name, topic_partition_count, topic_replication_factor).wait(max_wait_timeout: 15.0)
326+
327+
producer.produce(topic: topic_name, payload: "test", key: "test").wait(max_wait_timeout: 15.0)
328+
329+
consumer.subscribe(topic_name)
330+
wait_for_assignment(consumer)
331+
message = consumer.poll(100)
332+
333+
expect(message).to_not be_nil
334+
335+
consumer.commit
336+
consumer.close
337+
end
338+
339+
after do
340+
producer.close
341+
consumer.close
342+
end
343+
344+
it "deletes the group" do
345+
delete_group_handle = admin.delete_group(group_name)
346+
report = delete_group_handle.wait(max_wait_timeout: 15.0)
347+
348+
expect(report.result_name).to eql(group_name)
349+
end
309350
end
351+
352+
describe "called with invalid input" do
353+
describe "with the name of a group that does not exist" do
354+
it "raises an exception" do
355+
delete_group_handle = admin.delete_group(group_name)
356+
357+
expect {
358+
delete_group_handle.wait(max_wait_timeout: 15.0)
359+
}.to raise_exception { |ex|
360+
expect(ex).to be_a(Rdkafka::RdkafkaError)
361+
expect(ex.message).to match(/Broker: The group id does not exist \(group_id_not_found\)/)
362+
}
363+
end
364+
end
365+
end
366+
310367
end
311368
end
312369
end

0 commit comments

Comments
 (0)