Skip to content

Conversation

@mashhurs
Copy link
Contributor

@mashhurs mashhurs commented Aug 27, 2025

Description

Current implementation hides the detail errors coming from kafka-client when initializing a producer. Kafka client creates a chain of KafkaExceptions (e.g KafkaException("message", new KafkaException("NetworkClient error", new KafkaException())) but current following line shows only first error.

logger.error("Unable to create Kafka producer from given configuration",
                   :kafka_error_message => e,
                   :cause => e.respond_to?(:getCause) ? e.getCause() : nil)

We need iterate over error chains and show to provide descriptive error messages.

With this PR , we get chain of exception details. For the example when password is incorrect, we get keystore password was incorrect in the chain errors.

logstash  | [2025-08-28T00:02:01,315][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka producer"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: Failed to create new NetworkClient"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/share/logstash/kerberos/ssl/logstash.keystore.jks of type JKS"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: Failed to load SSL keystore /usr/share/logstash/kerberos/ssl/logstash.keystore.jks of type JKS"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::JavaIo::IOException: keystore password was incorrect"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::JavaSecurity::UnrecoverableKeyException: failed to decrypt safe contents entry: javax.crypto.BadPaddingException: Given final block not properly padded. Such issues can arise if a bad key is used during decryption."}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.javapipeline    ][main] Pipeline error {:pipeline_id=>"main", :exception=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka producer>, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:476)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:297)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:324)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:309)", "jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(jdk/internal/reflect/DirectConstructorHandleAccessor.java:62)", "java.lang.reflect.Constructor.newInstanceWithCaller(java/lang/reflect/Constructor.java:502)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:486)", "org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:165)", "org.jruby.RubyClass.new(org/jruby/RubyClass.java:922)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)", "RUBY.create_producer(/usr/share/logstash/vendor/local_gems/22313566/logstash-integration-kafka-11.6.4-universal.arm64e-darwin-24/lib/logstash/outputs/kafka.rb:392)", "RUBY.register(/usr/share/logstash/vendor/local_gems/22313566/logstash-integration-kafka-11.6.4-universal.arm64e-darwin-24/lib/logstash/outputs/kafka.rb:232)", "org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:598)", "org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:349)", "org.logstash.config.ir.compiler.OutputStrategyExt$SimpleAbstractOutputStrategyExt.reg(org/logstash/config/ir/compiler/OutputStrategyExt.java:275)", "org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.register(org/logstash/config/ir/compiler/OutputStrategyExt.java:131)", "org.logstash.config.ir.compiler.OutputDelegatorExt.doRegister(org/logstash/config/ir/compiler/OutputDelegatorExt.java:126)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.register(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:69)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt$INVOKER$i$0$0$register.call(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt$INVOKER$i$0$0$register.gen)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:245)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1981)", "org.jruby.RubyArray$INVOKER$i$0$0$each.call(org/jruby/RubyArray$INVOKER$i$0$0$each.gen)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:244)", "RUBY.maybe_setup_out_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:622)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:257)", "RUBY.run(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:198)", "RUBY.start(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:150)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:354)", "java.lang.Thread.run(java/lang/Thread.java:1583)"], "pipeline.sources"=>["/usr/share/logstash/config/kafkaout-kerberos.conf"], :thread=>"#<Thread:0x792cc33a /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:138 run>"}
logstash  | [2025-08-28T00:02:01,317][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
logstash  | [2025-08-28T00:02:01,328][ERROR][logstash.agent           ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}

@mashhurs mashhurs linked an issue Aug 27, 2025 that may be closed by this pull request
@mashhurs mashhurs marked this pull request as ready for review August 27, 2025 23:08
:cause => e.respond_to?(:getCause) ? e.getCause() : nil)
:cause => cause_error)
while cause_error != nil
logger.error("Kafka producer error chain",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will duplicate the Error log message right? Should we re-assign cause_error with getCause, or just put the log message in the while loop?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.error("Unable to create Kafka producer from given configuration",
:kafka_error_message => e,
:cause => e.respond_to?(:getCause) ? e.getCause() : nil)
:cause => cause_error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:cause => cause_error)
:cause => cause_error)
cause_error = e.respond_to?(:getCause) ? e.getCause() : nil

Avoid duplicating the top level err code when unwinding the error stack.

logstash  | [2025-08-27T22:20:02,962][ERROR][logstash.outputs.kafka   ][main] Unable to create Kafka producer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka producer>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to create new NetworkClient>}
logstash  | [2025-08-27T22:20:02,962][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to create new NetworkClient>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to create new NetworkClient>}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, it duplicates.

WDYT for this commit? It just goes throw each exception where cause will be inner one.

logstash  | [2025-08-28T00:02:01,315][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka producer"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: Failed to create new NetworkClient"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/share/logstash/kerberos/ssl/logstash.keystore.jks of type JKS"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::OrgApacheKafkaCommon::KafkaException: Failed to load SSL keystore /usr/share/logstash/kerberos/ssl/logstash.keystore.jks of type JKS"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::JavaIo::IOException: keystore password was incorrect"}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.outputs.kafka   ][main] Kafka producer error chain {:kafka_error_message=>"Java::JavaSecurity::UnrecoverableKeyException: failed to decrypt safe contents entry: javax.crypto.BadPaddingException: Given final block not properly padded. Such issues can arise if a bad key is used during decryption."}
logstash  | [2025-08-28T00:02:01,316][ERROR][logstash.javapipeline    ][main] Pipeline error {:pipeline_id=>"main", :exception=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka producer>, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:476)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:297)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:324)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:309)", "jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(jdk/internal/reflect/DirectConstructorHandleAccessor.java:62)", "java.lang.reflect.Constructor.newInstanceWithCaller(java/lang/reflect/Constructor.java:502)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:486)", "org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:165)", "org.jruby.RubyClass.new(org/jruby/RubyClass.java:922)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)", "RUBY.create_producer(/usr/share/logstash/vendor/local_gems/22313566/logstash-integration-kafka-11.6.4-universal.arm64e-darwin-24/lib/logstash/outputs/kafka.rb:392)", "RUBY.register(/usr/share/logstash/vendor/local_gems/22313566/logstash-integration-kafka-11.6.4-universal.arm64e-darwin-24/lib/logstash/outputs/kafka.rb:232)", "org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:598)", "org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:349)", "org.logstash.config.ir.compiler.OutputStrategyExt$SimpleAbstractOutputStrategyExt.reg(org/logstash/config/ir/compiler/OutputStrategyExt.java:275)", "org.logstash.config.ir.compiler.OutputStrategyExt$AbstractOutputStrategyExt.register(org/logstash/config/ir/compiler/OutputStrategyExt.java:131)", "org.logstash.config.ir.compiler.OutputDelegatorExt.doRegister(org/logstash/config/ir/compiler/OutputDelegatorExt.java:126)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt.register(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:69)", "org.logstash.config.ir.compiler.AbstractOutputDelegatorExt$INVOKER$i$0$0$register.call(org/logstash/config/ir/compiler/AbstractOutputDelegatorExt$INVOKER$i$0$0$register.gen)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:245)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1981)", "org.jruby.RubyArray$INVOKER$i$0$0$each.call(org/jruby/RubyArray$INVOKER$i$0$0$each.gen)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:244)", "RUBY.maybe_setup_out_plugins(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:622)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:257)", "RUBY.run(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:198)", "RUBY.start(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:150)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:354)", "java.lang.Thread.run(java/lang/Thread.java:1583)"], "pipeline.sources"=>["/usr/share/logstash/config/kafkaout-kerberos.conf"], :thread=>"#<Thread:0x792cc33a /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:138 run>"}
logstash  | [2025-08-28T00:02:01,317][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
logstash  | [2025-08-28T00:02:01,328][ERROR][logstash.agent           ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i like it. Simplifies the control flow too. Nice.

@mashhurs mashhurs merged commit c93f09b into logstash-plugins:main Aug 28, 2025
3 checks passed
@mashhurs mashhurs deleted the display-chain-errors branch August 28, 2025 17:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Improve logging experience when kafka-output cannot create producer

2 participants