Skip to content

Commit efc612d

Browse files
authored
Merge pull request #490 from raytung/task/rdkafka2-unknown-topic
feat(out_rdkafka2): adds `use_default_for_unknown_topic` configuration
2 parents c21236f + 6222730 commit efc612d

File tree

2 files changed

+24
-2
lines changed

2 files changed

+24
-2
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,7 @@ You need to install rdkafka gem.
510510
partition_key_key (string) :default => 'partition_key'
511511
message_key_key (string) :default => 'message_key'
512512
default_topic (string) :default => nil
513+
use_default_for_unknown_topic (bool) :default => false
513514
default_partition_key (string) :default => nil
514515
default_message_key (string) :default => nil
515516
exclude_topic_key (bool) :default => false

lib/fluent/plugin/out_rdkafka2.rb

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class Fluent::Rdkafka2Output < Output
6565
config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic"
6666
config_param :default_topic, :string, :default => nil,
6767
:desc => "Default output topic when record doesn't have topic field"
68+
config_param :use_default_for_unknown_topic, :bool, :default => false, :desc => "If true, default_topic is used when topic not found"
6869
config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key"
6970
config_param :default_message_key, :string, :default => nil
7071
config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
@@ -233,6 +234,9 @@ def add(level, message = nil)
233234
@rdkafka = Rdkafka::Config.new(config)
234235

235236
if @default_topic.nil?
237+
if @use_default_for_unknown_topic
238+
raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic is true"
239+
end
236240
if @chunk_keys.include?(@topic_key) && !@chunk_key_tag
237241
log.warn "Use '#{@topic_key}' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer #{@topic_key},tag>"
238242
end
@@ -466,24 +470,41 @@ def write(chunk)
466470

467471
def enqueue_with_retry(producer, topic, record_buf, message_key, partition, headers, time)
468472
attempt = 0
473+
actual_topic = topic
474+
469475
loop do
470476
begin
471477
@enqueue_rate.raise_if_limit_exceeded(record_buf.bytesize) if @enqueue_rate
472-
return producer.produce(topic: topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)
478+
return producer.produce(topic: actual_topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)
473479
rescue EnqueueRate::LimitExceeded => e
474480
@enqueue_rate.revert if @enqueue_rate
475481
duration = e.next_retry_clock - Fluent::Clock.now
476482
sleep(duration) if duration > 0.0
477483
rescue Exception => e
478484
@enqueue_rate.revert if @enqueue_rate
479-
if e.respond_to?(:code) && e.code == :queue_full
485+
486+
if !e.respond_to?(:code)
487+
raise e
488+
end
489+
490+
case e.code
491+
when :queue_full
480492
if attempt <= @max_enqueue_retries
481493
log.warn "Failed to enqueue message; attempting retry #{attempt} of #{@max_enqueue_retries} after #{@enqueue_retry_backoff}s"
482494
sleep @enqueue_retry_backoff
483495
attempt += 1
484496
else
485497
raise "Failed to enqueue message although tried retry #{@max_enqueue_retries} times"
486498
end
499+
# https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#LL309C9-L309C41
500+
# RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC
501+
when :unknown_topic
502+
if @use_default_for_unknown_topic && actual_topic != @default_topic
503+
log.debug "'#{actual_topic}' topic not found. Retry with '#{@default_topic}' topic"
504+
actual_topic = @default_topic
505+
retry
506+
end
507+
raise e
487508
else
488509
raise e
489510
end

0 commit comments

Comments
 (0)