Skip to content

Commit fa5db23

Browse files
authored
provide primitives to fetch consumer group metadata pointers (#393)
1 parent 0a7d561 commit fa5db23

File tree

4 files changed

+34
-6
lines changed

4 files changed

+34
-6
lines changed

lib/rdkafka/bindings.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ class TopicPartitionList < FFI::Struct
191191
attach_function :rd_kafka_seek, [:pointer, :int32, :int64, :int], :int, blocking: true
192192
attach_function :rd_kafka_offsets_for_times, [:pointer, :pointer, :int], :int, blocking: true
193193
attach_function :rd_kafka_position, [:pointer, :pointer], :int, blocking: true
194+
# those two are used for eos support
195+
attach_function :rd_kafka_consumer_group_metadata, [:pointer], :pointer, blocking: true
196+
attach_function :rd_kafka_consumer_group_metadata_destroy, [:pointer], :void, blocking: true
194197

195198
# Headers
196199
attach_function :rd_kafka_header_get_all, [:pointer, :size_t, :pointer, :pointer, SizePtr], :int

lib/rdkafka/callbacks.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,6 @@ def self.process_describe_acl(event_ptr)
261261
end
262262
end
263263
end
264-
265264
end
266265

267266
# FFI Function used for Message Delivery callbacks

lib/rdkafka/consumer.rb

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@ def initialize(native_kafka)
1919
@native_kafka = native_kafka
2020
end
2121

22-
def finalizer
23-
->(_) { close }
24-
end
25-
2622
# @return [String] consumer name
2723
def name
2824
@name ||= @native_kafka.with_inner do |inner|
2925
::Rdkafka::Bindings.rd_kafka_name(inner)
3026
end
3127
end
3228

29+
def finalizer
30+
->(_) { close }
31+
end
32+
3333
# Close this consumer
3434
# @return [nil]
3535
def close
@@ -239,7 +239,7 @@ def assignment_lost?
239239
# @param timeout_ms [Integer] The timeout for fetching this information.
240240
# @return [TopicPartitionList]
241241
# @raise [RdkafkaError] When getting the committed positions fails.
242-
def committed(list=nil, timeout_ms=1200)
242+
def committed(list=nil, timeout_ms=2000)
243243
closed_consumer_check(__method__)
244244

245245
if list.nil?
@@ -672,6 +672,22 @@ def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250
672672
end
673673
end
674674

675+
# Returns pointer to the consumer group metadata. It is used only in the context of
676+
# exactly-once-semantics in transactions, this is why it is never remapped to Ruby
677+
#
678+
# This API is **not** usable by itself from Ruby
679+
#
680+
# @note This pointer **needs** to be removed with `#rd_kafka_consumer_group_metadata_destroy`
681+
#
682+
# @private
683+
def consumer_group_metadata_pointer
684+
closed_consumer_check(__method__)
685+
686+
@native_kafka.with_inner do |inner|
687+
Bindings.rd_kafka_consumer_group_metadata(inner)
688+
end
689+
end
690+
675691
private
676692

677693
def closed_consumer_check(method)

spec/rdkafka/consumer_spec.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,6 +1133,16 @@ def send_one_message(val)
11331133
end
11341134
end
11351135

1136+
describe '#consumer_group_metadata_pointer' do
1137+
let(:pointer) { consumer.consumer_group_metadata_pointer }
1138+
1139+
after { Rdkafka::Bindings.rd_kafka_consumer_group_metadata_destroy(pointer) }
1140+
1141+
it 'expect to return a pointer' do
1142+
expect(pointer).to be_a(FFI::Pointer)
1143+
end
1144+
end
1145+
11361146
describe "a rebalance listener" do
11371147
let(:consumer) do
11381148
config = rdkafka_consumer_config

0 commit comments

Comments
 (0)