@@ -74,6 +74,7 @@ class Fluent::KafkaGroupInput < Fluent::Input
7474
7575 include Fluent ::KafkaPluginUtil ::SSLSettings
7676 include Fluent ::KafkaPluginUtil ::SaslSettings
77+ include Fluent ::KafkaPluginUtil ::AwsIamSettings
7778
7879 class ForShutdown < StandardError
7980 end
@@ -185,6 +186,7 @@ def start
185186 super
186187
187188 logger = @get_kafka_client_log ? log : nil
189+ use_aws_msk_iam = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
188190 if @scram_mechanism != nil && @username != nil && @password != nil
189191 @kafka = Kafka . new ( seed_brokers : @brokers , client_id : @client_id , logger : logger , connect_timeout : @connect_timeout , socket_timeout : @socket_timeout , ssl_ca_cert_file_path : @ssl_ca_cert ,
190192 ssl_client_cert : read_ssl_file ( @ssl_client_cert ) , ssl_client_cert_key : read_ssl_file ( @ssl_client_cert_key ) ,
@@ -197,6 +199,10 @@ def start
197199 ssl_client_cert_key_password : @ssl_client_cert_key_password ,
198200 ssl_ca_certs_from_system : @ssl_ca_certs_from_system , sasl_plain_username : @username , sasl_plain_password : @password ,
199201 sasl_over_ssl : @sasl_over_ssl , ssl_verify_hostname : @ssl_verify_hostname )
202+ elsif use_aws_msk_iam
203+ @kafka = Kafka . new ( seed_brokers : @brokers , client_id : @client_id , logger : logger , connect_timeout : @connect_timeout , socket_timeout : @socket_timeout ,
204+ sasl_aws_msk_iam_secret_key_id : @sasl_aws_msk_iam_secret_key_id , sasl_aws_msk_iam_access_key_id : @sasl_aws_msk_iam_access_key_id , sasl_aws_msk_iam_aws_region : @sasl_aws_msk_iam_aws_region ,
205+ ssl_ca_certs_from_system : @ssl_ca_certs_from_system )
200206 else
201207 @kafka = Kafka . new ( seed_brokers : @brokers , client_id : @client_id , logger : logger , connect_timeout : @connect_timeout , socket_timeout : @socket_timeout , ssl_ca_cert_file_path : @ssl_ca_cert ,
202208 ssl_client_cert : read_ssl_file ( @ssl_client_cert ) , ssl_client_cert_key : read_ssl_file ( @ssl_client_cert_key ) ,
0 commit comments