Skip to content

Commit 84ac9a4

Browse files
authored
Expose alternative poll flow for consumer (#356)
1 parent 77bf4ab commit 84ac9a4

File tree

6 files changed

+109
-4
lines changed

6 files changed

+109
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
- **[Feature]** Add `Admin#create_partitions` (mensfeld)
66
- **[Feature]** Add `Admin#delete_group` utility (piotaixr)
77
- **[Feature]** Add Create and Delete ACL Feature To Admin Functions (vgnanasekaran)
8+
- [Enhancement] Expose alternative way of managing consumer events via a separate queue (mensfeld)
89
- [Enhancement] Bump librdkafka to 2.3.0 (mensfeld)
910
- [Enhancement] Increase the `#lag` and `#query_watermark_offsets` default timeouts from 100ms to 1000ms. This will compensate for network glitches and remote clusters operations (mensfeld)
11+
- [Change] Use `SecureRandom.uuid` instead of `random` for test consumer groups (mensfeld)
1012

1113
## 0.14.0 (2023-11-21)
1214
- [Enhancement] Add `raise_response_error` flag to the `Rdkafka::AbstractHandle`.

lib/rdkafka/config.rb

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ def self.opaques
112112
def initialize(config_hash = {})
113113
@config_hash = DEFAULT_CONFIG.merge(config_hash)
114114
@consumer_rebalance_listener = nil
115+
@consumer_poll_set = true
115116
end
116117

117118
# Set a config option.
@@ -140,6 +141,22 @@ def consumer_rebalance_listener=(listener)
140141
@consumer_rebalance_listener = listener
141142
end
142143

144+
# Should we use a single queue for the underlying consumer and events.
145+
#
146+
# This is an advanced API that allows for more granular control of the polling process.
147+
# When this value is set to `false` (`true` by defualt), there will be two queues that need to
148+
# be polled:
149+
# - main librdkafka queue for events
150+
# - consumer queue with messages and rebalances
151+
#
152+
# It is recommended to use the defaults and only set it to `false` in advance multi-threaded
153+
# and complex cases where granular events handling control is needed.
154+
#
155+
# @param poll_set [Boolean]
156+
def consumer_poll_set=(poll_set)
157+
@consumer_poll_set = poll_set
158+
end
159+
143160
# Creates a consumer with this configuration.
144161
#
145162
# @return [Consumer] The created consumer
@@ -158,8 +175,8 @@ def consumer
158175
# Create native client
159176
kafka = native_kafka(config, :rd_kafka_consumer)
160177

161-
# Redirect the main queue to the consumer
162-
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
178+
# Redirect the main queue to the consumer queue
179+
Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) if @consumer_poll_set
163180

164181
# Return consumer with Kafka client
165182
Rdkafka::Consumer.new(
@@ -187,7 +204,11 @@ def producer
187204
# Return producer with Kafka client
188205
partitioner_name = self[:partitioner] || self["partitioner"]
189206
Rdkafka::Producer.new(
190-
Rdkafka::NativeKafka.new(native_kafka(config, :rd_kafka_producer), run_polling_thread: true, opaque: opaque),
207+
Rdkafka::NativeKafka.new(
208+
native_kafka(config, :rd_kafka_producer),
209+
run_polling_thread: true,
210+
opaque: opaque
211+
),
191212
partitioner_name
192213
).tap do |producer|
193214
opaque.producer = producer

lib/rdkafka/consumer.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,32 @@ def poll(timeout_ms)
531531
end
532532
end
533533

534+
# Polls the main rdkafka queue (not the consumer one). Do **NOT** use it if `consumer_poll_set`
535+
# was set to `true`.
536+
#
537+
# Events will cause application-provided callbacks to be called.
538+
#
539+
# Events (in the context of the consumer):
540+
# - error callbacks
541+
# - stats callbacks
542+
# - any other callbacks supported by librdkafka that are not part of the consumer_poll, that
543+
# would have a callback configured and activated.
544+
#
545+
# This method needs to be called at regular intervals to serve any queued callbacks waiting to
546+
# be called. When in use, does **NOT** replace `#poll` but needs to run complementary with it.
547+
#
548+
# @param timeout_ms [Integer] poll timeout. If set to 0 will run async, when set to -1 will
549+
# block until any events available.
550+
#
551+
# @note This method technically should be called `#poll` and the current `#poll` should be
552+
# called `#consumer_poll` though we keep the current naming convention to make it backward
553+
# compatible.
554+
def events_poll(timeout_ms = 0)
555+
@native_kafka.with_inner do |inner|
556+
Rdkafka::Bindings.rd_kafka_poll(inner, timeout_ms)
557+
end
558+
end
559+
534560
# Poll for new messages and yield for each received one. Iteration
535561
# will end when the consumer is closed.
536562
#

spec/rdkafka/config_spec.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,14 @@ def call(stats); end
113113
consumer.close
114114
end
115115

116+
it "should create a consumer with consumer_poll_set set to false" do
117+
config = rdkafka_consumer_config
118+
config.consumer_poll_set = false
119+
consumer = config.consumer
120+
expect(consumer).to be_a Rdkafka::Consumer
121+
consumer.close
122+
end
123+
116124
it "should raise an error when creating a consumer with invalid config" do
117125
config = Rdkafka::Config.new('invalid.key' => 'value')
118126
expect {

spec/rdkafka/consumer_spec.rb

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,30 @@
5454
consumer.subscription
5555
}.to raise_error(Rdkafka::RdkafkaError)
5656
end
57+
58+
context "when using consumer without the poll set" do
59+
let(:consumer) do
60+
config = rdkafka_consumer_config
61+
config.consumer_poll_set = false
62+
config.consumer
63+
end
64+
65+
it "should subscribe, unsubscribe and return the subscription" do
66+
expect(consumer.subscription).to be_empty
67+
68+
consumer.subscribe("consume_test_topic")
69+
70+
expect(consumer.subscription).not_to be_empty
71+
expected_subscription = Rdkafka::Consumer::TopicPartitionList.new.tap do |list|
72+
list.add_topic("consume_test_topic")
73+
end
74+
expect(consumer.subscription).to eq expected_subscription
75+
76+
consumer.unsubscribe
77+
78+
expect(consumer.subscription).to be_empty
79+
end
80+
end
5781
end
5882

5983
describe "#pause and #resume" do
@@ -1054,6 +1078,29 @@ def send_one_message(val)
10541078
end
10551079
end
10561080

1081+
# Only relevant in case of a consumer with separate queues
1082+
describe '#events_poll' do
1083+
let(:stats) { [] }
1084+
1085+
before { Rdkafka::Config.statistics_callback = ->(published) { stats << published } }
1086+
1087+
after { Rdkafka::Config.statistics_callback = nil }
1088+
1089+
let(:consumer) do
1090+
config = rdkafka_consumer_config('statistics.interval.ms': 100)
1091+
config.consumer_poll_set = false
1092+
config.consumer
1093+
end
1094+
1095+
it "expect to run events_poll, operate and propagate stats on events_poll and not poll" do
1096+
consumer.subscribe("consume_test_topic")
1097+
consumer.poll(1_000)
1098+
expect(stats).to be_empty
1099+
consumer.events_poll(-1)
1100+
expect(stats).not_to be_empty
1101+
end
1102+
end
1103+
10571104
describe "a rebalance listener" do
10581105
let(:consumer) do
10591106
config = rdkafka_consumer_config

spec/spec_helper.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def rdkafka_consumer_config(config_overrides={})
3636
# Add consumer specific fields to it
3737
config[:"auto.offset.reset"] = "earliest"
3838
config[:"enable.partition.eof"] = false
39-
config[:"group.id"] = "ruby-test-#{Random.new.rand(0..1_000_000)}"
39+
config[:"group.id"] = "ruby-test-#{SecureRandom.uuid}"
4040
# Enable debug mode if required
4141
if ENV["DEBUG_CONSUMER"]
4242
config[:debug] = "cgrp,topic,fetch"
@@ -135,6 +135,7 @@ def notify_listener(listener, &block)
135135
rake_test_topic: 3,
136136
watermarks_test_topic: 3,
137137
partitioner_test_topic: 25,
138+
example_topic: 1
138139
}.each do |topic, partitions|
139140
create_topic_handle = admin.create_topic(topic.to_s, partitions, 1)
140141
begin

0 commit comments

Comments
 (0)