Skip to content

Commit c7fce94

Browse files
committed
feat(out_kafka2): adds support for AWS IAM authentication to MSK using long lived credentials
This commit adds support for AWS IAM authentication using long lived credentials (access key id and secret access keys). To support AWS assume role and STS, we will need to wait for upstream's `ruby-kafka` library support. We will need to bump `ruby-kafka` to 1.5.0 in order to support this feature. Signed-off-by: Ray Tung <[email protected]>
1 parent f1c55d6 commit c7fce94

File tree

4 files changed

+46
-1
lines changed

4 files changed

+46
-1
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,16 @@ If `ruby-kafka` doesn't fit your kafka environment, check `rdkafka2` plugin inst
212212
partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
213213
share_producer (bool) :default => false
214214

215+
# If you intend to rely on AWS IAM auth to MSK with long lived credentials
216+
# https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
217+
#
218+
# For AWS STS support, see status in
219+
# - https://github.com/zendesk/ruby-kafka/issues/944
220+
# - https://github.com/zendesk/ruby-kafka/pull/951
221+
sasl_aws_msk_iam_access_key_id (string) :default => nil
222+
sasl_aws_msk_iam_secret_key_id (string) :default => nil
223+
sasl_aws_msk_iam_aws_region (string) :default => nil
224+
215225
<format>
216226
@type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
217227
</format>

fluent-plugin-kafka.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Gem::Specification.new do |gem|
1818

1919
gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
2020
gem.add_dependency 'ltsv'
21-
gem.add_dependency 'ruby-kafka', '>= 1.4.0', '< 2'
21+
gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
2222
gem.add_development_dependency "rake", ">= 0.9.2"
2323
gem.add_development_dependency "test-unit", ">= 3.0.8"
2424
gem.add_development_dependency "test-unit-rr", "~> 1.0"

lib/fluent/plugin/kafka_plugin_util.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
module Fluent
22
module KafkaPluginUtil
3+
module AwsIamSettings
4+
def self.included(klass)
5+
klass.instance_eval do
6+
config_param :sasl_aws_msk_iam_access_key_id, :string, :default => nil, secret: true,
7+
desc: "AWS access key Id for IAM authentication to MSK."
8+
config_param :sasl_aws_msk_iam_secret_key_id, :string, :default => nil, secret: true,
9+
desc: "AWS access key secret for IAM authentication to MSK."
10+
config_param :sasl_aws_msk_iam_aws_region, :string, :default => nil,
11+
desc: "AWS region for IAM authentication to MSK."
12+
end
13+
end
14+
end
15+
316
module SSLSettings
417
def self.included(klass)
518
klass.instance_eval {

lib/fluent/plugin/out_kafka2.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class Fluent::Kafka2Output < Output
9595
config_set_default :@type, 'json'
9696
end
9797

98+
include Fluent::KafkaPluginUtil::AwsIamSettings
9899
include Fluent::KafkaPluginUtil::SSLSettings
99100
include Fluent::KafkaPluginUtil::SaslSettings
100101

@@ -113,6 +114,7 @@ def initialize
113114
def refresh_client(raise_error = true)
114115
begin
115116
logger = @get_kafka_client_log ? log : nil
117+
use_long_lived_aws_credentials = @sasl_aws_msk_iam_access_key_id != nil && @sasl_aws_msk_iam_secret_key_id != nil
116118
if @scram_mechanism != nil && @username != nil && @password != nil
117119
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
118120
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
@@ -125,6 +127,26 @@ def refresh_client(raise_error = true)
125127
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
126128
ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
127129
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
130+
elsif use_long_lived_aws_credentials
131+
@kafka = Kafka.new(
132+
seed_brokers: @seed_brokers,
133+
client_id: @client_id,
134+
logger: logger,
135+
connect_timeout: @connect_timeout,
136+
socket_timeout: @socket_timeout,
137+
ssl_ca_cert_file_path: @ssl_ca_cert,
138+
ssl_client_cert: read_ssl_file(@ssl_client_cert),
139+
ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
140+
ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
141+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
142+
sasl_over_ssl: @sasl_over_ssl,
143+
ssl_verify_hostname: @ssl_verify_hostname,
144+
resolve_seed_brokers: @resolve_seed_brokers,
145+
sasl_aws_msk_iam_access_key_id: @sasl_aws_msk_iam_access_key_id,
146+
sasl_aws_msk_iam_secret_key_id: @sasl_aws_msk_iam_secret_key_id,
147+
sasl_aws_msk_iam_aws_region: @sasl_aws_msk_iam_aws_region,
148+
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function)
149+
)
128150
else
129151
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
130152
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),

0 commit comments

Comments
 (0)