Skip to content

Commit 222048c

Browse files
authored
Merge pull request #480 from raytung/feat/ssl-client-cert-key-password
adds ssl client cert key password
2 parents 2253efd + 29d5b03 commit 222048c

File tree

6 files changed

+12
-4
lines changed

6 files changed

+12
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ If you want to use zookeeper related parameters, you also need to install zookee
3636
- ssl_ca_cert
3737
- ssl_client_cert
3838
- ssl_client_cert_key
39+
- ssl_client_cert_key_password
3940
- ssl_ca_certs_from_system
4041

4142
Set path to SSL related files. See [Encryption and Authentication using SSL](https://github.com/zendesk/ruby-kafka#encryption-and-authentication-using-ssl) for more detail.

lib/fluent/plugin/in_kafka.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,16 +200,19 @@ def start
200200
if @scram_mechanism != nil && @username != nil && @password != nil
201201
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
202202
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
203+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
203204
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
204205
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
205206
elsif @username != nil && @password != nil
206207
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
207208
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
209+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
208210
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,sasl_plain_username: @username, sasl_plain_password: @password,
209211
sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
210212
else
211213
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert_file_path: @ssl_ca_cert,
212214
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
215+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
213216
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
214217
ssl_verify_hostname: @ssl_verify_hostname)
215218
end

lib/fluent/plugin/in_kafka_group.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,16 +188,19 @@ def start
188188
if @scram_mechanism != nil && @username != nil && @password != nil
189189
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
190190
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
191+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
191192
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
192193
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
193194
elsif @username != nil && @password != nil
194195
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
195196
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
197+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
196198
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password,
197199
sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
198200
else
199201
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
200202
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
203+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
201204
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
202205
ssl_verify_hostname: @ssl_verify_hostname)
203206
end

lib/fluent/plugin/kafka_plugin_util.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ def self.included(klass)
2323
:desc => "a PEM encoded client cert to use with and SSL connection. Must be used in combination with ssl_client_cert_key."
2424
config_param :ssl_client_cert_key, :string, :default => nil,
2525
:desc => "a PEM encoded client cert key to use with and SSL connection. Must be used in combination with ssl_client_cert."
26+
config_param :ssl_client_cert_key_password, :string, :default => nil, secret: true,
27+
:desc => "a PEM encoded client cert key password to use with SSL connection."
2628
config_param :ssl_client_cert_chain, :string, :default => nil,
2729
:desc => "an extra PEM encoded cert to use with and SSL connection."
2830
config_param :ssl_ca_certs_from_system, :bool, :default => false,

lib/fluent/plugin/out_kafka2.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,13 @@ def refresh_client(raise_error = true)
117117
use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
118118
if @scram_mechanism != nil && @username != nil && @password != nil
119119
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
120-
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
120+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_key_password: @ssl_client_cert_key_password, ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
121121
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
122122
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
123123
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
124124
elsif @username != nil && @password != nil
125125
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
126-
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
126+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_key_password: @ssl_client_cert_key_password, ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
127127
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
128128
ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
129129
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
@@ -149,7 +149,7 @@ def refresh_client(raise_error = true)
149149
)
150150
else
151151
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
152-
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
152+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_key_password: @ssl_client_cert_key_password, ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
153153
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
154154
ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
155155
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))

lib/fluent/plugin/out_rdkafka2.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ class Fluent::Rdkafka2Output < Output
9595
config_param :max_enqueue_bytes_per_second, :size, :default => nil, :desc => 'The maximum number of enqueueing bytes per second'
9696

9797
config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name'
98-
config_param :ssl_client_cert_key_password, :string, :default => nil, :desc => 'Used for ssl.key.password'
9998

10099
config_section :buffer do
101100
config_set_default :chunk_keys, ["topic"]

0 commit comments

Comments
 (0)