
Commit 2f14b94

[Chore] Refresh the topics when reading messages
1 parent 94e9f13 commit 2f14b94

4 files changed: +52 −30 lines changed

lib/kafka/client.rb

Lines changed: 6 additions & 1 deletion

@@ -336,6 +336,9 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
     # @param fetcher_max_queue_size [Integer] max number of items in the fetch queue that
     #   are stored for further processing. Note, that each item in the queue represents a
     #   response from a single broker.
+    # @param refresh_topic_interval [Integer] interval of refreshing the topic list.
+    #   If it is 0, the topic list won't be refreshed (default)
+    #   If it is n (n > 0), the topic list will be refreshed every n seconds
     # @return [Consumer]
     def consumer(
       group_id:,
@@ -345,7 +348,8 @@ def consumer(
       offset_commit_threshold: 0,
       heartbeat_interval: 10,
       offset_retention_time: nil,
-      fetcher_max_queue_size: 100
+      fetcher_max_queue_size: 100,
+      refresh_topic_interval: 0
     )
       cluster = initialize_cluster

@@ -399,6 +403,7 @@ def consumer(
         fetcher: fetcher,
         session_timeout: session_timeout,
         heartbeat: heartbeat,
+        refresh_topic_interval: refresh_topic_interval
       )
     end
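For context: the new keyword flows straight from Kafka::Client#consumer into Consumer.new, so enabling the periodic refresh is a one-argument change on the caller's side. A minimal usage sketch — the broker address, client id, group id, and topic pattern below are made up for illustration:

    require "kafka"

    kafka = Kafka.new(["kafka1:9092"], client_id: "example-app")

    # Re-scan the cluster's topic list every 30 seconds. The default of 0
    # keeps the previous behaviour: the list is never refreshed.
    consumer = kafka.consumer(group_id: "example-group", refresh_topic_interval: 30)

    # Topics created later that match this regex are picked up on refresh.
    consumer.subscribe(/events\..*/)

    consumer.each_message do |message|
      puts "#{message.topic}/#{message.partition}@#{message.offset}"
    end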

lib/kafka/consumer.rb

Lines changed: 42 additions & 26 deletions

@@ -44,7 +44,7 @@ module Kafka
   #
   class Consumer

-    def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:)
+    def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
       @cluster = cluster
       @logger = TaggedLogger.new(logger)
       @instrumenter = instrumenter
@@ -53,6 +53,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
       @session_timeout = session_timeout
       @fetcher = fetcher
       @heartbeat = heartbeat
+      @refresh_topic_interval = refresh_topic_interval

       @pauses = Hash.new {|h, k|
         h[k] = Hash.new {|h2, k2|
@@ -74,9 +75,11 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
       # some already consumed messages
       @current_offsets = Hash.new { |h, k| h[k] = {} }

-      # Hash storing topics that are already being subscribed
-      # When subcribing to a new topic, if it's already being subscribed before, skip it
-      @subscribed_topics = Set.new
+      # Map storing subscribed topics with their configuration
+      @subscribed_topics = Concurrent::Map.new
+
+      # Set storing topics that matched topics in @subscribed_topics
+      @matched_topics = Set.new

       # Whether join_group must be executed again because new topics are added
       @join_group_for_new_topics = false
@@ -100,29 +103,16 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
     # checkpoint.
     # @param max_bytes_per_partition [Integer] the maximum amount of data fetched
     #   from a single partition at a time.
-    # @param refresh_topic_interval [Integer] interval of refreshing the topic list.
-    #   Only work when topic_or_regex is a regex
-    #   If it is 0, the topic list won't be refreshed (default)
-    #   If it is n (n > 0), the topic list will be refreshed every n seconds
     # @return [nil]
-    def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576, refresh_topic_interval: 0)
+    def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
       default_offset ||= start_from_beginning ? :earliest : :latest

-      unless topic_or_regex.is_a?(Regexp)
-        subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
-        return
-      end
-
-      subscribe_to_regex(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
-      if refresh_topic_interval > 0
-        @thread ||= Thread.new do
-          while true
-            subscribe_to_regex(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition) if running?
-            sleep refresh_topic_interval
-          end
-        end
-        @thread.abort_on_exception = true
-      end
+      @subscribed_topics[topic_or_regex] = {
+        default_offset: default_offset,
+        start_from_beginning: start_from_beginning,
+        max_bytes_per_partition: max_bytes_per_partition
+      }
+      scan_for_subscribing

       nil
     end
@@ -228,6 +218,7 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica
       )

       consumer_loop do
+        refresh_topic_list_if_enabled
         batches = fetch_batches

         batches.each do |batch|
@@ -316,6 +307,7 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall
       )

       consumer_loop do
+        refresh_topic_list_if_enabled
         batches = fetch_batches

         batches.each do |batch|
@@ -473,6 +465,7 @@ def make_final_offsets_commit!(attempts = 3)

     def join_group
       @join_group_for_new_topics = false
+
       old_generation_id = @group.generation_id

       @group.join
@@ -534,6 +527,14 @@ def resume_paused_partitions!
         end
       end

+    def refresh_topic_list_if_enabled
+      return if @refresh_topic_interval <= 0
+      return if @refreshed_at && @refreshed_at + @refresh_topic_interval > Time.now
+
+      scan_for_subscribing
+      @refreshed_at = Time.now
+    end
+
     def fetch_batches
       # Return early if the consumer has been stopped.
       return [] if shutting_down?
@@ -592,20 +593,35 @@ def clear_current_offsets(excluding: {})
       end
     end

+    def scan_for_subscribing
+      @subscribed_topics.keys.each do |topic_or_regex|
+        default_offset = @subscribed_topics[topic_or_regex][:default_offset]
+        start_from_beginning = @subscribed_topics[topic_or_regex][:start_from_beginning]
+        max_bytes_per_partition = @subscribed_topics[topic_or_regex][:max_bytes_per_partition]
+
+        if topic_or_regex.is_a?(Regexp)
+          subscribe_to_regex(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
+        else
+          subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
+        end
+      end
+    end
+
     def subscribe_to_regex(topic_regex, default_offset, start_from_beginning, max_bytes_per_partition)
       cluster_topics.select { |topic| topic =~ topic_regex }.each do |topic|
         subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
       end
     end

     def subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
-      return if @subscribed_topics.include?(topic)
-      @subscribed_topics.add(topic)
+      return if @matched_topics.include?(topic)
+      @matched_topics.add(topic)
       @join_group_for_new_topics = true

       @group.subscribe(topic)
       @offset_manager.set_default_offset(topic, default_offset)
       @fetcher.subscribe(topic, max_bytes_per_partition: max_bytes_per_partition)
+      @cluster.mark_as_stale!
     end

     def cluster_topics
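Note the design change here: the old implementation refreshed regex subscriptions from a dedicated background thread, while the new code drives the refresh from the consumer loop itself via refresh_topic_list_if_enabled, a plain time-based throttle around scan_for_subscribing. One likely benefit is that subscription state is now only mutated from the consuming thread, which is why the Thread/abort_on_exception handling could be dropped. A standalone sketch of the same throttle pattern — the class and method names below are illustrative, not part of the library:

    # Illustrative only: the "run at most once per interval" check
    # used by refresh_topic_list_if_enabled above.
    class IntervalThrottle
      def initialize(interval)
        @interval = interval   # seconds; <= 0 disables the work entirely
        @last_run_at = nil
      end

      def call
        return if @interval <= 0
        return if @last_run_at && @last_run_at + @interval > Time.now

        yield                  # e.g. scan_for_subscribing
        @last_run_at = Time.now
      end
    end

    throttle = IntervalThrottle.new(30)
    5.times do
      throttle.call { puts "refreshing topic list" }  # runs at most every 30s
      sleep 1   # stands in for one pass of the consumer loop
    end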

spec/consumer_spec.rb

Lines changed: 1 addition & 0 deletions

@@ -137,6 +137,7 @@
       allow(cluster).to receive(:add_target_topics)
       allow(cluster).to receive(:disconnect)
       allow(cluster).to receive(:refresh_metadata_if_necessary!)
+      allow(cluster).to receive(:mark_as_stale!)

       allow(offset_manager).to receive(:commit_offsets)
       allow(offset_manager).to receive(:commit_offsets_if_necessary)

spec/functional/consumer_group_spec.rb

Lines changed: 3 additions & 3 deletions

@@ -131,8 +131,8 @@
     received_messages = []

     kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
-    consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time)
-    consumer.subscribe(/#{topic_a}|#{topic_b}/, refresh_topic_interval: 2)
+    consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, refresh_topic_interval: 1)
+    consumer.subscribe(/#{topic_a}|#{topic_b}/)

     thread = Thread.new do
       consumer.each_message do |message|
@@ -145,7 +145,7 @@
     end
     thread.abort_on_exception = true

-    sleep 2
+    sleep 1
     messages_b.each { |i| producer.produce(i.to_s, topic: topic_b) }
     producer.deliver_messages
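As the spec change above shows, refresh_topic_interval moved from Consumer#subscribe to Client#consumer, so the interval is now configured once per consumer rather than per subscription. A before/after sketch for callers — the group id and topic pattern are hypothetical, and `kafka` is an existing Kafka client instance:

    # Before this commit: the interval was a #subscribe option.
    consumer = kafka.consumer(group_id: "example-group")
    consumer.subscribe(/logs\..*/, refresh_topic_interval: 2)

    # After this commit: the interval is set once on the consumer.
    consumer = kafka.consumer(group_id: "example-group", refresh_topic_interval: 2)
    consumer.subscribe(/logs\..*/)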
