Skip to content

Commit d5620ae

Browse files
authored
consumer position (#339)
1 parent 3ecae0e commit d5620ae

File tree

4 files changed

+106
-38
lines changed

4 files changed

+106
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Rdkafka Changelog
22

33
## 0.14.0 (Unreleased)
4+
* [Enhancement] Get consumer position (thijsc & mensfeld)
45
* [Enhancement] Provide `#purge` to remove any outstanding requests from the producer (mensfeld)
56
* [Enhancement] Update `librdkafka` to `2.2.0` (mensfeld)
67
* [Enhancement] Introduce producer partitions count metadata cache (mensfeld)

lib/rdkafka/bindings.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ class TopicPartitionList < FFI::Struct
189189
attach_function :rd_kafka_resume_partitions, [:pointer, :pointer], :int, blocking: true
190190
attach_function :rd_kafka_seek, [:pointer, :int32, :int64, :int], :int, blocking: true
191191
attach_function :rd_kafka_offsets_for_times, [:pointer, :pointer, :int], :int, blocking: true
192+
attach_function :rd_kafka_position, [:pointer, :pointer], :int, blocking: true
192193

193194
# Headers
194195
attach_function :rd_kafka_header_get_all, [:pointer, :size_t, :pointer, :pointer, SizePtr], :int

lib/rdkafka/consumer.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,34 @@ def committed(list=nil, timeout_ms=1200)
254254
end
255255
end
256256

257+
# Return the current positions (offsets) for topics and partitions.
258+
# The offset field of each requested partition will be set to the offset of the last consumed message + 1, or nil in case there was no previous message.
259+
#
260+
# @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil to use the current subscription.
261+
#
262+
# @raise [RdkafkaError] When getting the positions fails.
263+
#
264+
# @return [TopicPartitionList]
265+
def position(list=nil)
266+
if list.nil?
267+
list = assignment
268+
elsif !list.is_a?(TopicPartitionList)
269+
raise TypeError.new("list has to be nil or a TopicPartitionList")
270+
end
271+
272+
tpl = list.to_native_tpl
273+
274+
response = @native_kafka.with_inner do |inner|
275+
Rdkafka::Bindings.rd_kafka_position(inner, tpl)
276+
end
277+
278+
if response != 0
279+
raise Rdkafka::RdkafkaError.new(response)
280+
end
281+
282+
TopicPartitionList.from_native_tpl(tpl)
283+
end
284+
257285
# Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition.
258286
#
259287
# @param topic [String] The topic to query

spec/rdkafka/consumer_spec.rb

Lines changed: 76 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,9 @@ def send_one_message(val)
314314
end
315315
end
316316

317-
describe "#commit, #committed and #store_offset" do
318-
# Make sure there's a stored offset
317+
318+
describe "#position, #commit, #committed and #store_offset" do
319+
# Make sure there are messages to work with
319320
let!(:report) do
320321
producer.produce(
321322
topic: "consume_test_topic",
@@ -333,29 +334,33 @@ def send_one_message(val)
333334
)
334335
end
335336

336-
it "should only accept a topic partition list in committed" do
337-
expect {
338-
consumer.committed("list")
339-
}.to raise_error TypeError
337+
describe "#position" do
338+
it "should only accept a topic partition list in position if not nil" do
339+
expect {
340+
consumer.position("list")
341+
}.to raise_error TypeError
342+
end
340343
end
341344

342-
it "should commit in sync mode" do
343-
expect {
344-
consumer.commit(nil, true)
345-
}.not_to raise_error
346-
end
345+
describe "#committed" do
346+
it "should only accept a topic partition list in commit if not nil" do
347+
expect {
348+
consumer.commit("list")
349+
}.to raise_error TypeError
350+
end
347351

348-
it "should only accept a topic partition list in commit if not nil" do
349-
expect {
350-
consumer.commit("list")
351-
}.to raise_error TypeError
352+
it "should commit in sync mode" do
353+
expect {
354+
consumer.commit(nil, true)
355+
}.not_to raise_error
356+
end
352357
end
353358

354359
context "with a committed consumer" do
355360
before :all do
356361
# Make sure there are some messages.
357362
handles = []
358-
producer = rdkafka_producer_config.producer
363+
producer = rdkafka_config.producer
359364
10.times do
360365
(0..2).each do |i|
361366
handles << producer.produce(
@@ -399,31 +404,33 @@ def send_one_message(val)
399404
}.to raise_error(Rdkafka::RdkafkaError)
400405
end
401406

402-
it "should fetch the committed offsets for the current assignment" do
403-
partitions = consumer.committed.to_h["consume_test_topic"]
404-
expect(partitions).not_to be_nil
405-
expect(partitions[0].offset).to eq 1
406-
end
407+
describe "#committed" do
408+
it "should fetch the committed offsets for the current assignment" do
409+
partitions = consumer.committed.to_h["consume_test_topic"]
410+
expect(partitions).not_to be_nil
411+
expect(partitions[0].offset).to eq 1
412+
end
407413

408-
it "should fetch the committed offsets for a specified topic partition list" do
409-
list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
410-
list.add_topic("consume_test_topic", [0, 1, 2])
414+
it "should fetch the committed offsets for a specified topic partition list" do
415+
list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
416+
list.add_topic("consume_test_topic", [0, 1, 2])
417+
end
418+
partitions = consumer.committed(list).to_h["consume_test_topic"]
419+
expect(partitions).not_to be_nil
420+
expect(partitions[0].offset).to eq 1
421+
expect(partitions[1].offset).to eq 1
422+
expect(partitions[2].offset).to eq 1
411423
end
412-
partitions = consumer.committed(list).to_h["consume_test_topic"]
413-
expect(partitions).not_to be_nil
414-
expect(partitions[0].offset).to eq 1
415-
expect(partitions[1].offset).to eq 1
416-
expect(partitions[2].offset).to eq 1
417-
end
418424

419-
it "should raise an error when getting committed fails" do
420-
expect(Rdkafka::Bindings).to receive(:rd_kafka_committed).and_return(20)
421-
list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
422-
list.add_topic("consume_test_topic", [0, 1, 2])
425+
it "should raise an error when getting committed fails" do
426+
expect(Rdkafka::Bindings).to receive(:rd_kafka_committed).and_return(20)
427+
list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
428+
list.add_topic("consume_test_topic", [0, 1, 2])
429+
end
430+
expect {
431+
consumer.committed(list)
432+
}.to raise_error Rdkafka::RdkafkaError
423433
end
424-
expect {
425-
consumer.committed(list)
426-
}.to raise_error Rdkafka::RdkafkaError
427434
end
428435

429436
describe "#store_offset" do
@@ -444,6 +451,8 @@ def send_one_message(val)
444451
@new_consumer.store_offset(message)
445452
@new_consumer.commit
446453

454+
# TODO use position here, should be at offset
455+
447456
list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
448457
list.add_topic("consume_test_topic", [0, 1, 2])
449458
end
@@ -458,6 +467,35 @@ def send_one_message(val)
458467
@new_consumer.store_offset(message)
459468
}.to raise_error Rdkafka::RdkafkaError
460469
end
470+
471+
describe "#position" do
472+
it "should fetch the positions for the current assignment" do
473+
consumer.store_offset(message)
474+
475+
partitions = consumer.position.to_h["consume_test_topic"]
476+
expect(partitions).not_to be_nil
477+
expect(partitions[0].offset).to eq message.offset + 1
478+
end
479+
480+
it "should fetch the positions for a specified assignment" do
481+
consumer.store_offset(message)
482+
483+
list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
484+
list.add_topic_and_partitions_with_offsets("consume_test_topic", 0 => nil, 1 => nil, 2 => nil)
485+
end
486+
partitions = consumer.position(list).to_h["consume_test_topic"]
487+
expect(partitions).not_to be_nil
488+
expect(partitions[0].offset).to eq message.offset + 1
489+
end
490+
491+
it "should raise an error when getting the position fails" do
492+
expect(Rdkafka::Bindings).to receive(:rd_kafka_position).and_return(20)
493+
494+
expect {
495+
consumer.position
496+
}.to raise_error(Rdkafka::RdkafkaError)
497+
end
498+
end
461499
end
462500
end
463501
end
@@ -1090,7 +1128,7 @@ def on_partitions_revoked(list)
10901128
:assign => [ nil ],
10911129
:assignment => nil,
10921130
:committed => [],
1093-
:query_watermark_offsets => [ nil, nil ],
1131+
:query_watermark_offsets => [ nil, nil ]
10941132
}.each do |method, args|
10951133
it "raises an exception if #{method} is called" do
10961134
expect {

0 commit comments

Comments
 (0)