Commit e225aab

Merge pull request #814 from karafka/feature/events-poll-nb-each

Add events_poll_nb_each and poll_nb_each for fiber scheduler integration

1 parent 7fd16bd commit e225aab

File tree

7 files changed: +341 −50 lines changed

CHANGELOG.md

Lines changed: 4 additions & 2 deletions

@@ -3,8 +3,10 @@
 ## 0.25.1 (Unreleased)
 - [Enhancement] Use native ARM64 runners instead of QEMU emulation for Alpine musl aarch64 builds, improving build performance and reliability.
 - [Enhancement] Enable parallel compilation (`make -j$(nproc)`) for ARM64 Alpine musl builds.
-- [Enhancement] Add file descriptor API for fiber scheduler integration. Expose `queue_fd` and `background_queue_fd` on `Consumer`, `Producer`, and `Admin` to enable non-blocking monitoring with select/poll/epoll for integration with Ruby fiber schedulers (Falcon, Async) and custom event loops.
-- [Enhancement] Add non-blocking poll methods (`poll_nb`, `events_poll_nb`) that skip GVL release for efficient fiber scheduler integration when using `poll(0)`.
+- [Enhancement] Add file descriptor API for fiber scheduler integration. Expose `enable_queue_io_events` and `enable_background_queue_io_events` on `Consumer`, `Producer`, and `Admin` to enable non-blocking monitoring with select/poll/epoll for integration with Ruby fiber schedulers (Falcon, Async) and custom event loops.
+- [Enhancement] Add non-blocking poll methods (`poll_nb`, `events_poll_nb`) on `Consumer` that skip GVL release for efficient fiber scheduler integration when using `poll(0)`.
+- [Enhancement] Add `events_poll_nb_each` method on `Producer`, `Consumer`, and `Admin` for polling events in a single GVL/mutex session. Yields the count after each iteration; the caller returns `:stop` to break.
+- [Enhancement] Add `poll_nb_each` method on `Consumer` for non-blocking message polling with proper resource cleanup, yielding each message and supporting early termination via a `:stop` return value.

 ## 0.25.0 (2026-01-20)
 - **[Deprecation]** `AbstractHandle#wait` parameter `max_wait_timeout:` (seconds) is deprecated in favor of `max_wait_timeout_ms:` (milliseconds). The old parameter still works but will be removed in v1.0.0.
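The `enable_queue_io_events` entries above build on librdkafka's fd-notification mechanism: the library writes the payload byte to a caller-supplied file descriptor whenever an event lands on the queue, so an event loop can sleep on the fd instead of busy-polling. A minimal standalone sketch of that wake-up pattern, using a plain pipe in place of the native side (no Kafka involved; the write is simulated by hand):

```ruby
# A pipe stands in for the fd you would hand to enable_queue_io_events.
reader, writer = IO.pipe

# "librdkafka" signals that events are ready by writing the payload byte
# (here simulated manually; normally the native library does this).
writer.write("\x01")

# Event-loop side: wait until the fd is readable, then consume the
# wake-up byte. Actual polling of the client is omitted from this sketch.
ready, = IO.select([reader], nil, nil, 1)
byte = nil
if ready
  byte = reader.read_nonblock(1) # drain the notification byte
  puts "queue fd readable: time to poll for events"
end
```

In a real integration the pipe's write end would be registered via something like `client.enable_queue_io_events(writer.fileno)`, and the readable event would trigger a drain with `events_poll_nb_each`.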

lib/rdkafka/admin.rb

Lines changed: 39 additions & 0 deletions

@@ -91,6 +91,45 @@ def enable_background_queue_io_events(fd, payload = "\x01")
       @native_kafka.enable_background_queue_io_events(fd, payload)
     end

+    # Polls for events in a non-blocking loop, yielding the count after each iteration.
+    #
+    # This method processes events (stats, errors, etc.) in a single GVL/mutex session,
+    # which is more efficient than repeated individual polls. It uses non-blocking polls
+    # internally (no GVL release between polls).
+    #
+    # Yields the count of events processed after each poll iteration, allowing the caller
+    # to implement timeout or other termination logic by returning `:stop`.
+    #
+    # @yield [count] Called after each poll iteration
+    # @yieldparam count [Integer] Number of events processed in this iteration
+    # @yieldreturn [Symbol, Object] Return `:stop` to break the loop; any other value continues
+    # @return [nil]
+    # @raise [Rdkafka::ClosedAdminError] if called on a closed admin client
+    #
+    # @note This method holds the inner lock until the queue is empty or `:stop` is returned.
+    #   Other admin operations will wait until this method returns.
+    # @note This method is thread-safe as it uses @native_kafka.with_inner synchronization
+    #
+    # @example Drain all pending events
+    #   admin.events_poll_nb_each { |_count| }
+    #
+    # @example With timeout control
+    #   deadline = monotonic_now + timeout_ms
+    #   admin.events_poll_nb_each do |_count|
+    #     :stop if monotonic_now >= deadline
+    #   end
+    def events_poll_nb_each
+      closed_admin_check(__method__)
+
+      @native_kafka.with_inner do |inner|
+        loop do
+          count = Rdkafka::Bindings.rd_kafka_poll_nb(inner, 0)
+          break if count.zero?
+          break if yield(count) == :stop
+        end
+      end
+    end
+
     # @return [Proc] finalizer proc for closing the admin
     # @private
     def finalizer
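The `events_poll_nb_each` control flow above can be exercised without a broker. The sketch below mirrors it with `FakeClient`, a hypothetical stand-in whose `poll_nb` simulates `Rdkafka::Bindings.rd_kafka_poll_nb` returning the number of events served:

```ruby
# FakeClient stands in for an Rdkafka admin/producer/consumer; it is a
# stub for illustration, not part of the rdkafka API.
class FakeClient
  def initialize(pending_events)
    @pending = pending_events
  end

  # Mirrors the events_poll_nb_each loop: poll until the queue is
  # empty or the block returns :stop.
  def events_poll_nb_each
    loop do
      count = poll_nb
      break if count.zero?
      break if yield(count) == :stop
    end
    nil
  end

  private

  # Simulates rd_kafka_poll_nb: one event per call until none remain.
  def poll_nb
    @pending.positive? ? (@pending -= 1; 1) : 0
  end
end

def monotonic_now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
end

# The timeout-control pattern from the docs: stop once a deadline passes.
client = FakeClient.new(5)
processed = 0
deadline = monotonic_now_ms + 100
client.events_poll_nb_each do |count|
  processed += count
  :stop if monotonic_now_ms >= deadline
end
puts processed # all 5 stubbed events drain well within the deadline
```

The deadline check runs after every yielded batch, so the lock is held at most one extra iteration past the deadline.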

lib/rdkafka/consumer.rb

Lines changed: 97 additions & 0 deletions

@@ -75,6 +75,103 @@ def enable_background_queue_io_events(fd, payload = "\x01")
       @native_kafka.enable_background_queue_io_events(fd, payload)
     end

+    # Polls for events in a non-blocking loop, yielding the count after each iteration.
+    #
+    # This method processes events (stats, errors, etc.) in a single GVL/mutex session,
+    # which is more efficient than repeated individual polls. It uses non-blocking polls
+    # internally (no GVL release between polls).
+    #
+    # Yields the count of events processed after each poll iteration, allowing the caller
+    # to implement timeout or other termination logic by returning `:stop`.
+    #
+    # @yield [count] Called after each poll iteration
+    # @yieldparam count [Integer] Number of events processed in this iteration
+    # @yieldreturn [Symbol, Object] Return `:stop` to break the loop; any other value continues
+    # @return [nil]
+    # @raise [Rdkafka::ClosedConsumerError] if called on a closed consumer
+    #
+    # @note This method holds the inner lock until the queue is empty or `:stop` is returned.
+    #   Other consumer operations will wait until this method returns.
+    # @note This method is thread-safe as it uses @native_kafka.with_inner synchronization
+    # @note Do NOT use this if `consumer_poll_set` was set to `true`
+    #
+    # @example Drain all pending events
+    #   consumer.events_poll_nb_each { |_count| }
+    #
+    # @example With timeout control
+    #   deadline = monotonic_now + timeout_ms
+    #   consumer.events_poll_nb_each do |_count|
+    #     :stop if monotonic_now >= deadline
+    #   end
+    def events_poll_nb_each
+      closed_consumer_check(__method__)
+
+      @native_kafka.with_inner do |inner|
+        loop do
+          count = Rdkafka::Bindings.rd_kafka_poll_nb(inner, 0)
+          break if count.zero?
+          break if yield(count) == :stop
+        end
+      end
+    end
+
+    # Polls for messages in a non-blocking loop, yielding each message to the caller.
+    #
+    # This method processes messages in a single GVL/mutex session until the queue is empty
+    # or the caller returns `:stop`. It handles the message pointer lifecycle internally,
+    # ensuring proper cleanup via `rd_kafka_message_destroy`.
+    #
+    # @yield [message] Called for each message received
+    # @yieldparam message [Consumer::Message] The received message
+    # @yieldreturn [Symbol, Object] Return `:stop` to break the loop; any other value continues
+    # @return [nil]
+    # @raise [Rdkafka::ClosedConsumerError] if called on a closed consumer
+    # @raise [Rdkafka::RdkafkaError] if a Kafka error occurs while polling
+    #
+    # @note This method uses `rd_kafka_consumer_poll` to fetch messages, unlike
+    #   `events_poll_nb_each` which uses `rd_kafka_poll` for event callbacks (delivery reports,
+    #   statistics, etc.). For consumers, use this method to receive messages and
+    #   `events_poll_nb_each` for processing background events.
+    # @note This method holds the inner lock for the duration. Other consumer operations
+    #   will wait until this method returns.
+    # @note Timeout/max_messages logic should be implemented by the caller
+    #
+    # @example Process messages until queue is empty
+    #   consumer.poll_nb_each do |message|
+    #     process(message)
+    #   end
+    #
+    # @example Process with early termination
+    #   count = 0
+    #   consumer.poll_nb_each do |message|
+    #     process(message)
+    #     count += 1
+    #     :stop if count >= 10
+    #   end
+    def poll_nb_each
+      closed_consumer_check(__method__)
+
+      @native_kafka.with_inner do |inner|
+        loop do
+          message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll_nb(inner, 0)
+          break if message_ptr.null?
+
+          begin
+            native_message = Rdkafka::Bindings::Message.new(message_ptr)
+
+            if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
+              raise Rdkafka::RdkafkaError.new(native_message[:err])
+            end
+
+            result = yield Consumer::Message.new(native_message)
+            break if result == :stop
+          ensure
+            Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
+          end
+        end
+      end
+    end
+
     # @return [Proc] finalizer proc for closing the consumer
     # @private
     def finalizer
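The early-termination contract of `poll_nb_each` can likewise be sketched standalone. `FakeConsumer` below is a hypothetical stub whose internal array replaces the native `rd_kafka_consumer_poll_nb` call; the loop shape matches the method above:

```ruby
# FakeConsumer is a stub stand-in for Rdkafka::Consumer, used only to
# demonstrate the poll_nb_each loop contract; it is not the real API.
class FakeConsumer
  def initialize(messages)
    @queue = messages.dup
  end

  # Mirrors poll_nb_each: yield each queued message, stop when the
  # queue is empty or the block returns :stop.
  def poll_nb_each
    loop do
      message = @queue.shift
      break if message.nil?      # queue drained (null message pointer)
      break if yield(message) == :stop
    end
    nil
  end
end

consumer = FakeConsumer.new(%w[m1 m2 m3 m4 m5])
received = []
consumer.poll_nb_each do |message|
  received << message
  :stop if received.size >= 2   # early termination after two messages
end
puts received.inspect # => ["m1", "m2"]
```

Note that in the real method the `ensure`d `rd_kafka_message_destroy` runs even when the block raises or returns `:stop`, so every yielded message's native pointer is released exactly once.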

lib/rdkafka/producer.rb

Lines changed: 22 additions & 22 deletions

@@ -258,41 +258,41 @@ def queue_size

     alias_method :queue_length, :queue_size

-    # Drains the producer's event queue by continuously polling until empty or time limit reached.
+    # Polls for events in a non-blocking loop, yielding the count after each iteration.
     #
-    # This method is useful when you need to ensure delivery callbacks are processed within a
-    # bounded time, particularly when polling multiple producers from a single thread where
-    # fair scheduling is required to prevent starvation.
+    # This method processes delivery callbacks in a single GVL/mutex session, which is more
+    # efficient than repeated individual polls. It uses non-blocking polls internally
+    # (no GVL release between polls).
     #
-    # Uses non-blocking polls internally (no GVL release) for efficiency. The method holds
-    # a single `with_inner` lock for the duration, minimizing per-poll overhead when processing
-    # many events.
+    # Yields the count of events processed after each poll iteration, allowing the caller
+    # to implement timeout or other termination logic by returning `:stop`.
     #
-    # @param timeout_ms [Integer] maximum time to spend draining in milliseconds (default: 100)
-    # @return [Boolean] true if no more events to process, false if stopped due to time limit
+    # @yield [count] Called after each poll iteration
+    # @yieldparam count [Integer] Number of events processed in this iteration
+    # @yieldreturn [Symbol, Object] Return `:stop` to break the loop; any other value continues
+    # @return [nil]
     # @raise [Rdkafka::ClosedProducerError] if called on a closed producer
     #
-    # @note This method holds the inner lock for up to `timeout_ms`. Other producer operations
-    #   (produce, close, etc.) will wait until this method returns.
+    # @note This method holds the inner lock until the queue is empty or `:stop` is returned.
+    #   Other producer operations (produce, close, etc.) will wait until this method returns.
     # @note This method is thread-safe as it uses @native_kafka.with_inner synchronization
     #
-    # @example Basic usage - drain for up to 100ms
-    #   fully_drained = producer.poll_drain_nb
+    # @example Drain all pending callbacks
+    #   producer.events_poll_nb_each { |_count| }
     #
-    # @example Round-robin polling multiple producers fairly
-    #   producers.each do |producer|
-    #     fully_drained = producer.poll_drain_nb(10)
-    #     # If false, this producer has more pending events
+    # @example With timeout control
+    #   deadline = monotonic_now + timeout_ms
+    #   producer.events_poll_nb_each do |_count|
+    #     :stop if monotonic_now >= deadline
     #   end
-    def poll_drain_nb(timeout_ms = 100)
+    def events_poll_nb_each
       closed_producer_check(__method__)

       @native_kafka.with_inner do |inner|
-        deadline = monotonic_now_ms + timeout_ms
-
         loop do
-          break true if Rdkafka::Bindings.rd_kafka_poll_nb(inner, 0).zero?
-          break false if monotonic_now_ms >= deadline
+          count = Rdkafka::Bindings.rd_kafka_poll_nb(inner, 0)
+          break if count.zero?
+          break if yield(count) == :stop
         end
       end
     end
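Callers of the replaced `poll_drain_nb(timeout_ms)` can rebuild its true/false drained contract on top of `events_poll_nb_each`. A sketch under stated assumptions: `StubProducer` is a hypothetical stand-in for a real producer (its queue is a plain counter), and `poll_drain` is an illustrative helper, not an rdkafka method:

```ruby
# StubProducer simulates a producer's events_poll_nb_each for the demo.
class StubProducer
  def initialize(pending)
    @pending = pending
  end

  def events_poll_nb_each
    loop do
      count = @pending.positive? ? (@pending -= 1; 1) : 0
      break if count.zero?
      break if yield(count) == :stop
    end
    nil
  end
end

def monotonic_now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
end

# Returns true when the queue drained, false when the deadline hit,
# mirroring the old poll_drain_nb(timeout_ms) return contract.
def poll_drain(producer, timeout_ms = 100)
  deadline = monotonic_now_ms + timeout_ms
  drained = true
  producer.events_poll_nb_each do |_count|
    if monotonic_now_ms >= deadline
      drained = false
      :stop
    end
  end
  drained
end

puts poll_drain(StubProducer.new(3))    # drains 3 stubbed events: true
puts poll_drain(StubProducer.new(3), 0) # deadline already passed: false
```

This keeps the fair round-robin use case (polling many producers with small budgets) while leaving the loop-termination policy with the caller, which is the point of the new API.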

spec/lib/rdkafka/admin_spec.rb

Lines changed: 41 additions & 0 deletions

@@ -972,6 +972,47 @@
     end
   end

+  describe "#events_poll_nb_each" do
+    it "does not raise when queue is empty" do
+      expect { admin.events_poll_nb_each { |_| } }.not_to raise_error
+    end
+
+    it "yields the count after each poll" do
+      counts = []
+      # Stub to return events, then zero
+      call_count = 0
+      allow(Rdkafka::Bindings).to receive(:rd_kafka_poll_nb) do
+        call_count += 1
+        (call_count <= 2) ? 1 : 0
+      end
+
+      admin.events_poll_nb_each { |count| counts << count }
+
+      expect(counts).to eq([1, 1])
+    end
+
+    it "stops when block returns :stop" do
+      iterations = 0
+      # Stub to always return events
+      allow(Rdkafka::Bindings).to receive(:rd_kafka_poll_nb).and_return(1)
+
+      admin.events_poll_nb_each do |_count|
+        iterations += 1
+        :stop if iterations >= 3
+      end
+
+      expect(iterations).to eq(3)
+    end
+
+    context "when admin is closed" do
+      before { admin.close }
+
+      it "raises ClosedAdminError" do
+        expect { admin.events_poll_nb_each { |_| } }.to raise_error(Rdkafka::ClosedAdminError, /events_poll_nb_each/)
+      end
+    end
+  end
+
   describe "file descriptor access for fiber scheduler integration" do
     let(:admin) { config.admin(run_polling_thread: false) }

spec/lib/rdkafka/consumer_spec.rb

Lines changed: 103 additions & 0 deletions

@@ -1439,6 +1439,109 @@ def collect(name, list)
     end
   end

+  describe "#events_poll_nb_each" do
+    it "does not raise when queue is empty" do
+      expect { consumer.events_poll_nb_each { |_| } }.not_to raise_error
+    end
+
+    it "yields the count after each poll" do
+      counts = []
+      # Stub to return events, then zero
+      call_count = 0
+      allow(Rdkafka::Bindings).to receive(:rd_kafka_poll_nb) do
+        call_count += 1
+        (call_count <= 2) ? 1 : 0
+      end
+
+      consumer.events_poll_nb_each { |count| counts << count }
+
+      expect(counts).to eq([1, 1])
+    end
+
+    it "stops when block returns :stop" do
+      iterations = 0
+      # Stub to always return events
+      allow(Rdkafka::Bindings).to receive(:rd_kafka_poll_nb).and_return(1)
+
+      consumer.events_poll_nb_each do |_count|
+        iterations += 1
+        :stop if iterations >= 3
+      end
+
+      expect(iterations).to eq(3)
+    end
+
+    context "when consumer is closed" do
+      before { consumer.close }
+
+      it "raises ClosedConsumerError" do
+        expect { consumer.events_poll_nb_each { |_| } }.to raise_error(Rdkafka::ClosedConsumerError, /events_poll_nb_each/)
+      end
+    end
+  end
+
+  describe "#poll_nb_each" do
+    it "does not raise when queue is empty" do
+      consumer.subscribe(TestTopics.consume_test_topic)
+      # Give it a moment to subscribe
+      sleep 0.5
+
+      messages = []
+      consumer.poll_nb_each { |msg| messages << msg }
+      expect(messages).to be_a(Array)
+    end
+
+    it "yields messages and respects :stop" do
+      topic = TestTopics.consume_test_topic
+      consumer.subscribe(topic)
+
+      # Produce some messages
+      5.times { |i| producer.produce(topic: topic, payload: "poll_nb_each test #{i}") }
+      producer.flush
+
+      # Use a blocking poll first to ensure the consumer is ready and messages are
+      # fetched; poll_nb_each is non-blocking, so messages must be available first
+      deadline = Time.now + 30
+      first_message = nil
+      while Time.now < deadline && first_message.nil?
+        first_message = consumer.poll(100)
+      end
+
+      # Now test that :stop works - we should get exactly one more message, then stop
+      # (we already consumed one with the blocking poll above)
+      messages = []
+      consumer.poll_nb_each do |message|
+        messages << message
+        :stop if messages.size >= 1
+      end
+
+      # Should have stopped after exactly 1 message (we got 1 via the blocking poll earlier)
+      expect(messages.size).to eq(1)
+    end
+
+    it "properly cleans up message pointers" do
+      topic = TestTopics.consume_test_topic
+      consumer.subscribe(topic)
+
+      producer.produce(topic: topic, payload: "cleanup test")
+      producer.flush
+      sleep 2
+
+      # This should not leak memory - message_destroy is called in ensure
+      expect {
+        consumer.poll_nb_each { |_| }
+      }.not_to raise_error
+    end
+
+    context "when consumer is closed" do
+      before { consumer.close }
+
+      it "raises ClosedConsumerError" do
+        expect { consumer.poll_nb_each { |_| } }.to raise_error(Rdkafka::ClosedConsumerError, /poll_nb_each/)
+      end
+    end
+  end
+
   describe "file descriptor access for fiber scheduler integration" do
     it "enables IO events on consumer queue" do
       consumer.subscribe(TestTopics.consume_test_topic)
