|
4 | 4 | require 'fluent/plugin/kafka_plugin_util' |
5 | 5 |
|
6 | 6 | require 'rdkafka' |
| 7 | +require 'aws_msk_iam_sasl_signer' |
7 | 8 |
|
8 | 9 | begin |
9 | 10 | rdkafka_version = Gem::Version::create(Rdkafka::VERSION) |
@@ -101,6 +102,8 @@ class Fluent::Rdkafka2Output < Output |
101 | 102 | config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"], |
102 | 103 | :desc => 'Handle some of the error codes should be unrecoverable if specified' |
103 | 104 |
|
| 105 | + config_param :aws_msk_region, :string, :default => nil, :desc => 'AWS region for MSK' |
| 106 | + |
104 | 107 | config_section :buffer do |
105 | 108 | config_set_default :chunk_keys, ["topic"] |
106 | 109 | end |
@@ -205,10 +208,17 @@ def add(level, message = nil) |
205 | 208 | end |
206 | 209 | end |
207 | 210 | } |
| 211 | + # HERE ----------------- |
208 | 212 | Rdkafka::Config.logger = log |
209 | 213 | config = build_config |
210 | 214 | @rdkafka = Rdkafka::Config.new(config) |
211 | 215 |
|
| 216 | + |
| 217 | + if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER" |
| 218 | + Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token) |
| 219 | + end |
| 220 | + # HERE ----------------- |
| 221 | + |
212 | 222 | if @default_topic.nil? |
213 | 223 | if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error |
214 | 224 | raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true" |
@@ -289,16 +299,47 @@ def build_config |
289 | 299 | config[:"sasl.password"] = @password if @password |
290 | 300 | config[:"enable.idempotence"] = @idempotent if @idempotent |
291 | 301 |
|
| 302 | + # sasl.mechnisms and security.protocol are set as rdkafka_options |
292 | 303 | @rdkafka_options.each { |k, v| |
293 | 304 | config[k.to_sym] = v |
294 | 305 | } |
295 | 306 |
|
296 | 307 | config |
297 | 308 | end |
298 | 309 |
|
| 310 | + def refresh_token(_config, _client_name) |
| 311 | + log.info("+--- Refreshing token") |
| 312 | + client = get_producer |
| 313 | + # This will happen once upon initialization and is expected to fail, as the producer isnt set yet |
| 314 | + # We will set the token manually after creation and after that this refresh method will work |
| 315 | + unless client |
| 316 | + log.info("Could not get shared client handle, unable to set/refresh token (this is expected one time on startup)") |
| 317 | + return |
| 318 | + end |
| 319 | + signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region) |
| 320 | + token = signer.generate_auth_token |
| 321 | + |
| 322 | + if token |
| 323 | + client.oauthbearer_set_token( |
| 324 | + token: token.token, |
| 325 | + lifetime_ms: token.expiration_time_ms, |
| 326 | + principal_name: "kafka-cluster" |
| 327 | + ) |
| 328 | + else |
| 329 | + client.oauthbearer_set_token_failure( |
| 330 | + "Failed to generate token." |
| 331 | + ) |
| 332 | + end |
| 333 | + end |
| 334 | + |
299 | 335 | def start |
300 | 336 | if @share_producer |
301 | 337 | @shared_producer = @rdkafka.producer |
| 338 | + log.info("Created shared producer") |
| 339 | + if @aws_msk_region |
| 340 | + refresh_token(nil, nil) |
| 341 | + log.info("Set initial token for shared producer") |
| 342 | + end |
302 | 343 | else |
303 | 344 | @producers = {} |
304 | 345 | @producers_mutex = Mutex.new |
|
0 commit comments