Issue in configuring the Kafka input plugin with TLS #146

@girishms-sentient

Description

Logstash information:

  1. Logstash version: 8.8.1
  2. Logstash installation source: Bitnami Logstash Helm chart
  3. How Logstash is being run: Kubernetes StatefulSet
  4. Kafka input plugin: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

Description of the problem including expected versus actual behavior:

I'm having trouble integrating Logstash with a Kafka cluster that is configured with TLS.
I get the following exception when providing PKCS12-format TLS certificates to the Kafka input plugin.

Unable to create Kafka consumer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka consumer>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to load SSL keystore /usr/share/logstash/config/certs/ample-user/p12 of type PKCS12>}
[2023-07-05T16:16:33,533][ERROR][logstash.javapipeline ][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] A plugin had an unrecoverable error. Will restart this plugin.

The detailed logs are included below.
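
As a sanity check, the mounted keystore can be inspected from inside the pod with keytool from Logstash's bundled JDK (assuming keytool is on the PATH; note that -storepass takes the contents of the mounted password file, not its path):

    keytool -list \
      -keystore /usr/share/logstash/config/certs/ample-user/p12 \
      -storetype PKCS12 \
      -storepass "$(cat /usr/share/logstash/config/certs/ample-user/password)"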

Helm chart values:

    extraEnvVars:
      - name: KAFKA_HOST
        value: "kafka-kafka-bootstrap"
      - name: KAFKA_PORT
        value: "9093"

    extraVolumeMounts:
      - name: kafka-cluster-certs
        mountPath: /usr/share/logstash/config/certs/kafka-cluster
        readOnly: true
      - name: ample-user
        mountPath: /usr/share/logstash/config/certs/ample-user
        readOnly: true

    extraVolumes: 
      - name: kafka-cluster-certs
        secret:
          secretName: kafka-cluster-ca-cert
          items:
          - key: ca.p12
            path: p12
          - key: ca.password
            path: password
      - name: ample-user
        secret:
          secretName: ample-user
          items:
          - key: user.p12
            path: p12
          - key: user.password
            path: password
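
The password values themselves can also be surfaced as environment variables straight from the same secrets, avoiding any need to read the mounted password files. A sketch using the standard Kubernetes secretKeyRef mechanism; the KAFKA_TRUSTSTORE_PASSWORD and KAFKA_KEYSTORE_PASSWORD variable names are illustrative:

    extraEnvVars:
      - name: KAFKA_TRUSTSTORE_PASSWORD
        valueFrom:
          secretKeyRef:
            name: kafka-cluster-ca-cert
            key: ca.password
      - name: KAFKA_KEYSTORE_PASSWORD
        valueFrom:
          secretKeyRef:
            name: ample-user
            key: user.password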

    input: |-
      kafka {
        bootstrap_servers => ["${KAFKA_HOST}:${KAFKA_PORT}"]
        topics => ["fluent-log-vault"]
        type => "vault"
        security_protocol => "SSL"
        ssl_truststore_type => "PKCS12"
        ssl_keystore_type => "PKCS12"
        ssl_truststore_location => "/usr/share/logstash/config/certs/kafka-cluster/p12"
        ssl_truststore_password => "/usr/share/logstash/config/certs/password"
        ssl_keystore_location => "/usr/share/logstash/config/certs/ample-user/p12"
        ssl_keystore_password => "/usr/share/logstash/config/certs/ample-user/password"
      }
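
Note that, per the plugin documentation, ssl_truststore_password and ssl_keystore_password expect the password value itself, not the path to a password file, and the truststore password above points at certs/password rather than the kafka-cluster mount. A minimal sketch of the same input with the passwords passed by value, assuming the illustrative environment variables from the sketch above:

    input: |-
      kafka {
        bootstrap_servers => "${KAFKA_HOST}:${KAFKA_PORT}"
        topics => ["fluent-log-vault"]
        type => "vault"
        security_protocol => "SSL"
        ssl_truststore_type => "PKCS12"
        ssl_keystore_type => "PKCS12"
        ssl_truststore_location => "/usr/share/logstash/config/certs/kafka-cluster/p12"
        ssl_truststore_password => "${KAFKA_TRUSTSTORE_PASSWORD}"
        ssl_keystore_location => "/usr/share/logstash/config/certs/ample-user/p12"
        ssl_keystore_password => "${KAFKA_KEYSTORE_PASSWORD}"
      }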

Detailed Logs:

[2023-07-05T16:16:33,529][INFO ][org.apache.kafka.common.metrics.Metrics][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Metrics scheduler closed
[2023-07-05T16:16:33,530][INFO ][org.apache.kafka.common.metrics.Metrics][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Closing reporter org.apache.kafka.common.metrics.JmxReporter
[2023-07-05T16:16:33,530][INFO ][org.apache.kafka.common.metrics.Metrics][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Metrics reporters closed
[2023-07-05T16:16:33,530][INFO ][org.apache.kafka.common.utils.AppInfoParser][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] App info kafka.consumer for logstash-0 unregistered
[2023-07-05T16:16:33,531][ERROR][logstash.inputs.kafka    ][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] Unable to create Kafka consumer from given configuration {:kafka_error_message=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to construct kafka consumer>, :cause=>#<Java::OrgApacheKafkaCommon::KafkaException: Failed to load SSL keystore /usr/share/logstash/config/certs/ample-user/p12 of type PKCS12>}
[2023-07-05T16:16:33,533][ERROR][logstash.javapipeline    ][main][0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:main
  Plugin: <LogStash::Inputs::Kafka ssl_keystore_location=>"/usr/share/logstash/config/certs/ample-user/p12", ssl_keystore_password=><password>, topics=>["fluent-log-vault"], ssl_truststore_location=>"/usr/share/logstash/config/certs/kafka-cluster/p12", ssl_truststore_password=><password>, security_protocol=>"SSL", id=>"0a81a40d970eb2a3701645643b7b64f3e3bed2443d45da944a470013bf8ce0ef", type=>"vault", ssl_truststore_type=>"PKCS12", bootstrap_servers=>"kafka-kafka-bootstrap:9093", ssl_keystore_type=>"PKCS12", codec=><LogStash::Codecs::Plain id=>"plain_c97407f8-f507-4c16-a8a6-8cc822b8e0eb", enable_metric=>true, charset=>"UTF-8">, enable_metric=>true, connections_max_idle_ms=>540000, metadata_max_age_ms=>300000, request_timeout_ms=>40000, schema_registry_ssl_keystore_type=>"jks", schema_registry_ssl_truststore_type=>"jks", schema_registry_validation=>"auto", auto_commit_interval_ms=>5000, check_crcs=>true, client_dns_lookup=>"use_all_dns_ips", client_id=>"logstash", consumer_threads=>1, enable_auto_commit=>true, fetch_max_bytes=>52428800, fetch_max_wait_ms=>500, group_id=>"logstash", heartbeat_interval_ms=>3000, isolation_level=>"read_uncommitted", key_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", max_poll_interval_ms=>300000, max_partition_fetch_bytes=>1048576, max_poll_records=>500, receive_buffer_bytes=>32768, reconnect_backoff_ms=>50, retry_backoff_ms=>100, send_buffer_bytes=>131072, session_timeout_ms=>10000, value_deserializer_class=>"org.apache.kafka.common.serialization.StringDeserializer", poll_timeout_ms=>100, ssl_endpoint_identification_algorithm=>"https", sasl_mechanism=>"GSSAPI", decorate_events=>"none">
Error: Failed to construct kafka consumer
  Exception: Java::OrgApacheKafkaCommon::KafkaException
  Stack: org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:830)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:666)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:647)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(org/apache/kafka/clients/consumer/KafkaConsumer.java:627)
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(jdk/internal/reflect/NativeConstructorAccessorImpl.java:77)
jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(jdk/internal/reflect/DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstanceWithCaller(java/lang/reflect/Constructor.java:499)
java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:480)
org.jruby.javasupport.JavaConstructor.newInstanceDirect(org/jruby/javasupport/JavaConstructor.java:269)
org.jruby.RubyClass.new(org/jruby/RubyClass.java:890)
org.jruby.RubyClass$INVOKER$i$newInstance.call(org/jruby/RubyClass$INVOKER$i$newInstance.gen)
RUBY.create_consumer(/opt/bitnami/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-11.2.1-java/lib/logstash/inputs/kafka.rb:477)
RUBY.run(/opt/bitnami/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-11.2.1-java/lib/logstash/inputs/kafka.rb:297)
org.jruby.RubyEnumerable$18.call(org/jruby/RubyEnumerable.java:815)
org.jruby.RubyEnumerator$1.call(org/jruby/RubyEnumerator.java:400)
org.jruby.RubyFixnum.times(org/jruby/RubyFixnum.java:308)
org.jruby.RubyInteger$INVOKER$i$0$0$times.call(org/jruby/RubyInteger$INVOKER$i$0$0$times.gen)
org.jruby.RubyClass.finvokeWithRefinements(org/jruby/RubyClass.java:514)
org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:502)
org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:387)
org.jruby.RubyEnumerator.__each__(org/jruby/RubyEnumerator.java:396)
org.jruby.RubyEnumerator.each(org/jruby/RubyEnumerator.java:392)
org.jruby.RubyEnumerator$INVOKER$i$each.call(org/jruby/RubyEnumerator$INVOKER$i$each.gen)
org.jruby.RubyEnumerable.collectCommon(org/jruby/RubyEnumerable.java:807)
org.jruby.RubyEnumerable.map(org/jruby/RubyEnumerable.java:799)
org.jruby.RubyEnumerable$INVOKER$s$0$0$map.call(org/jruby/RubyEnumerable$INVOKER$s$0$0$map.gen)
RUBY.run(/opt/bitnami/logstash/vendor/bundle/jruby/2.6.0/gems/logstash-integration-kafka-11.2.1-java/lib/logstash/inputs/kafka.rb:295)
RUBY.inputworker(/opt/bitnami/logstash/logstash-core/lib/logstash/java_pipeline.rb:414)
RUBY.start_input(/opt/bitnami/logstash/logstash-core/lib/logstash/java_pipeline.rb:405)
org.jruby.RubyProc.call(org/jruby/RubyProc.java:309)
java.lang.Thread.run(java/lang/Thread.java:833)

Is there any way to solve this problem?
