Skip to content

Commit 77bf4ab

Browse files
authored
backport partitions creation (#355)
1 parent cfdbd70 commit 77bf4ab

File tree

9 files changed

+184
-0
lines changed

9 files changed

+184
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Rdkafka Changelog
22

33
## 0.15.0 (Unreleased)
4+
- **[Feature]** Add `Admin#metadata` (mensfeld)
5+
- **[Feature]** Add `Admin#create_partitions` (mensfeld)
46
- **[Feature]** Add `Admin#delete_group` utility (piotaixr)
57
- **[Feature]** Add Create and Delete ACL Feature To Admin Functions (vgnanasekaran)
68
- [Enhancement] Bump librdkafka to 2.3.0 (mensfeld)

lib/rdkafka.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
require "rdkafka/admin/delete_groups_report"
1616
require "rdkafka/admin/delete_topic_handle"
1717
require "rdkafka/admin/delete_topic_report"
18+
require "rdkafka/admin/create_partitions_handle"
19+
require "rdkafka/admin/create_partitions_report"
1820
require "rdkafka/admin/create_acl_handle"
1921
require "rdkafka/admin/create_acl_report"
2022
require "rdkafka/admin/delete_acl_handle"

lib/rdkafka/admin.rb

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,19 @@ def finalizer
1414
->(_) { close }
1515
end
1616

17+
# Performs the metadata request using admin
18+
#
19+
# @param topic_name [String, nil] metadat about particular topic or all if nil
20+
# @param timeout_ms [Integer] metadata request timeout
21+
# @return [Metadata] requested metadata
22+
def metadata(topic_name = nil, timeout_ms = 2_000)
23+
closed_admin_check(__method__)
24+
25+
@native_kafka.with_inner do |inner|
26+
Metadata.new(inner, topic_name, timeout_ms)
27+
end
28+
end
29+
1730
# Close this admin instance
1831
def close
1932
return if closed?
@@ -216,6 +229,71 @@ def delete_topic(topic_name)
216229
delete_topic_handle
217230
end
218231

232+
# Creates extra partitions for a given topic
233+
#
234+
# @param topic_name [String]
235+
# @param partition_count [Integer] how many partitions we want to end up with for given topic
236+
#
237+
# @raise [ConfigError] When the partition count or replication factor are out of valid range
238+
# @raise [RdkafkaError] When the topic name is invalid or the topic already exists
239+
# @raise [RdkafkaError] When the topic configuration is invalid
240+
#
241+
# @return [CreateTopicHandle] Create topic handle that can be used to wait for the result of creating the topic
242+
def create_partitions(topic_name, partition_count)
243+
closed_admin_check(__method__)
244+
245+
@native_kafka.with_inner do |inner|
246+
error_buffer = FFI::MemoryPointer.from_string(" " * 256)
247+
new_partitions_ptr = Rdkafka::Bindings.rd_kafka_NewPartitions_new(
248+
FFI::MemoryPointer.from_string(topic_name),
249+
partition_count,
250+
error_buffer,
251+
256
252+
)
253+
if new_partitions_ptr.null?
254+
raise Rdkafka::Config::ConfigError.new(error_buffer.read_string)
255+
end
256+
257+
pointer_array = [new_partitions_ptr]
258+
topics_array_ptr = FFI::MemoryPointer.new(:pointer)
259+
topics_array_ptr.write_array_of_pointer(pointer_array)
260+
261+
# Get a pointer to the queue that our request will be enqueued on
262+
queue_ptr = Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
263+
if queue_ptr.null?
264+
Rdkafka::Bindings.rd_kafka_NewPartitions_destroy(new_partitions_ptr)
265+
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
266+
end
267+
268+
# Create and register the handle we will return to the caller
269+
create_partitions_handle = CreatePartitionsHandle.new
270+
create_partitions_handle[:pending] = true
271+
create_partitions_handle[:response] = -1
272+
CreatePartitionsHandle.register(create_partitions_handle)
273+
admin_options_ptr = Rdkafka::Bindings.rd_kafka_AdminOptions_new(inner, Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS)
274+
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, create_partitions_handle.to_ptr)
275+
276+
begin
277+
Rdkafka::Bindings.rd_kafka_CreatePartitions(
278+
inner,
279+
topics_array_ptr,
280+
1,
281+
admin_options_ptr,
282+
queue_ptr
283+
)
284+
rescue Exception
285+
CreatePartitionsHandle.remove(create_partitions_handle.to_ptr.address)
286+
raise
287+
ensure
288+
Rdkafka::Bindings.rd_kafka_AdminOptions_destroy(admin_options_ptr)
289+
Rdkafka::Bindings.rd_kafka_queue_destroy(queue_ptr)
290+
Rdkafka::Bindings.rd_kafka_NewPartitions_destroy(new_partitions_ptr)
291+
end
292+
293+
create_partitions_handle
294+
end
295+
end
296+
219297
# Create acl
220298
# @param resource_type - values of type rd_kafka_ResourceType_t
221299
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
@@ -528,6 +606,7 @@ def describe_acl(resource_type:, resource_name:, resource_pattern_type:, princip
528606
end
529607

530608
private
609+
531610
def closed_admin_check(method)
532611
raise Rdkafka::ClosedAdminError.new(method) if closed?
533612
end
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
module Rdkafka
2+
class Admin
3+
class CreatePartitionsHandle < AbstractHandle
4+
layout :pending, :bool,
5+
:response, :int,
6+
:error_string, :pointer,
7+
:result_name, :pointer
8+
9+
# @return [String] the name of the operation
10+
def operation_name
11+
"create partitions"
12+
end
13+
14+
# @return [Boolean] whether the create topic was successful
15+
def create_result
16+
CreatePartitionsReport.new(self[:error_string], self[:result_name])
17+
end
18+
19+
def raise_error
20+
raise RdkafkaError.new(
21+
self[:response],
22+
broker_message: CreateTopicReport.new(self[:error_string], self[:result_name]).error_string
23+
)
24+
end
25+
end
26+
end
27+
end
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
module Rdkafka
2+
class Admin
3+
class CreatePartitionsReport < CreateTopicReport
4+
end
5+
end
6+
end

lib/rdkafka/bindings.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,16 @@ def self.partitioner(str, partition_count, partitioner_name = "consistent_random
302302
attach_function :rd_kafka_event_DeleteTopics_result, [:pointer], :pointer, blocking: true
303303
attach_function :rd_kafka_DeleteTopics_result_topics, [:pointer, :pointer], :pointer, blocking: true
304304

305+
# Create partitions
306+
RD_KAFKA_ADMIN_OP_CREATEPARTITIONS = 3
307+
RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT = 102
308+
309+
attach_function :rd_kafka_CreatePartitions, [:pointer, :pointer, :size_t, :pointer, :pointer], :void
310+
attach_function :rd_kafka_NewPartitions_new, %i[pointer size_t pointer size_t], :pointer
311+
attach_function :rd_kafka_NewPartitions_destroy, [:pointer], :void
312+
attach_function :rd_kafka_event_CreatePartitions_result, [:pointer], :pointer
313+
attach_function :rd_kafka_CreatePartitions_result_topics, [:pointer, :pointer], :pointer
314+
305315
# Delete Group
306316

307317
RD_KAFKA_ADMIN_OP_DELETEGROUPS = 7 # rd_kafka_admin_op_t

lib/rdkafka/callbacks.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ def self.call(_, event_ptr, _)
128128
process_create_topic(event_ptr)
129129
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT
130130
process_delete_topic(event_ptr)
131+
elsif event_type == Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT
132+
process_create_partitions(event_ptr)
131133
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT
132134
process_create_acl(event_ptr)
133135
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT
@@ -192,6 +194,23 @@ def self.process_delete_topic(event_ptr)
192194
end
193195
end
194196

197+
def self.process_create_partitions(event_ptr)
198+
create_partitionss_result = Rdkafka::Bindings.rd_kafka_event_CreatePartitions_result(event_ptr)
199+
200+
# Get the number of create topic results
201+
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
202+
create_partitions_result_array = Rdkafka::Bindings.rd_kafka_CreatePartitions_result_topics(create_partitionss_result, pointer_to_size_t)
203+
create_partitions_results = TopicResult.create_topic_results_from_array(pointer_to_size_t.read_int, create_partitions_result_array)
204+
create_partitions_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
205+
206+
if create_partitions_handle = Rdkafka::Admin::CreatePartitionsHandle.remove(create_partitions_handle_ptr.address)
207+
create_partitions_handle[:response] = create_partitions_results[0].result_error
208+
create_partitions_handle[:error_string] = create_partitions_results[0].error_string
209+
create_partitions_handle[:result_name] = create_partitions_results[0].result_name
210+
create_partitions_handle[:pending] = false
211+
end
212+
end
213+
195214
def self.process_create_acl(event_ptr)
196215
create_acls_result = Rdkafka::Bindings.rd_kafka_event_CreateAcls_result(event_ptr)
197216

spec/rdkafka/admin_spec.rb

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
after do
1010
# Registry should always end up being empty
1111
expect(Rdkafka::Admin::CreateTopicHandle::REGISTRY).to be_empty
12+
expect(Rdkafka::Admin::CreatePartitionsHandle::REGISTRY).to be_empty
1213
expect(Rdkafka::Admin::DescribeAclHandle::REGISTRY).to be_empty
1314
expect(Rdkafka::Admin::CreateAclHandle::REGISTRY).to be_empty
1415
expect(Rdkafka::Admin::DeleteAclHandle::REGISTRY).to be_empty
@@ -366,4 +367,41 @@
366367

367368
end
368369
end
370+
371+
describe '#create_partitions' do
372+
let(:metadata) { admin.metadata(topic_name).topics.first }
373+
374+
context 'when topic does not exist' do
375+
it 'expect to fail due to unknown partition' do
376+
expect { admin.create_partitions(topic_name, 10).wait }.to raise_error(Rdkafka::RdkafkaError, /unknown_topic_or_part/)
377+
end
378+
end
379+
380+
context 'when topic already has the desired number of partitions' do
381+
before { admin.create_topic(topic_name, 2, 1).wait }
382+
383+
it 'expect not to change number of partitions' do
384+
expect { admin.create_partitions(topic_name, 2).wait }.to raise_error(Rdkafka::RdkafkaError, /invalid_partitions/)
385+
expect(metadata[:partition_count]).to eq(2)
386+
end
387+
end
388+
389+
context 'when topic has more than the requested number of partitions' do
390+
before { admin.create_topic(topic_name, 5, 1).wait }
391+
392+
it 'expect not to change number of partitions' do
393+
expect { admin.create_partitions(topic_name, 2).wait }.to raise_error(Rdkafka::RdkafkaError, /invalid_partitions/)
394+
expect(metadata[:partition_count]).to eq(5)
395+
end
396+
end
397+
398+
context 'when topic has less then desired number of partitions' do
399+
before { admin.create_topic(topic_name, 1, 1).wait }
400+
401+
it 'expect to change number of partitions' do
402+
admin.create_partitions(topic_name, 10).wait
403+
expect(metadata[:partition_count]).to eq(10)
404+
end
405+
end
406+
end
369407
end

spec/spec_helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
require "rspec"
1212
require "rdkafka"
1313
require "timeout"
14+
require "securerandom"
1415

1516
def rdkafka_base_config
1617
{

0 commit comments

Comments
 (0)