Skip to content

Commit 6de2996

Browse files
donoghucdarby.hanmashhurs
authored
Add fine tuned oath/sasl configuration to kafka client (#189)
* Add missing oauth config (#180) * Add missing `sasl.login.callback.handler.class"` config (#180) * Add unit tests for new oauth and sasl config options * 11.6.0 release prep Add changelog entry and increment minor version for new configuration features. * Align default values with kafka client lib This commit updates the plugin defaults to align with the default (or lack of default) values in the underlying kafka client library. https://github.com/apache/kafka/blob/3.8.1/clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java * Explicitly document no default setting Co-authored-by: Mashhur <[email protected]> * Explicitly document no default setting Co-authored-by: Mashhur <[email protected]> * Explicitly document no default setting Co-authored-by: Mashhur <[email protected]> * Explicitly document no default setting Co-authored-by: Mashhur <[email protected]> --------- Co-authored-by: darby.han <[email protected]> Co-authored-by: Mashhur <[email protected]>
1 parent 3b5789c commit 6de2996

File tree

9 files changed

+255
-9
lines changed

9 files changed

+255
-9
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.6.0
2+
- Support additional `oauth` and `sasl` configuration options for configuring kafka client [#189](https://github.com/logstash-plugins/logstash-integration-kafka/pull/189)
3+
14
## 11.5.4
25
- Update kafka client to 3.8.1 and transitive dependencies [#188](https://github.com/logstash-plugins/logstash-integration-kafka/pull/188)
36
- Removed Jar Dependencies dependency [#187](https://github.com/logstash-plugins/logstash-integration-kafka/pull/187)

docs/input-kafka.asciidoc

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
131131
| <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<number,number>>|No
132132
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
133133
| <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
134+
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url>> |<<string,string>>|No
135+
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name>> |<<string,string>>|No
136+
| <<plugins-{type}s-{plugin}-sasl_login_callback_handler_class>> |<<string,string>>|No
137+
| <<plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms>> |<<number,number>>|No
138+
| <<plugins-{type}s-{plugin}-sasl_login_read_timeout_ms>> |<<number,number>>|No
139+
| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms>> |<<number,number>>|No
140+
| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms>> |<<number,number>>|No
134141
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
135142
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
136143
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
@@ -556,13 +563,62 @@ retries are exhausted.
556563
The amount of time to wait before attempting to retry a failed fetch request
557564
to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.
558565

559-
[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""]
566+
[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class"]
560567
===== `sasl_client_callback_handler_class`
561-
* Value type is <<string,string>>
562-
* There is no default value for this setting.
568+
* Value type is <<string,string>>
569+
* There is no default value for this setting.
563570

564571
The SASL client callback handler class the specified SASL mechanism should use.
565572

573+
[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url"]
574+
===== `sasl_oauthbearer_token_endpoint_url`
575+
* Value type is <<string,string>>
576+
* There is no default value for this setting.
577+
578+
The URL for the OAuth 2.0 issuer token endpoint.
579+
580+
[id="plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name"]
581+
===== `sasl_oauthbearer_scope_claim_name`
582+
* Value type is <<string,string>>
583+
* Default value is `"scope"`
584+
585+
(optional) The override name of the scope claim.
586+
587+
[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class"]
588+
===== `sasl_login_callback_handler_class`
589+
* Value type is <<string,string>>
590+
* There is no default value for this setting.
591+
592+
The SASL login callback handler class the specified SASL mechanism should use.
593+
594+
[id="plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms"]
595+
===== `sasl_login_connect_timeout_ms`
596+
* Value type is <<number,number>>
597+
* There is no default value for this setting.
598+
599+
(optional) The duration, in milliseconds, for HTTPS connect timeout
600+
601+
[id="plugins-{type}s-{plugin}-sasl_login_read_timeout_ms"]
602+
===== `sasl_login_read_timeout_ms`
603+
* Value type is <<number,number>>
604+
* There is no default value for this setting.
605+
606+
(optional) The duration, in milliseconds, for HTTPS read timeout.
607+
608+
[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms"]
609+
===== `sasl_login_retry_backoff_ms`
610+
* Value type is <<number,number>>
611+
* Default value is `100` milliseconds.
612+
613+
(optional) The duration, in milliseconds, to wait between HTTPS call attempts.
614+
615+
[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms"]
616+
===== `sasl_login_retry_backoff_max_ms`
617+
* Value type is <<number,number>>
618+
* Default value is `10000` milliseconds.
619+
620+
(optional) The maximum duration, in milliseconds, for HTTPS call attempts.
621+
566622
[id="plugins-{type}s-{plugin}-sasl_jaas_config"]
567623
===== `sasl_jaas_config`
568624

docs/output-kafka.asciidoc

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
102102
| <<plugins-{type}s-{plugin}-retries>> |<<number,number>>|No
103103
| <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<number,number>>|No
104104
| <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
105+
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url>> |<<string,string>>|No
106+
| <<plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name>> |<<string,string>>|No
107+
| <<plugins-{type}s-{plugin}-sasl_login_callback_handler_class>> |<<string,string>>|No
108+
| <<plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms>> |<<number,number>>|No
109+
| <<plugins-{type}s-{plugin}-sasl_login_read_timeout_ms>> |<<number,number>>|No
110+
| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms>> |<<number,number>>|No
111+
| <<plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms>> |<<number,number>>|No
105112
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
106113
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
107114
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
@@ -392,13 +399,62 @@ In versions prior to 10.5.0, any exception is retried indefinitely unless the `r
392399

393400
The amount of time to wait before attempting to retry a failed produce request to a given topic partition.
394401

395-
[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""]
402+
[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class"]
396403
===== `sasl_client_callback_handler_class`
397-
* Value type is <<string,string>>
398-
* There is no default value for this setting.
404+
* Value type is <<string,string>>
405+
* There is no default value for this setting.
399406

400407
The SASL client callback handler class the specified SASL mechanism should use.
401408

409+
[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url"]
410+
===== `sasl_oauthbearer_token_endpoint_url`
411+
* Value type is <<string,string>>
412+
* There is no default value for this setting.
413+
414+
The URL for the OAuth 2.0 issuer token endpoint.
415+
416+
[id="plugins-{type}s-{plugin}-sasl_oauthbearer_scope_claim_name"]
417+
===== `sasl_oauthbearer_scope_claim_name`
418+
* Value type is <<string,string>>
419+
* Default value is `"scope"`
420+
421+
(optional) The override name of the scope claim.
422+
423+
[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class"]
424+
===== `sasl_login_callback_handler_class`
425+
* Value type is <<string,string>>
426+
* There is no default value for this setting.
427+
428+
The SASL login callback handler class the specified SASL mechanism should use.
429+
430+
[id="plugins-{type}s-{plugin}-sasl_login_connect_timeout_ms"]
431+
===== `sasl_login_connect_timeout_ms`
432+
* Value type is <<number,number>>
433+
* There is no default value for this setting.
434+
435+
(optional) The duration, in milliseconds, for HTTPS connect timeout
436+
437+
[id="plugins-{type}s-{plugin}-sasl_login_read_timeout_ms"]
438+
===== `sasl_login_read_timeout_ms`
439+
* Value type is <<number,number>>
440+
* There is no default value for this setting.
441+
442+
(optional) The duration, in milliseconds, for HTTPS read timeout.
443+
444+
[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_ms"]
445+
===== `sasl_login_retry_backoff_ms`
446+
* Value type is <<number,number>>
447+
* Default value is `100` milliseconds.
448+
449+
(optional) The duration, in milliseconds, to wait between HTTPS call attempts.
450+
451+
[id="plugins-{type}s-{plugin}-sasl_login_retry_backoff_max_ms"]
452+
===== `sasl_login_retry_backoff_max_ms`
453+
* Value type is <<number,number>>
454+
* Default value is `10000` milliseconds.
455+
456+
(optional) The maximum duration, in milliseconds, for HTTPS call attempts.
457+
402458
[id="plugins-{type}s-{plugin}-sasl_jaas_config"]
403459
===== `sasl_jaas_config`
404460

lib/logstash/inputs/kafka.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,20 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
210210
config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT"
211211
# SASL client callback handler class
212212
config :sasl_client_callback_handler_class, :validate => :string
213+
# The URL for the OAuth 2.0 issuer token endpoint.
214+
config :sasl_oauthbearer_token_endpoint_url, :validate => :string
215+
# (optional) The override name of the scope claim.
216+
config :sasl_oauthbearer_scope_claim_name, :validate => :string, :default => 'scope' # Kafka default
217+
# SASL login callback handler class
218+
config :sasl_login_callback_handler_class, :validate => :string
219+
# (optional) The duration, in milliseconds, for HTTPS connect timeout
220+
config :sasl_login_connect_timeout_ms, :validate => :number
221+
# (optional) The duration, in milliseconds, for HTTPS read timeout.
222+
config :sasl_login_read_timeout_ms, :validate => :number
223+
# (optional) The duration, in milliseconds, to wait between HTTPS call attempts.
224+
config :sasl_login_retry_backoff_ms, :validate => :number, :default => 100 # Kafka default
225+
# (optional) The maximum duration, in milliseconds, for HTTPS call attempts.
226+
config :sasl_login_retry_backoff_max_ms, :validate => :number, :default => 10000 # Kafka default
213227
# http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections.
214228
# This may be any mechanism for which a security provider is available.
215229
# GSSAPI is the default mechanism.

lib/logstash/outputs/kafka.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,20 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
149149
config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT"
150150
# SASL client callback handler class
151151
config :sasl_client_callback_handler_class, :validate => :string
152+
# The URL for the OAuth 2.0 issuer token endpoint.
153+
config :sasl_oauthbearer_token_endpoint_url, :validate => :string
154+
# (optional) The override name of the scope claim.
155+
config :sasl_oauthbearer_scope_claim_name, :validate => :string, :default => 'scope' # Kafka default
156+
# SASL login callback handler class
157+
config :sasl_login_callback_handler_class, :validate => :string
158+
# (optional) The duration, in milliseconds, for HTTPS connect timeout
159+
config :sasl_login_connect_timeout_ms, :validate => :number
160+
# (optional) The duration, in milliseconds, for HTTPS read timeout.
161+
config :sasl_login_read_timeout_ms, :validate => :number
162+
# (optional) The duration, in milliseconds, to wait between HTTPS call attempts.
163+
config :sasl_login_retry_backoff_ms, :validate => :number, :default => 100 # Kafka default
164+
# (optional) The maximum duration, in milliseconds, for HTTPS call attempts.
165+
config :sasl_login_retry_backoff_max_ms, :validate => :number, :default => 10000 # Kafka default
152166
# http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections.
153167
# This may be any mechanism for which a security provider is available.
154168
# GSSAPI is the default mechanism.

lib/logstash/plugin_mixins/kafka/common.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ def set_sasl_config(props)
4242
props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
4343
props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
4444
props.put("sasl.client.callback.handler.class", sasl_client_callback_handler_class) unless sasl_client_callback_handler_class.nil?
45+
props.put("sasl.oauthbearer.token.endpoint.url", sasl_oauthbearer_token_endpoint_url) unless sasl_oauthbearer_token_endpoint_url.nil?
46+
props.put("sasl.oauthbearer.scope.claim.name", sasl_oauthbearer_scope_claim_name) unless sasl_oauthbearer_scope_claim_name.nil?
47+
props.put("sasl.login.callback.handler.class", sasl_login_callback_handler_class) unless sasl_login_callback_handler_class.nil?
48+
props.put("sasl.login.connect.timeout.ms", sasl_login_connect_timeout_ms.to_s) unless sasl_login_connect_timeout_ms.nil?
49+
props.put("sasl.login.read.timeout.ms", sasl_login_read_timeout_ms.to_s) unless sasl_login_read_timeout_ms.nil?
50+
props.put("sasl.login.retry.backoff.ms", sasl_login_retry_backoff_ms.to_s) unless sasl_login_retry_backoff_ms.nil?
51+
props.put("sasl.login.retry.backoff.max.ms", sasl_login_retry_backoff_max_ms.to_s) unless sasl_login_retry_backoff_max_ms.nil?
4552
end
4653

4754
def reassign_dns_lookup

logstash-integration-kafka.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-integration-kafka'
3-
s.version = '11.5.4'
3+
s.version = '11.6.0'
44
s.licenses = ['Apache-2.0']
55
s.summary = "Integration with Kafka - input and output plugins"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+

spec/unit/inputs/kafka_spec.rb

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,54 @@
218218

219219
end
220220

221+
context 'when oauth is configured' do
222+
let(:config) { super().merge(
223+
'security_protocol' => 'SASL_PLAINTEXT',
224+
'sasl_mechanism' => 'OAUTHBEARER',
225+
'sasl_oauthbearer_token_endpoint_url' => 'https://auth.example.com/token',
226+
'sasl_oauthbearer_scope_claim_name' => 'custom_scope'
227+
)}
228+
229+
it "sets oauth properties" do
230+
expect(org.apache.kafka.clients.consumer.KafkaConsumer).
231+
to receive(:new).with(hash_including(
232+
'security.protocol' => 'SASL_PLAINTEXT',
233+
'sasl.mechanism' => 'OAUTHBEARER',
234+
'sasl.oauthbearer.token.endpoint.url' => 'https://auth.example.com/token',
235+
'sasl.oauthbearer.scope.claim.name' => 'custom_scope'
236+
)).and_return(kafka_client = double('kafka-consumer'))
237+
238+
expect(subject.send(:create_consumer, 'test-client-1', 'group_instance_id')).to be kafka_client
239+
end
240+
end
241+
242+
context 'when sasl is configured' do
243+
let(:config) { super().merge(
244+
'security_protocol' => 'SASL_PLAINTEXT',
245+
'sasl_mechanism' => 'OAUTHBEARER',
246+
'sasl_login_connect_timeout_ms' => 15000,
247+
'sasl_login_read_timeout_ms' => 5000,
248+
'sasl_login_retry_backoff_ms' => 200,
249+
'sasl_login_retry_backoff_max_ms' => 15000,
250+
'sasl_login_callback_handler_class' => 'org.example.CustomLoginHandler'
251+
)}
252+
253+
it "sets sasl login properties" do
254+
expect(org.apache.kafka.clients.consumer.KafkaConsumer).
255+
to receive(:new).with(hash_including(
256+
'security.protocol' => 'SASL_PLAINTEXT',
257+
'sasl.mechanism' => 'OAUTHBEARER',
258+
'sasl.login.connect.timeout.ms' => '15000',
259+
'sasl.login.read.timeout.ms' => '5000',
260+
'sasl.login.retry.backoff.ms' => '200',
261+
'sasl.login.retry.backoff.max.ms' => '15000',
262+
'sasl.login.callback.handler.class' => 'org.example.CustomLoginHandler'
263+
)).and_return(kafka_client = double('kafka-consumer'))
264+
265+
expect(subject.send(:create_consumer, 'test-client-2', 'group_instance_id')).to be kafka_client
266+
end
267+
end
268+
221269
describe "schema registry" do
222270
let(:base_config) do {
223271
'schema_registry_url' => 'http://localhost:8081',

spec/unit/outputs/kafka_spec.rb

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
'@timestamp' => LogStash::Timestamp.now}) }
1010

1111
let(:future) { double('kafka producer future') }
12+
subject { LogStash::Outputs::Kafka.new(config) }
1213

1314
context 'when initializing' do
1415
it "should register" do
@@ -267,8 +268,6 @@
267268
File.join(File.dirname(__FILE__), '../../fixtures/trust-store_stub.jks')
268269
end
269270

270-
subject { LogStash::Outputs::Kafka.new(config) }
271-
272271
it 'sets empty ssl.endpoint.identification.algorithm' do
273272
expect(org.apache.kafka.clients.producer.KafkaProducer).
274273
to receive(:new).with(hash_including('ssl.endpoint.identification.algorithm' => ''))
@@ -283,4 +282,53 @@
283282

284283
end
285284

285+
context 'when oauth is configured' do
286+
let(:config) {
287+
simple_kafka_config.merge(
288+
'security_protocol' => 'SASL_PLAINTEXT',
289+
'sasl_mechanism' => 'OAUTHBEARER',
290+
'sasl_oauthbearer_token_endpoint_url' => 'https://auth.example.com/token',
291+
'sasl_oauthbearer_scope_claim_name' => 'custom_scope'
292+
)
293+
}
294+
295+
it "sets oauth properties" do
296+
expect(org.apache.kafka.clients.producer.KafkaProducer).
297+
to receive(:new).with(hash_including(
298+
'security.protocol' => 'SASL_PLAINTEXT',
299+
'sasl.mechanism' => 'OAUTHBEARER',
300+
'sasl.oauthbearer.token.endpoint.url' => 'https://auth.example.com/token',
301+
'sasl.oauthbearer.scope.claim.name' => 'custom_scope'
302+
))
303+
subject.register
304+
end
305+
end
306+
307+
context 'when sasl is configured' do
308+
let(:config) {
309+
simple_kafka_config.merge(
310+
'security_protocol' => 'SASL_PLAINTEXT',
311+
'sasl_mechanism' => 'OAUTHBEARER',
312+
'sasl_login_connect_timeout_ms' => 15000,
313+
'sasl_login_read_timeout_ms' => 5000,
314+
'sasl_login_retry_backoff_ms' => 200,
315+
'sasl_login_retry_backoff_max_ms' => 15000,
316+
'sasl_login_callback_handler_class' => 'org.example.CustomLoginHandler'
317+
)
318+
}
319+
320+
it "sets sasl login properties" do
321+
expect(org.apache.kafka.clients.producer.KafkaProducer).
322+
to receive(:new).with(hash_including(
323+
'security.protocol' => 'SASL_PLAINTEXT',
324+
'sasl.mechanism' => 'OAUTHBEARER',
325+
'sasl.login.connect.timeout.ms' => '15000',
326+
'sasl.login.read.timeout.ms' => '5000',
327+
'sasl.login.retry.backoff.ms' => '200',
328+
'sasl.login.retry.backoff.max.ms' => '15000',
329+
'sasl.login.callback.handler.class' => 'org.example.CustomLoginHandler'
330+
))
331+
subject.register
332+
end
333+
end
286334
end

0 commit comments

Comments
 (0)