Skip to content

Commit 78f60fa

Browse files
committed
out_kafka2: Unify duplicate kafka params
Signed-off-by: Takuro Ashie <[email protected]>
1 parent 9020b7c commit 78f60fa

File tree

1 file changed

+25
-64
lines changed

1 file changed

+25
-64
lines changed

lib/fluent/plugin/out_kafka2.rb

Lines changed: 25 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -116,84 +116,45 @@ def refresh_client(raise_error = true)
116116
logger = @get_kafka_client_log ? log : nil
117117
use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
118118
if @scram_mechanism != nil && @username != nil && @password != nil
119-
@kafka = Kafka.new(
120-
seed_brokers: @seed_brokers,
121-
client_id: @client_id,
122-
logger: logger,
123-
connect_timeout: @connect_timeout,
124-
socket_timeout: @socket_timeout,
125-
ssl_ca_cert_file_path: @ssl_ca_cert,
126-
ssl_client_cert: read_ssl_file(@ssl_client_cert),
127-
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
128-
ssl_client_cert_key_password: @ssl_client_cert_key_password,
129-
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
130-
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
119+
sasl_params = {
131120
sasl_scram_username: @username,
132121
sasl_scram_password: @password,
133122
sasl_scram_mechanism: @scram_mechanism,
134-
sasl_over_ssl: @sasl_over_ssl,
135-
ssl_verify_hostname: @ssl_verify_hostname,
136-
resolve_seed_brokers: @resolve_seed_brokers,
137-
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
123+
}
138124
elsif @username != nil && @password != nil
139-
@kafka = Kafka.new(
140-
seed_brokers: @seed_brokers,
141-
client_id: @client_id,
142-
logger: logger,
143-
connect_timeout: @connect_timeout,
144-
socket_timeout: @socket_timeout,
145-
ssl_ca_cert_file_path: @ssl_ca_cert,
146-
ssl_client_cert: read_ssl_file(@ssl_client_cert),
147-
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
148-
ssl_client_cert_key_password: @ssl_client_cert_key_password,
149-
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
150-
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
125+
sasl_params = {
151126
sasl_plain_username: @username,
152127
sasl_plain_password: @password,
153-
sasl_over_ssl: @sasl_over_ssl,
154-
ssl_verify_hostname: @ssl_verify_hostname,
155-
resolve_seed_brokers: @resolve_seed_brokers,
156-
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
128+
}
157129
elsif use_long_lived_aws_credentials
158-
@kafka = Kafka.new(
159-
seed_brokers: @seed_brokers,
160-
client_id: @client_id,
161-
logger: logger,
162-
connect_timeout: @connect_timeout,
163-
socket_timeout: @socket_timeout,
164-
ssl_ca_cert_file_path: @ssl_ca_cert,
165-
ssl_client_cert: read_ssl_file(@ssl_client_cert),
166-
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
167-
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
168-
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
169-
sasl_over_ssl: @sasl_over_ssl,
170-
ssl_verify_hostname: @ssl_verify_hostname,
171-
resolve_seed_brokers: @resolve_seed_brokers,
130+
sasl_params = {
172131
sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id,
173132
sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id,
174133
sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
175-
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function)
176-
)
134+
}
177135
else
178-
@kafka = Kafka.new(
179-
seed_brokers: @seed_brokers,
180-
client_id: @client_id,
181-
logger: logger,
182-
connect_timeout: @connect_timeout,
183-
socket_timeout: @socket_timeout,
184-
ssl_ca_cert_file_path: @ssl_ca_cert,
185-
ssl_client_cert: read_ssl_file(@ssl_client_cert),
186-
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
187-
ssl_client_cert_key_password: @ssl_client_cert_key_password,
188-
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
189-
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
136+
sasl_params = {
190137
sasl_gssapi_principal: @principal,
191138
sasl_gssapi_keytab: @keytab,
192-
sasl_over_ssl: @sasl_over_ssl,
193-
ssl_verify_hostname: @ssl_verify_hostname,
194-
resolve_seed_brokers: @resolve_seed_brokers,
195-
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
139+
}
196140
end
141+
@kafka = Kafka.new(
142+
seed_brokers: @seed_brokers,
143+
client_id: @client_id,
144+
logger: logger,
145+
connect_timeout: @connect_timeout,
146+
socket_timeout: @socket_timeout,
147+
ssl_ca_cert_file_path: @ssl_ca_cert,
148+
ssl_client_cert: read_ssl_file(@ssl_client_cert),
149+
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
150+
ssl_client_cert_key_password: @ssl_client_cert_key_password,
151+
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
152+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
153+
ssl_verify_hostname: @ssl_verify_hostname,
154+
resolve_seed_brokers: @resolve_seed_brokers,
155+
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function),
156+
sasl_over_ssl: @sasl_over_ssl,
157+
**sasl_params)
197158
log.info "initialized kafka producer: #{@client_id}"
198159
rescue Exception => e
199160
if raise_error # During startup, error should be reported to engine and stop its phase for safety.

0 commit comments

Comments
 (0)