Skip to content

Commit 3ecae0e

Browse files
mensfeldtimflappernijikon
authored
Add offsets_for_times method on consumer (#229) (#265)
* Add offsets_for_times method on consumer (#229) * Add raw_timestamp attr_reader to Message class * Add offsets_for_times method to Consumer class * Make sure offsets_for_times call is blocking * Fix @param typo * make timeout configurable for offsets_for_times * Remove publication of raw_timestamp * Convert offset to ms timestamp if Time * Fix typo * align inner format --------- Co-authored-by: Tim Flapper <[email protected]> Co-authored-by: Tomasz Pajor <[email protected]>
1 parent 07cd994 commit 3ecae0e

File tree

5 files changed

+119
-1
lines changed

5 files changed

+119
-1
lines changed

lib/rdkafka/bindings.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ class TopicPartitionList < FFI::Struct
188188
attach_function :rd_kafka_pause_partitions, [:pointer, :pointer], :int, blocking: true
189189
attach_function :rd_kafka_resume_partitions, [:pointer, :pointer], :int, blocking: true
190190
attach_function :rd_kafka_seek, [:pointer, :int32, :int64, :int], :int, blocking: true
191+
attach_function :rd_kafka_offsets_for_times, [:pointer, :pointer, :int], :int, blocking: true
191192

192193
# Headers
193194
attach_function :rd_kafka_header_get_all, [:pointer, :size_t, :pointer, :pointer, SizePtr], :int

lib/rdkafka/consumer.rb

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,39 @@ def seek(message)
406406
end
407407
end
408408

409+
# Lookup offset for the given partitions by timestamp.
410+
#
411+
# @param list [TopicPartitionList] The TopicPartitionList with timestamps instead of offsets
412+
#
413+
# @raise [RdKafkaError] When the OffsetForTimes lookup fails
414+
#
415+
# @return [TopicPartitionList]
416+
def offsets_for_times(list, timeout_ms = 1000)
417+
closed_consumer_check(__method__)
418+
419+
if !list.is_a?(TopicPartitionList)
420+
raise TypeError.new("list has to be a TopicPartitionList")
421+
end
422+
423+
tpl = list.to_native_tpl
424+
425+
response = @native_kafka.with_inner do |inner|
426+
Rdkafka::Bindings.rd_kafka_offsets_for_times(
427+
inner,
428+
tpl,
429+
timeout_ms # timeout
430+
)
431+
end
432+
433+
if response != 0
434+
raise Rdkafka::RdkafkaError.new(response)
435+
end
436+
437+
TopicPartitionList.from_native_tpl(tpl)
438+
ensure
439+
Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
440+
end
441+
409442
# Manually commit the current offsets of this consumer.
410443
#
411444
# To use this set `enable.auto.commit`to `false` to disable automatic triggering

lib/rdkafka/consumer/topic_partition_list.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,13 @@ def to_native_tpl
142142
)
143143

144144
if p.offset
145+
offset = p.offset.is_a?(Time) ? p.offset.to_f * 1_000 : p.offset
146+
145147
Rdkafka::Bindings.rd_kafka_topic_partition_list_set_offset(
146148
tpl,
147149
topic,
148150
p.partition,
149-
p.offset
151+
offset
150152
)
151153
end
152154
end

spec/rdkafka/consumer/topic_partition_list_spec.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,5 +219,24 @@
219219

220220
expect(list).to eq other
221221
end
222+
223+
it "should create a native list with timetamp offsets if offsets are Time" do
224+
list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
225+
list.add_topic_and_partitions_with_offsets("topic", 0 => Time.at(1505069646, 250_000))
226+
end
227+
228+
tpl = list.to_native_tpl
229+
230+
compare_list = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
231+
list.add_topic_and_partitions_with_offsets(
232+
"topic",
233+
0 => (Time.at(1505069646, 250_000).to_f * 1000).floor
234+
)
235+
end
236+
237+
native_list = Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl)
238+
239+
expect(native_list).to eq compare_list
240+
end
222241
end
223242
end

spec/rdkafka/consumer_spec.rb

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -953,6 +953,69 @@ def new_message
953953
end
954954
end
955955

956+
describe "#offsets_for_times" do
957+
it "should raise when not TopicPartitionList" do
958+
expect { consumer.offsets_for_times([]) }.to raise_error(TypeError)
959+
end
960+
961+
it "should raise an error when offsets_for_times fails" do
962+
tpl = Rdkafka::Consumer::TopicPartitionList.new
963+
964+
expect(Rdkafka::Bindings).to receive(:rd_kafka_offsets_for_times).and_return(7)
965+
966+
expect { consumer.offsets_for_times(tpl) }.to raise_error(Rdkafka::RdkafkaError)
967+
end
968+
969+
context "when subscribed" do
970+
let(:timeout) { 1000 }
971+
972+
before do
973+
consumer.subscribe("consume_test_topic")
974+
975+
# 1. partitions are assigned
976+
wait_for_assignment(consumer)
977+
expect(consumer.assignment).not_to be_empty
978+
979+
# 2. eat unrelated messages
980+
while(consumer.poll(timeout)) do; end
981+
end
982+
983+
after { consumer.unsubscribe }
984+
985+
def send_one_message(val)
986+
producer.produce(
987+
topic: "consume_test_topic",
988+
payload: "payload #{val}",
989+
key: "key 0",
990+
partition: 0
991+
).wait
992+
end
993+
994+
it "returns a TopicParticionList with updated offsets" do
995+
send_one_message("a")
996+
send_one_message("b")
997+
send_one_message("c")
998+
999+
consumer.poll(timeout)
1000+
message = consumer.poll(timeout)
1001+
consumer.poll(timeout)
1002+
1003+
tpl = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
1004+
list.add_topic_and_partitions_with_offsets(
1005+
"consume_test_topic",
1006+
[
1007+
[0, message.timestamp]
1008+
]
1009+
)
1010+
end
1011+
1012+
tpl_response = consumer.offsets_for_times(tpl)
1013+
1014+
expect(tpl_response.to_h["consume_test_topic"][0].offset).to eq message.offset
1015+
end
1016+
end
1017+
end
1018+
9561019
describe "a rebalance listener" do
9571020
let(:consumer) do
9581021
config = rdkafka_consumer_config

0 commit comments

Comments
 (0)