|
20 | 20 | raise "unable to patch rdkafka." |
21 | 21 | end |
22 | 22 |
|
| 23 | +if Gem::Version.create(RUBY_VERSION) >= Gem::Version.create('3.0') |
| 24 | + require 'aws-msk-iam-sasl-signer' |
| 25 | +end |
| 26 | + |
23 | 27 | module Fluent::Plugin |
24 | 28 | class Fluent::Rdkafka2Output < Output |
25 | 29 | Fluent::Plugin.register_output('rdkafka2', self) |
@@ -100,6 +104,7 @@ class Fluent::Rdkafka2Output < Output |
100 | 104 | config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name' |
101 | 105 | config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"], |
102 | 106 | :desc => 'Handle some of the error codes should be unrecoverable if specified' |
| 107 | + config_param :aws_msk_region, :string, :default => nil, :desc => 'AWS region for MSK' |
103 | 108 |
|
104 | 109 | config_section :buffer do |
105 | 110 | config_set_default :chunk_keys, ["topic"] |
@@ -209,6 +214,10 @@ def add(level, message = nil) |
209 | 214 | config = build_config |
210 | 215 | @rdkafka = Rdkafka::Config.new(config) |
211 | 216 |
|
| 217 | + if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER" |
| 218 | + Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token) |
| 219 | + end |
| 220 | + |
212 | 221 | if @default_topic.nil? |
213 | 222 | if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error |
214 | 223 | raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true" |
@@ -296,9 +305,39 @@ def build_config |
296 | 305 | config |
297 | 306 | end |
298 | 307 |
|
| 308 | + def refresh_token(_config, _client_name) |
| 309 | + log.info("+--- Refreshing token") |
| 310 | + client = get_producer |
| 311 | + # This will happen once upon initialization and is expected to fail, as the producer isnt set yet |
| 312 | + # We will set the token manually after creation and after that this refresh method will work |
| 313 | + unless client |
| 314 | + log.info("Could not get shared client handle, unable to set/refresh token (this is expected one time on startup)") |
| 315 | + return |
| 316 | + end |
| 317 | + signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region) |
| 318 | + token = signer.generate_auth_token |
| 319 | + |
| 320 | + if token |
| 321 | + client.oauthbearer_set_token( |
| 322 | + token: token.token, |
| 323 | + lifetime_ms: token.expiration_time_ms, |
| 324 | + principal_name: "kafka-cluster" |
| 325 | + ) |
| 326 | + else |
| 327 | + client.oauthbearer_set_token_failure( |
| 328 | + "Failed to generate token." |
| 329 | + ) |
| 330 | + end |
| 331 | + end |
| 332 | + |
299 | 333 | def start |
300 | 334 | if @share_producer |
301 | 335 | @shared_producer = @rdkafka.producer |
| 336 | + log.info("Created shared producer") |
| 337 | + if @aws_msk_region |
| 338 | + refresh_token(nil, nil) |
| 339 | + log.info("Set initial token for shared producer") |
| 340 | + end |
302 | 341 | else |
303 | 342 | @producers = {} |
304 | 343 | @producers_mutex = Mutex.new |
|
0 commit comments