Skip to content

Commit 1d313a6

Browse files
kuckjwi0928madebydna
authored andcommitted
feat: Add aws msk iam option (#531)
Signed-off-by: kuckjwi <[email protected]>
1 parent 632faa2 commit 1d313a6

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

lib/fluent/plugin/in_kafka_group.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class Fluent::KafkaGroupInput < Fluent::Input
7474

7575
include Fluent::KafkaPluginUtil::SSLSettings
7676
include Fluent::KafkaPluginUtil::SaslSettings
77+
include Fluent::KafkaPluginUtil::AwsIamSettings
7778

7879
class ForShutdown < StandardError
7980
end
@@ -185,6 +186,7 @@ def start
185186
super
186187

187188
logger = @get_kafka_client_log ? log : nil
189+
use_aws_msk_iam = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
188190
if @scram_mechanism != nil && @username != nil && @password != nil
189191
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
190192
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
@@ -197,6 +199,10 @@ def start
197199
ssl_client_cert_key_password: @ssl_client_cert_key_password,
198200
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password,
199201
sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
202+
elsif use_aws_msk_iam
203+
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout,
204+
sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id, sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id, sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
205+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system)
200206
else
201207
@kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
202208
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),

0 commit comments

Comments
 (0)