Skip to content

Commit 6f22abb

Browse files
authored
Merge pull request #482 from fluent/unify-duplicate-kafka-params
Unify duplicate kafka params
2 parents 222048c + 78f60fa commit 6f22abb

File tree

1 file changed

+32
-31
lines changed

1 file changed

+32
-31
lines changed

lib/fluent/plugin/out_kafka2.rb

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -116,44 +116,45 @@ def refresh_client(raise_error = true)
116116
logger = @get_kafka_client_log ? log : nil
117117
use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
118118
if @scram_mechanism != nil && @username != nil && @password != nil
119-
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
120-
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_key_password: @ssl_client_cert_key_password, ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
121-
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
122-
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
123-
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
119+
sasl_params = {
120+
sasl_scram_username: @username,
121+
sasl_scram_password: @password,
122+
sasl_scram_mechanism: @scram_mechanism,
123+
}
124124
elsif @username != nil && @password != nil
125-
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
126-
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_key_password: @ssl_client_cert_key_password, ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
127-
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
128-
ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
129-
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
125+
sasl_params = {
126+
sasl_plain_username: @username,
127+
sasl_plain_password: @password,
128+
}
130129
elsif use_long_lived_aws_credentials
131-
@kafka = Kafka.new(
132-
seed_brokers: @seed_brokers,
133-
client_id: @client_id,
134-
logger: logger,
135-
connect_timeout: @connect_timeout,
136-
socket_timeout: @socket_timeout,
137-
ssl_ca_cert_file_path: @ssl_ca_cert,
138-
ssl_client_cert: read_ssl_file(@ssl_client_cert),
139-
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
140-
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
141-
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
142-
sasl_over_ssl: @sasl_over_ssl,
143-
ssl_verify_hostname: @ssl_verify_hostname,
144-
resolve_seed_brokers: @resolve_seed_brokers,
130+
sasl_params = {
145131
sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id,
146132
sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id,
147133
sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
148-
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function)
149-
)
134+
}
150135
else
151-
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
152-
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_key_password: @ssl_client_cert_key_password, ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
153-
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
154-
ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
155-
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
136+
sasl_params = {
137+
sasl_gssapi_principal: @principal,
138+
sasl_gssapi_keytab: @keytab,
139+
}
156140
end
141+
@kafka = Kafka.new(
142+
seed_brokers: @seed_brokers,
143+
client_id: @client_id,
144+
logger: logger,
145+
connect_timeout: @connect_timeout,
146+
socket_timeout: @socket_timeout,
147+
ssl_ca_cert_file_path: @ssl_ca_cert,
148+
ssl_client_cert: read_ssl_file(@ssl_client_cert),
149+
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
150+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
151+
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
152+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
153+
ssl_verify_hostname: @ssl_verify_hostname,
154+
resolve_seed_brokers: @resolve_seed_brokers,
155+
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function),
156+
sasl_over_ssl: @sasl_over_ssl,
157+
**sasl_params)
157158
log.info "initialized kafka producer: #{@client_id}"
158159
rescue Exception => e
159160
if raise_error # During startup, error should be reported to engine and stop its phase for safety.

0 commit comments

Comments
 (0)