Skip to content

Commit 368e81d

Browse files
authored
support assignment lost (#358)
1 parent 84ac9a4 commit 368e81d

File tree

4 files changed

+33
-0
lines changed

4 files changed

+33
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- **[Feature]** Add `Admin#create_partitions` (mensfeld)
66
- **[Feature]** Add `Admin#delete_group` utility (piotaixr)
77
- **[Feature]** Add Create and Delete ACL Feature To Admin Functions (vgnanasekaran)
8+
- **[Feature]** Support `#assignment_lost?` on a consumer to check for involuntary assignment revocation (mensfeld)
89
- [Enhancement] Expose alternative way of managing consumer events via a separate queue (mensfeld)
910
- [Enhancement] Bump librdkafka to 2.3.0 (mensfeld)
1011
- [Enhancement] Increase the `#lag` and `#query_watermark_offsets` default timeouts from 100ms to 1000ms. This will compensate for network glitches and remote clusters operations (mensfeld)

lib/rdkafka/bindings.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ class TopicPartitionList < FFI::Struct
179179
attach_function :rd_kafka_incremental_assign, [:pointer, :pointer], :int, blocking: true
180180
attach_function :rd_kafka_incremental_unassign, [:pointer, :pointer], :int, blocking: true
181181
attach_function :rd_kafka_assignment, [:pointer, :pointer], :int, blocking: true
182+
attach_function :rd_kafka_assignment_lost, [:pointer], :int, blocking: true
182183
attach_function :rd_kafka_committed, [:pointer, :pointer, :int], :int, blocking: true
183184
attach_function :rd_kafka_commit, [:pointer, :pointer, :bool], :int, blocking: true
184185
attach_function :rd_kafka_poll_set_consumer, [:pointer], :void, blocking: true

lib/rdkafka/consumer.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,15 @@ def assignment
221221
ptr.free unless ptr.nil?
222222
end
223223

224+
# @return [Boolean] true if our current assignment has been lost involuntarily.
225+
def assignment_lost?
226+
closed_consumer_check(__method__)
227+
228+
@native_kafka.with_inner do |inner|
229+
!Rdkafka::Bindings.rd_kafka_assignment_lost(inner).zero?
230+
end
231+
end
232+
224233
# Return the current committed offset per partition for this consumer group.
225234
# The offset field of each requested partition will either be set to stored offset or to -1001
226235
# in case there was no stored offset for that partition.

spec/rdkafka/consumer_spec.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,28 @@ def send_one_message(val)
297297
end
298298
end
299299

300+
describe '#assignment_lost?' do
301+
it "should not return true as we do have an assignment" do
302+
consumer.subscribe("consume_test_topic")
303+
expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
304+
list.add_topic("consume_test_topic")
305+
end
306+
307+
expect(consumer.assignment_lost?).to eq false
308+
consumer.unsubscribe
309+
end
310+
311+
it "should not return true after voluntary unsubscribing" do
312+
consumer.subscribe("consume_test_topic")
313+
expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
314+
list.add_topic("consume_test_topic")
315+
end
316+
317+
consumer.unsubscribe
318+
expect(consumer.assignment_lost?).to eq false
319+
end
320+
end
321+
300322
describe "#close" do
301323
it "should close a consumer" do
302324
consumer.subscribe("consume_test_topic")

0 commit comments

Comments
 (0)