@@ -116,17 +116,44 @@ def refresh_client(raise_error = true)
116116 logger = @get_kafka_client_log ? log : nil
117117 use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
118118 if @scram_mechanism != nil && @username != nil && @password != nil
119- @kafka = Kafka . new ( seed_brokers : @seed_brokers , client_id : @client_id , logger : logger , connect_timeout : @connect_timeout , socket_timeout : @socket_timeout , ssl_ca_cert_file_path : @ssl_ca_cert ,
120- ssl_client_cert : read_ssl_file ( @ssl_client_cert ) , ssl_client_cert_key : read_ssl_file ( @ssl_client_cert_key ) , ssl_client_cert_key_password : @ssl_client_cert_key_password , ssl_client_cert_chain : read_ssl_file ( @ssl_client_cert_chain ) ,
121- ssl_ca_certs_from_system : @ssl_ca_certs_from_system , sasl_scram_username : @username , sasl_scram_password : @password ,
122- sasl_scram_mechanism : @scram_mechanism , sasl_over_ssl : @sasl_over_ssl , ssl_verify_hostname : @ssl_verify_hostname , resolve_seed_brokers : @resolve_seed_brokers ,
123- partitioner : Kafka ::Partitioner . new ( hash_function : @partitioner_hash_function ) )
119+ @kafka = Kafka . new (
120+ seed_brokers : @seed_brokers ,
121+ client_id : @client_id ,
122+ logger : logger ,
123+ connect_timeout : @connect_timeout ,
124+ socket_timeout : @socket_timeout ,
125+ ssl_ca_cert_file_path : @ssl_ca_cert ,
126+ ssl_client_cert : read_ssl_file ( @ssl_client_cert ) ,
127+ ssl_client_cert_key : read_ssl_file ( @ssl_client_cert_key ) ,
128+ ssl_client_cert_key_password : @ssl_client_cert_key_password ,
129+ ssl_client_cert_chain : read_ssl_file ( @ssl_client_cert_chain ) ,
130+ ssl_ca_certs_from_system : @ssl_ca_certs_from_system ,
131+ sasl_scram_username : @username ,
132+ sasl_scram_password : @password ,
133+ sasl_scram_mechanism : @scram_mechanism ,
134+ sasl_over_ssl : @sasl_over_ssl ,
135+ ssl_verify_hostname : @ssl_verify_hostname ,
136+ resolve_seed_brokers : @resolve_seed_brokers ,
137+ partitioner : Kafka ::Partitioner . new ( hash_function : @partitioner_hash_function ) )
124138 elsif @username != nil && @password != nil
125- @kafka = Kafka . new ( seed_brokers : @seed_brokers , client_id : @client_id , logger : logger , connect_timeout : @connect_timeout , socket_timeout : @socket_timeout , ssl_ca_cert_file_path : @ssl_ca_cert ,
126- ssl_client_cert : read_ssl_file ( @ssl_client_cert ) , ssl_client_cert_key : read_ssl_file ( @ssl_client_cert_key ) , ssl_client_cert_key_password : @ssl_client_cert_key_password , ssl_client_cert_chain : read_ssl_file ( @ssl_client_cert_chain ) ,
127- ssl_ca_certs_from_system : @ssl_ca_certs_from_system , sasl_plain_username : @username , sasl_plain_password : @password , sasl_over_ssl : @sasl_over_ssl ,
128- ssl_verify_hostname : @ssl_verify_hostname , resolve_seed_brokers : @resolve_seed_brokers ,
129- partitioner : Kafka ::Partitioner . new ( hash_function : @partitioner_hash_function ) )
139+ @kafka = Kafka . new (
140+ seed_brokers : @seed_brokers ,
141+ client_id : @client_id ,
142+ logger : logger ,
143+ connect_timeout : @connect_timeout ,
144+ socket_timeout : @socket_timeout ,
145+ ssl_ca_cert_file_path : @ssl_ca_cert ,
146+ ssl_client_cert : read_ssl_file ( @ssl_client_cert ) ,
147+ ssl_client_cert_key : read_ssl_file ( @ssl_client_cert_key ) ,
148+ ssl_client_cert_key_password : @ssl_client_cert_key_password ,
149+ ssl_client_cert_chain : read_ssl_file ( @ssl_client_cert_chain ) ,
150+ ssl_ca_certs_from_system : @ssl_ca_certs_from_system ,
151+ sasl_plain_username : @username ,
152+ sasl_plain_password : @password ,
153+ sasl_over_ssl : @sasl_over_ssl ,
154+ ssl_verify_hostname : @ssl_verify_hostname ,
155+ resolve_seed_brokers : @resolve_seed_brokers ,
156+ partitioner : Kafka ::Partitioner . new ( hash_function : @partitioner_hash_function ) )
130157 elsif use_long_lived_aws_credentials
131158 @kafka = Kafka . new (
132159 seed_brokers : @seed_brokers ,
@@ -148,11 +175,24 @@ def refresh_client(raise_error = true)
148175 partitioner : Kafka ::Partitioner . new ( hash_function : @partitioner_hash_function )
149176 )
150177 else
151- @kafka = Kafka . new ( seed_brokers : @seed_brokers , client_id : @client_id , logger : logger , connect_timeout : @connect_timeout , socket_timeout : @socket_timeout , ssl_ca_cert_file_path : @ssl_ca_cert ,
152- ssl_client_cert : read_ssl_file ( @ssl_client_cert ) , ssl_client_cert_key : read_ssl_file ( @ssl_client_cert_key ) , ssl_client_cert_key_password : @ssl_client_cert_key_password , ssl_client_cert_chain : read_ssl_file ( @ssl_client_cert_chain ) ,
153- ssl_ca_certs_from_system : @ssl_ca_certs_from_system , sasl_gssapi_principal : @principal , sasl_gssapi_keytab : @keytab , sasl_over_ssl : @sasl_over_ssl ,
154- ssl_verify_hostname : @ssl_verify_hostname , resolve_seed_brokers : @resolve_seed_brokers ,
155- partitioner : Kafka ::Partitioner . new ( hash_function : @partitioner_hash_function ) )
178+ @kafka = Kafka . new (
179+ seed_brokers : @seed_brokers ,
180+ client_id : @client_id ,
181+ logger : logger ,
182+ connect_timeout : @connect_timeout ,
183+ socket_timeout : @socket_timeout ,
184+ ssl_ca_cert_file_path : @ssl_ca_cert ,
185+ ssl_client_cert : read_ssl_file ( @ssl_client_cert ) ,
186+ ssl_client_cert_key : read_ssl_file ( @ssl_client_cert_key ) ,
187+ ssl_client_cert_key_password : @ssl_client_cert_key_password ,
188+ ssl_client_cert_chain : read_ssl_file ( @ssl_client_cert_chain ) ,
189+ ssl_ca_certs_from_system : @ssl_ca_certs_from_system ,
190+ sasl_gssapi_principal : @principal ,
191+ sasl_gssapi_keytab : @keytab ,
192+ sasl_over_ssl : @sasl_over_ssl ,
193+ ssl_verify_hostname : @ssl_verify_hostname ,
194+ resolve_seed_brokers : @resolve_seed_brokers ,
195+ partitioner : Kafka ::Partitioner . new ( hash_function : @partitioner_hash_function ) )
156196 end
157197 log . info "initialized kafka producer: #{ @client_id } "
158198 rescue Exception => e
0 commit comments