Commit 708cc07

add discard_kafka_delivery_failed_regex to nullify specific delivery failed events by regexp

Signed-off-by: kubotat <[email protected]>
1 parent: fd3ec0f
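
The new branch only swallows a delivery failure when the error's string form (`e.to_s`) matches the configured pattern; any other failure is still logged and re-raised so Fluentd retries the chunk. A minimal standalone Ruby sketch of that decision (the helper name and the sample error strings are illustrative, not part of the plugin):

# Hypothetical helper mirroring the discard decision added in this commit:
# `discard_all` stands in for discard_kafka_delivery_failed,
# `discard_regex` for discard_kafka_delivery_failed_regex.
def discard_delivery_failure?(error_message, discard_all: false, discard_regex: nil)
  return true if discard_all                                            # discard every delivery failure
  return true if discard_regex && discard_regex.match?(error_message)   # discard only matching failures
  false                                                                 # in the plugin, this path re-raises and the chunk is retried
end

# Illustrative error strings; real rdkafka messages may differ.
regex = /msg_timed_out/
p discard_delivery_failure?("Local: Message timed out (msg_timed_out)", discard_regex: regex)  # => true
p discard_delivery_failure?("Broker: Unknown topic or partition", discard_regex: regex)        # => false

Because the pattern is matched against rdkafka's human-readable error message, it can target specific error kinds (for example timeouts) without discarding every delivery failure the way discard_kafka_delivery_failed does.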

File tree: 1 file changed (+10, -37 lines)

lib/fluent/plugin/out_rdkafka2.rb

Lines changed: 10 additions & 37 deletions
@@ -65,8 +65,6 @@ class Fluent::Rdkafka2Output < Output
   config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic"
   config_param :default_topic, :string, :default => nil,
                :desc => "Default output topic when record doesn't have topic field"
-  config_param :use_default_for_unknown_topic, :bool, :default => false, :desc => "If true, default_topic is used when topic not found"
-  config_param :use_default_for_unknown_partition_error, :bool, :default => false, :desc => "If true, default_topic is used when received unknown_partition error"
   config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key"
   config_param :default_message_key, :string, :default => nil
   config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
@@ -112,6 +110,7 @@ class Fluent::Rdkafka2Output < Output
   config_param :use_event_time, :bool, :default => false, :desc => 'Use fluentd event time for rdkafka timestamp'
   config_param :max_send_limit_bytes, :size, :default => nil
   config_param :discard_kafka_delivery_failed, :bool, :default => false
+  config_param :discard_kafka_delivery_failed_regex, :regexp, :default => nil
   config_param :rdkafka_buffering_max_ms, :integer, :default => nil, :desc => 'Used for queue.buffering.max.ms'
   config_param :rdkafka_buffering_max_messages, :integer, :default => nil, :desc => 'Used for queue.buffering.max.messages'
   config_param :rdkafka_message_max_bytes, :integer, :default => nil, :desc => 'Used for message.max.bytes'
@@ -235,9 +234,6 @@ def add(level, message = nil)
     @rdkafka = Rdkafka::Config.new(config)

     if @default_topic.nil?
-      if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
-        raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
-      end
       if @chunk_keys.include?(@topic_key) && !@chunk_key_tag
         log.warn "Use '#{@topic_key}' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer #{@topic_key},tag>"
       end
@@ -461,61 +457,38 @@ def write(chunk)
     if @discard_kafka_delivery_failed
       log.warn "Delivery failed. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
     else
-      log.warn "Send exception occurred: #{e} at #{e.backtrace.first}"
-      # Raise exception to retry sendind messages
-      raise e
+      if @discard_kafka_delivery_failed_regex != nil && @discard_kafka_delivery_failed_regex.match?(e.to_s)
+        log.warn "Delivery failed and matched regexp pattern #{@discard_kafka_delivery_failed_regex}. Discard events:", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
+      else
+        log.warn "Send exception occurred: #{e} at #{e.backtrace.first}"
+        # Raise exception to retry sendind messages
+        raise e
+      end
     end
   ensure
     @writing_threads_mutex.synchronize { @writing_threads.delete(Thread.current) }
   end

   def enqueue_with_retry(producer, topic, record_buf, message_key, partition, headers, time)
     attempt = 0
-    actual_topic = topic
-
     loop do
       begin
         @enqueue_rate.raise_if_limit_exceeded(record_buf.bytesize) if @enqueue_rate
-        return producer.produce(topic: actual_topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)
+        return producer.produce(topic: topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)
       rescue EnqueueRate::LimitExceeded => e
         @enqueue_rate.revert if @enqueue_rate
         duration = e.next_retry_clock - Fluent::Clock.now
         sleep(duration) if duration > 0.0
       rescue Exception => e
         @enqueue_rate.revert if @enqueue_rate
-
-        if !e.respond_to?(:code)
-          raise e
-        end
-
-        case e.code
-        when :queue_full
+        if e.respond_to?(:code) && e.code == :queue_full
           if attempt <= @max_enqueue_retries
             log.warn "Failed to enqueue message; attempting retry #{attempt} of #{@max_enqueue_retries} after #{@enqueue_retry_backoff}s"
             sleep @enqueue_retry_backoff
             attempt += 1
           else
             raise "Failed to enqueue message although tried retry #{@max_enqueue_retries} times"
           end
-        # https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#LL309C9-L309C41
-        # RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC
-        when :unknown_topic
-          if @use_default_for_unknown_topic && actual_topic != @default_topic
-            log.debug "'#{actual_topic}' topic not found. Retry with '#{@default_topic}' topic"
-            actual_topic = @default_topic
-            retry
-          end
-          raise e
-        # https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#L305
-        # RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
-        when :unknown_partition
-          if @use_default_for_unknown_partition_error && actual_topic != @default_topic
-            log.debug "failed writing to topic '#{actual_topic}' with error '#{e.to_s}'. Writing message to topic '#{@default_topic}'"
-            actual_topic = @default_topic
-            retry
-          end
-
-          raise e
         else
           raise e
         end
