@@ -87,6 +87,8 @@ def initialize
87
87
config_param :aws_iam_retries , :integer , default : nil , deprecated : "Use 'instance_profile_credentials' instead"
88
88
desc "S3 bucket name"
89
89
config_param :s3_bucket , :string
90
+ desc "Set bucket name fallback if fails fetching from placeholders"
91
+ config_param :s3_bucket_fallback , :string , :default => nil
90
92
desc "S3 region name"
91
93
config_param :s3_region , :string , default : ENV [ "AWS_REGION" ] || "us-east-1"
92
94
desc "Use 's3_region' instead"
@@ -249,11 +251,6 @@ def start
249
251
250
252
s3_client = Aws ::S3 ::Client . new ( options )
251
253
@s3 = Aws ::S3 ::Resource . new ( client : s3_client )
252
- @bucket = @s3 . bucket ( @s3_bucket )
253
-
254
- check_apikeys if @check_apikey_on_start
255
- ensure_bucket if @check_bucket
256
- ensure_bucket_lifecycle
257
254
258
255
super
259
256
end
@@ -264,6 +261,7 @@ def format(tag, time, record)
264
261
end
265
262
266
263
def write ( chunk )
264
+
267
265
i = 0
268
266
metadata = chunk . metadata
269
267
previous_path = nil
@@ -273,6 +271,40 @@ def write(chunk)
273
271
@time_slice_with_tz . call ( metadata . timekey )
274
272
end
275
273
274
+ bucket_name = nil
275
+
276
+ if @s3_bucket . match ( /\$ \{ .*\} / )
277
+
278
+ bucket_name = extract_placeholders ( @s3_bucket , chunk )
279
+
280
+ if bucket_name . match ( /\$ \{ .*\} / )
281
+
282
+ log . warn "Trying to use @s3_bucket_fallback as a fallback bucket name"
283
+
284
+ if @s3_bucket_fallback
285
+
286
+ bucket_name = @s3_bucket_fallback
287
+
288
+ else
289
+
290
+ raise "It was possible to extract_placeholder from @s3_bucket and there is no @s3_bucket_fallback set"
291
+
292
+ end
293
+
294
+ end
295
+
296
+ else
297
+
298
+ bucket_name = @s3_bucket
299
+
300
+ end
301
+
302
+ bucket = @s3 . bucket ( bucket_name )
303
+
304
+ check_apikeys ( bucket ) if @check_apikey_on_start
305
+ ensure_bucket ( bucket ) if @check_bucket
306
+ ensure_bucket_lifecycle ( bucket )
307
+
276
308
if @check_object
277
309
begin
278
310
@values_for_s3_object_chunk [ chunk . unique_id ] ||= {
@@ -304,7 +336,7 @@ def write(chunk)
304
336
305
337
i += 1
306
338
previous_path = s3path
307
- end while @ bucket. object ( s3path ) . exists?
339
+ end while bucket . object ( s3path ) . exists?
308
340
else
309
341
if @localtime
310
342
hms_slicer = Time . now . strftime ( "%H%M%S" )
@@ -362,18 +394,19 @@ def write(chunk)
362
394
put_options [ :metadata ] [ k ] = extract_placeholders ( v , chunk ) . gsub ( %r(%{[^}]+}) , { "%{index}" => sprintf ( @index_format , i - 1 ) } )
363
395
end
364
396
end
365
- @ bucket. object ( s3path ) . put ( put_options )
397
+ bucket . object ( s3path ) . put ( put_options )
366
398
367
399
@values_for_s3_object_chunk . delete ( chunk . unique_id )
368
400
369
401
if @warn_for_delay
370
402
if Time . at ( chunk . metadata . timekey ) < Time . now - @warn_for_delay
371
- log . warn "out_s3: delayed events were put to s3://#{ @s3_bucket } /#{ s3path } "
403
+ log . warn "out_s3: delayed events were put to s3://#{ bucket . name } /#{ s3path } "
372
404
end
373
405
end
374
406
ensure
375
407
tmp . close ( true ) rescue nil
376
408
end
409
+
377
410
end
378
411
379
412
private
@@ -399,40 +432,40 @@ def timekey_to_timeformat(timekey)
399
432
end
400
433
end
401
434
402
- def ensure_bucket
403
- if !@ bucket. exists?
435
+ def ensure_bucket ( bucket )
436
+ if !bucket . exists?
404
437
if @auto_create_bucket
405
- log . info "Creating bucket #{ @s3_bucket } on #{ @s3_endpoint } "
406
- @s3 . create_bucket ( bucket : @s3_bucket )
438
+ log . info "Creating bucket #{ bucket . name } on #{ @s3_endpoint } "
439
+ @s3 . create_bucket ( bucket : bucket . name )
407
440
else
408
- raise "The specified bucket does not exist: bucket = #{ @s3_bucket } "
441
+ raise "The specified bucket does not exist: bucket = #{ bucket . name } "
409
442
end
410
443
end
411
444
end
412
445
413
- def ensure_bucket_lifecycle
446
+ def ensure_bucket_lifecycle ( bucket )
414
447
unless @bucket_lifecycle_rules . empty?
415
- old_rules = get_bucket_lifecycle_rules
448
+ old_rules = get_bucket_lifecycle_rules ( bucket )
416
449
new_rules = @bucket_lifecycle_rules . sort_by { |rule | rule . id } . map do |rule |
417
450
{ id : rule . id , expiration : { days : rule . expiration_days } , prefix : rule . prefix , status : "Enabled" }
418
451
end
419
452
420
453
unless old_rules == new_rules
421
- log . info "Configuring bucket lifecycle rules for #{ @s3_bucket } on #{ @s3_endpoint } "
422
- @ bucket. lifecycle_configuration . put ( { lifecycle_configuration : { rules : new_rules } } )
454
+ log . info "Configuring bucket lifecycle rules for #{ bucket . name } on #{ @s3_endpoint } "
455
+ bucket . lifecycle_configuration . put ( { lifecycle_configuration : { rules : new_rules } } )
423
456
end
424
457
end
425
458
end
426
459
427
- def get_bucket_lifecycle_rules
460
+ def get_bucket_lifecycle_rules ( bucket )
428
461
begin
429
- @ bucket. lifecycle_configuration . rules . sort_by { |rule | rule [ :id ] } . map do |rule |
462
+ bucket . lifecycle_configuration . rules . sort_by { |rule | rule [ :id ] } . map do |rule |
430
463
{ id : rule [ :id ] , expiration : { days : rule [ :expiration ] [ :days ] } , prefix : rule [ :prefix ] , status : rule [ :status ] }
431
464
end
432
465
rescue Aws ::S3 ::Errors ::NoSuchLifecycleConfiguration
433
466
[ ]
434
467
end
435
- end
468
+ end
436
469
437
470
def process_s3_object_key_format
438
471
%W( %{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp} ) . each { |ph |
@@ -461,8 +494,8 @@ def check_s3_path_safety(conf)
461
494
end
462
495
end
463
496
464
- def check_apikeys
465
- @ bucket. objects ( prefix : @path , :max_keys => 1 ) . first
497
+ def check_apikeys ( bucket )
498
+ bucket . objects ( prefix : @path , :max_keys => 1 ) . first
466
499
rescue Aws ::S3 ::Errors ::NoSuchBucket
467
500
# ignore NoSuchBucket Error because ensure_bucket checks it.
468
501
rescue => e
0 commit comments