Skip to content

Commit deb3d6f

Browse files
authored
Merge pull request #491 from raytung/feat/out_rdkafka2_unknown_partition
feat(out_rdkafka2): add parameter `use_default_for_unknown_partition_error`
2 parents efc612d + 51245c9 commit deb3d6f

File tree

2 files changed

+15
-3
lines changed

2 files changed

+15
-3
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,8 @@ You need to install rdkafka gem.
510510
partition_key_key (string) :default => 'partition_key'
511511
message_key_key (string) :default => 'message_key'
512512
default_topic (string) :default => nil
513-
use_default_for_unknown_topic (bool) :default => false
513+
use_default_for_unknown_topic (bool) :default => false
514+
use_default_for_unknown_partition_error (bool) :default => false
514515
default_partition_key (string) :default => nil
515516
default_message_key (string) :default => nil
516517
exclude_topic_key (bool) :default => false

lib/fluent/plugin/out_rdkafka2.rb

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class Fluent::Rdkafka2Output < Output
6666
config_param :default_topic, :string, :default => nil,
6767
:desc => "Default output topic when record doesn't have topic field"
6868
config_param :use_default_for_unknown_topic, :bool, :default => false, :desc => "If true, default_topic is used when topic not found"
69+
config_param :use_default_for_unknown_partition_error, :bool, :default => false, :desc => "If true, default_topic is used when received unknown_partition error"
6970
config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key"
7071
config_param :default_message_key, :string, :default => nil
7172
config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
@@ -234,8 +235,8 @@ def add(level, message = nil)
234235
@rdkafka = Rdkafka::Config.new(config)
235236

236237
if @default_topic.nil?
237-
if @use_default_for_unknown_topic
238-
raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic is true"
238+
if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
239+
raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
239240
end
240241
if @chunk_keys.include?(@topic_key) && !@chunk_key_tag
241242
log.warn "Use '#{@topic_key}' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer #{@topic_key},tag>"
@@ -504,6 +505,16 @@ def enqueue_with_retry(producer, topic, record_buf, message_key, partition, head
504505
actual_topic = @default_topic
505506
retry
506507
end
508+
raise e
509+
# https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#L305
510+
# RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
511+
when :unknown_partition
512+
if @use_default_for_unknown_partition_error && actual_topic != @default_topic
513+
log.debug "failed writing to topic '#{actual_topic}' with error '#{e.to_s}'. Writing message to topic '#{@default_topic}'"
514+
actual_topic = @default_topic
515+
retry
516+
end
517+
507518
raise e
508519
else
509520
raise e

0 commit comments

Comments
 (0)