diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f970a2..e7d6392 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 11.6.0 + - Support additional `oauth` and `sasl` configuration options for configuring kafka client [#189](https://github.com/logstash-plugins/logstash-integration-kafka/pull/189) + ## 11.5.4 - Update kafka client to 3.8.1 and transitive dependencies [#188](https://github.com/logstash-plugins/logstash-integration-kafka/pull/188) - Removed Jar Dependencies dependency [#187](https://github.com/logstash-plugins/logstash-integration-kafka/pull/187) diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc index c5ebf24..ffe67f8 100644 --- a/docs/input-kafka.asciidoc +++ b/docs/input-kafka.asciidoc @@ -131,6 +131,13 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -556,13 +563,62 @@ retries are exhausted. The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop. -[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""] +[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class"] ===== `sasl_client_callback_handler_class` -* Value type is <> -* There is no default value for this setting. + * Value type is <> + * There is no default value for this setting. The SASL client callback handler class the specified SASL mechanism should use. +[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url"] +===== `sasl_oauthbearer_token_endpoint_url` + * Value type is <> + * There is no default value for this setting. + +The URL for the OAuth 2.0 issuer token endpoint. + +[id="plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name"] +===== `sasl_oauthbearer_scope_claim_name` + * Value type is <> + * Default value is `"scope"` + +(optional) The override name of the scope claim. + +[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class"] +===== `sasl_login_callback_handler_class` + * Value type is <> + * There is no default value for this setting. + +The SASL login callback handler class the specified SASL mechanism should use. + +[id="plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms"] +===== `sasl_login_connect_timeout_ms` + * Value type is <> + * There is no default value for this setting. + +(optional) The duration, in milliseconds, for HTTPS connect timeout + +[id="plugins-{type}s-{plugin}-sasl_login_read_timeout_ms"] +===== `sasl_login_read_timeout_ms` + * Value type is <> + * There is no default value for this setting. + +(optional) The duration, in milliseconds, for HTTPS read timeout. + +[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms"] +===== `sasl_login_retry_backoff_ms` + * Value type is <> + * Default value is `100` milliseconds. + +(optional) The duration, in milliseconds, to wait between HTTPS call attempts. + +[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms"] +===== `sasl_login_retry_backoff_max_ms` + * Value type is <> + * Default value is `10000` milliseconds. + +(optional) The maximum duration, in milliseconds, for HTTPS call attempts. + [id="plugins-{type}s-{plugin}-sasl_jaas_config"] ===== `sasl_jaas_config` diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc index 4cd585f..64f3dbe 100644 --- a/docs/output-kafka.asciidoc +++ b/docs/output-kafka.asciidoc @@ -102,6 +102,13 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -392,13 +399,62 @@ In versions prior to 10.5.0, any exception is retried indefinitely unless the `r The amount of time to wait before attempting to retry a failed produce request to a given topic partition. -[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""] +[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class"] ===== `sasl_client_callback_handler_class` -* Value type is <> -* There is no default value for this setting. + * Value type is <> + * There is no default value for this setting. The SASL client callback handler class the specified SASL mechanism should use. +[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url"] +===== `sasl_oauthbearer_token_endpoint_url` + * Value type is <> + * There is no default value for this setting. + +The URL for the OAuth 2.0 issuer token endpoint. + +[id="plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name"] +===== `sasl_oauthbearer_scope_claim_name` + * Value type is <> + * Default value is `"scope"` + +(optional) The override name of the scope claim. + +[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class"] +===== `sasl_login_callback_handler_class` + * Value type is <> + * There is no default value for this setting. + +The SASL login callback handler class the specified SASL mechanism should use. + +[id="plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms"] +===== `sasl_login_connect_timeout_ms` + * Value type is <> + * There is no default value for this setting. + +(optional) The duration, in milliseconds, for HTTPS connect timeout + +[id="plugins-{type}s-{plugin}-sasl_login_read_timeout_ms"] +===== `sasl_login_read_timeout_ms` + * Value type is <> + * There is no default value for this setting. + +(optional) The duration, in milliseconds, for HTTPS read timeout. + +[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms"] +===== `sasl_login_retry_backoff_ms` + * Value type is <> + * Default value is `100` milliseconds. + +(optional) The duration, in milliseconds, to wait between HTTPS call attempts. + +[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms"] +===== `sasl_login_retry_backoff_max_ms` + * Value type is <> + * Default value is `10000` milliseconds. + +(optional) The maximum duration, in milliseconds, for HTTPS call attempts. + [id="plugins-{type}s-{plugin}-sasl_jaas_config"] ===== `sasl_jaas_config` diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 604bc68..aa713c3 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -210,6 +210,20 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT" # SASL client callback handler class config :sasl_client_callback_handler_class, :validate => :string + # The URL for the OAuth 2.0 issuer token endpoint. + config :sasl_oauthbearer_token_endpoint_url, :validate => :string + # (optional) The override name of the scope claim. + config :sasl_oauthbearer_scope_claim_name, :validate => :string, :default => 'scope' # Kafka default + # SASL login callback handler class + config :sasl_login_callback_handler_class, :validate => :string + # (optional) The duration, in milliseconds, for HTTPS connect timeout + config :sasl_login_connect_timeout_ms, :validate => :number + # (optional) The duration, in milliseconds, for HTTPS read timeout. + config :sasl_login_read_timeout_ms, :validate => :number + # (optional) The duration, in milliseconds, to wait between HTTPS call attempts. + config :sasl_login_retry_backoff_ms, :validate => :number, :default => 100 # Kafka default + # (optional) The maximum duration, in milliseconds, for HTTPS call attempts. + config :sasl_login_retry_backoff_max_ms, :validate => :number, :default => 10000 # Kafka default # http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections. # This may be any mechanism for which a security provider is available. # GSSAPI is the default mechanism. diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index ebf233f..8ba0148 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -149,6 +149,20 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT" # SASL client callback handler class config :sasl_client_callback_handler_class, :validate => :string + # The URL for the OAuth 2.0 issuer token endpoint. + config :sasl_oauthbearer_token_endpoint_url, :validate => :string + # (optional) The override name of the scope claim. + config :sasl_oauthbearer_scope_claim_name, :validate => :string, :default => 'scope' # Kafka default + # SASL login callback handler class + config :sasl_login_callback_handler_class, :validate => :string + # (optional) The duration, in milliseconds, for HTTPS connect timeout + config :sasl_login_connect_timeout_ms, :validate => :number + # (optional) The duration, in milliseconds, for HTTPS read timeout. + config :sasl_login_read_timeout_ms, :validate => :number + # (optional) The duration, in milliseconds, to wait between HTTPS call attempts. + config :sasl_login_retry_backoff_ms, :validate => :number, :default => 100 # Kafka default + # (optional) The maximum duration, in milliseconds, for HTTPS call attempts. + config :sasl_login_retry_backoff_max_ms, :validate => :number, :default => 10000 # Kafka default # http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections. # This may be any mechanism for which a security provider is available. # GSSAPI is the default mechanism. diff --git a/lib/logstash/plugin_mixins/kafka/common.rb b/lib/logstash/plugin_mixins/kafka/common.rb index 1ae8546..6564e80 100644 --- a/lib/logstash/plugin_mixins/kafka/common.rb +++ b/lib/logstash/plugin_mixins/kafka/common.rb @@ -42,6 +42,13 @@ def set_sasl_config(props) props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil? props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil? props.put("sasl.client.callback.handler.class", sasl_client_callback_handler_class) unless sasl_client_callback_handler_class.nil? + props.put("sasl.oauthbearer.token.endpoint.url", sasl_oauthbearer_token_endpoint_url) unless sasl_oauthbearer_token_endpoint_url.nil? + props.put("sasl.oauthbearer.scope.claim.name", sasl_oauthbearer_scope_claim_name) unless sasl_oauthbearer_scope_claim_name.nil? + props.put("sasl.login.callback.handler.class", sasl_login_callback_handler_class) unless sasl_login_callback_handler_class.nil? + props.put("sasl.login.connect.timeout.ms", sasl_login_connect_timeout_ms.to_s) unless sasl_login_connect_timeout_ms.nil? + props.put("sasl.login.read.timeout.ms", sasl_login_read_timeout_ms.to_s) unless sasl_login_read_timeout_ms.nil? + props.put("sasl.login.retry.backoff.ms", sasl_login_retry_backoff_ms.to_s) unless sasl_login_retry_backoff_ms.nil? + props.put("sasl.login.retry.backoff.max.ms", sasl_login_retry_backoff_max_ms.to_s) unless sasl_login_retry_backoff_max_ms.nil? end def reassign_dns_lookup diff --git a/logstash-integration-kafka.gemspec b/logstash-integration-kafka.gemspec index 05e5c49..9c3bf38 100644 --- a/logstash-integration-kafka.gemspec +++ b/logstash-integration-kafka.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-integration-kafka' - s.version = '11.5.4' + s.version = '11.6.0' s.licenses = ['Apache-2.0'] s.summary = "Integration with Kafka - input and output plugins" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+ diff --git a/spec/unit/inputs/kafka_spec.rb b/spec/unit/inputs/kafka_spec.rb index 4fbb5ae..04ed19c 100644 --- a/spec/unit/inputs/kafka_spec.rb +++ b/spec/unit/inputs/kafka_spec.rb @@ -218,6 +218,54 @@ end + context 'when oauth is configured' do + let(:config) { super().merge( + 'security_protocol' => 'SASL_PLAINTEXT', + 'sasl_mechanism' => 'OAUTHBEARER', + 'sasl_oauthbearer_token_endpoint_url' => 'https://auth.example.com/token', + 'sasl_oauthbearer_scope_claim_name' => 'custom_scope' + )} + + it "sets oauth properties" do + expect(org.apache.kafka.clients.consumer.KafkaConsumer). + to receive(:new).with(hash_including( + 'security.protocol' => 'SASL_PLAINTEXT', + 'sasl.mechanism' => 'OAUTHBEARER', + 'sasl.oauthbearer.token.endpoint.url' => 'https://auth.example.com/token', + 'sasl.oauthbearer.scope.claim.name' => 'custom_scope' + )).and_return(kafka_client = double('kafka-consumer')) + + expect(subject.send(:create_consumer, 'test-client-1', 'group_instance_id')).to be kafka_client + end + end + + context 'when sasl is configured' do + let(:config) { super().merge( + 'security_protocol' => 'SASL_PLAINTEXT', + 'sasl_mechanism' => 'OAUTHBEARER', + 'sasl_login_connect_timeout_ms' => 15000, + 'sasl_login_read_timeout_ms' => 5000, + 'sasl_login_retry_backoff_ms' => 200, + 'sasl_login_retry_backoff_max_ms' => 15000, + 'sasl_login_callback_handler_class' => 'org.example.CustomLoginHandler' + )} + + it "sets sasl login properties" do + expect(org.apache.kafka.clients.consumer.KafkaConsumer). + to receive(:new).with(hash_including( + 'security.protocol' => 'SASL_PLAINTEXT', + 'sasl.mechanism' => 'OAUTHBEARER', + 'sasl.login.connect.timeout.ms' => '15000', + 'sasl.login.read.timeout.ms' => '5000', + 'sasl.login.retry.backoff.ms' => '200', + 'sasl.login.retry.backoff.max.ms' => '15000', + 'sasl.login.callback.handler.class' => 'org.example.CustomLoginHandler' + )).and_return(kafka_client = double('kafka-consumer')) + + expect(subject.send(:create_consumer, 'test-client-2', 'group_instance_id')).to be kafka_client + end + end + describe "schema registry" do let(:base_config) do { 'schema_registry_url' => 'http://localhost:8081', diff --git a/spec/unit/outputs/kafka_spec.rb b/spec/unit/outputs/kafka_spec.rb index 97fcb7c..a215ab6 100644 --- a/spec/unit/outputs/kafka_spec.rb +++ b/spec/unit/outputs/kafka_spec.rb @@ -9,6 +9,7 @@ '@timestamp' => LogStash::Timestamp.now}) } let(:future) { double('kafka producer future') } + subject { LogStash::Outputs::Kafka.new(config) } context 'when initializing' do it "should register" do @@ -267,8 +268,6 @@ File.join(File.dirname(__FILE__), '../../fixtures/trust-store_stub.jks') end - subject { LogStash::Outputs::Kafka.new(config) } - it 'sets empty ssl.endpoint.identification.algorithm' do expect(org.apache.kafka.clients.producer.KafkaProducer). to receive(:new).with(hash_including('ssl.endpoint.identification.algorithm' => '')) @@ -283,4 +282,53 @@ end + context 'when oauth is configured' do + let(:config) { + simple_kafka_config.merge( + 'security_protocol' => 'SASL_PLAINTEXT', + 'sasl_mechanism' => 'OAUTHBEARER', + 'sasl_oauthbearer_token_endpoint_url' => 'https://auth.example.com/token', + 'sasl_oauthbearer_scope_claim_name' => 'custom_scope' + ) + } + + it "sets oauth properties" do + expect(org.apache.kafka.clients.producer.KafkaProducer). + to receive(:new).with(hash_including( + 'security.protocol' => 'SASL_PLAINTEXT', + 'sasl.mechanism' => 'OAUTHBEARER', + 'sasl.oauthbearer.token.endpoint.url' => 'https://auth.example.com/token', + 'sasl.oauthbearer.scope.claim.name' => 'custom_scope' + )) + subject.register + end + end + + context 'when sasl is configured' do + let(:config) { + simple_kafka_config.merge( + 'security_protocol' => 'SASL_PLAINTEXT', + 'sasl_mechanism' => 'OAUTHBEARER', + 'sasl_login_connect_timeout_ms' => 15000, + 'sasl_login_read_timeout_ms' => 5000, + 'sasl_login_retry_backoff_ms' => 200, + 'sasl_login_retry_backoff_max_ms' => 15000, + 'sasl_login_callback_handler_class' => 'org.example.CustomLoginHandler' + ) + } + + it "sets sasl login properties" do + expect(org.apache.kafka.clients.producer.KafkaProducer). + to receive(:new).with(hash_including( + 'security.protocol' => 'SASL_PLAINTEXT', + 'sasl.mechanism' => 'OAUTHBEARER', + 'sasl.login.connect.timeout.ms' => '15000', + 'sasl.login.read.timeout.ms' => '5000', + 'sasl.login.retry.backoff.ms' => '200', + 'sasl.login.retry.backoff.max.ms' => '15000', + 'sasl.login.callback.handler.class' => 'org.example.CustomLoginHandler' + )) + subject.register + end + end end