Skip to content

Commit 701911b

Browse files
committed
Use bucket based on placeholder
Signed-off-by: rmontenegroo <[email protected]>
1 parent 26ce74e commit 701911b

File tree

4 files changed

+62
-25
lines changed

4 files changed

+62
-25
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@ vendor
1111
.ruby-version
1212

1313
test/tmp/
14+
15+
docker/

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.5.0
1+
1.5.0

lib/fluent/plugin/out_s3.rb

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ def initialize
8787
config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead"
8888
desc "S3 bucket name"
8989
config_param :s3_bucket, :string
90+
desc "Set bucket name fallback if fails fetching from placeholders"
91+
config_param :s3_bucket_fallback, :string, :default => nil
9092
desc "S3 region name"
9193
config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
9294
desc "Use 's3_region' instead"
@@ -249,11 +251,13 @@ def start
249251

250252
s3_client = Aws::S3::Client.new(options)
251253
@s3 = Aws::S3::Resource.new(client: s3_client)
252-
@bucket = @s3.bucket(@s3_bucket)
253254

254-
check_apikeys if @check_apikey_on_start
255-
ensure_bucket if @check_bucket
256-
ensure_bucket_lifecycle
255+
if not @s3_bucket =~ /\$\{.*\}/
256+
@bucket = @s3.bucket(@s3_bucket)
257+
check_apikeys(@bucket) if @check_apikey_on_start
258+
ensure_bucket(@bucket) if @check_bucket
259+
ensure_bucket_lifecycle(@bucket)
260+
end
257261

258262
super
259263
end
@@ -264,6 +268,7 @@ def format(tag, time, record)
264268
end
265269

266270
def write(chunk)
271+
267272
i = 0
268273
metadata = chunk.metadata
269274
previous_path = nil
@@ -272,6 +277,35 @@ def write(chunk)
272277
else
273278
@time_slice_with_tz.call(metadata.timekey)
274279
end
280+
281+
bucket_name = nil
282+
283+
if @s3_bucket =~ /\$\{.*\}/
284+
@s3_bucket.scan(/\$\{([^\$\{\}]+)\}/) do |placeholder|
285+
placeholder = placeholder.join
286+
if (not chunk.metadata.variables) or (not chunk.metadata.variables.keys.include?(placeholder.to_sym))
287+
log.warn "There is no placeholder '#{placeholder}'"
288+
if @s3_bucket_fallback
289+
bucket_name = @s3_bucket_fallback
290+
log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
291+
break
292+
else
293+
raise "It was not possible to extract placeholder '#{placeholder}' from chunk and @s3_bucket_fallback is not set."
294+
end
295+
end
296+
end
297+
298+
if not bucket_name
299+
bucket_name = extract_placeholders(@s3_bucket, chunk)
300+
end
301+
302+
bucket = @s3.bucket(bucket_name)
303+
check_apikeys(bucket) if @check_apikey_on_start
304+
ensure_bucket(bucket) if @check_bucket
305+
ensure_bucket_lifecycle(bucket)
306+
else
307+
bucket = @bucket
308+
end
275309

276310
if @check_object
277311
begin
@@ -304,7 +338,7 @@ def write(chunk)
304338

305339
i += 1
306340
previous_path = s3path
307-
end while @bucket.object(s3path).exists?
341+
end while bucket.object(s3path).exists?
308342
else
309343
if @localtime
310344
hms_slicer = Time.now.strftime("%H%M%S")
@@ -362,18 +396,19 @@ def write(chunk)
362396
put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
363397
end
364398
end
365-
@bucket.object(s3path).put(put_options)
399+
bucket.object(s3path).put(put_options)
366400

367401
@values_for_s3_object_chunk.delete(chunk.unique_id)
368402

369403
if @warn_for_delay
370404
if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
371-
log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
405+
log.warn "out_s3: delayed events were put to s3://#{bucket.name}/#{s3path}"
372406
end
373407
end
374408
ensure
375409
tmp.close(true) rescue nil
376410
end
411+
377412
end
378413

379414
private
@@ -399,34 +434,34 @@ def timekey_to_timeformat(timekey)
399434
end
400435
end
401436

402-
def ensure_bucket
403-
if !@bucket.exists?
437+
def ensure_bucket(bucket)
438+
if !bucket.exists?
404439
if @auto_create_bucket
405-
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
406-
@s3.create_bucket(bucket: @s3_bucket)
440+
log.info "Creating bucket #{bucket.name} on #{@s3_endpoint}"
441+
@s3.create_bucket(bucket: bucket.name)
407442
else
408-
raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
443+
raise "The specified bucket does not exist: bucket = #{bucket.name}"
409444
end
410445
end
411446
end
412447

413-
def ensure_bucket_lifecycle
448+
def ensure_bucket_lifecycle(bucket)
414449
unless @bucket_lifecycle_rules.empty?
415-
old_rules = get_bucket_lifecycle_rules
450+
old_rules = get_bucket_lifecycle_rules(bucket)
416451
new_rules = @bucket_lifecycle_rules.sort_by { |rule| rule.id }.map do |rule|
417452
{ id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
418453
end
419454

420455
unless old_rules == new_rules
421-
log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
422-
@bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
456+
log.info "Configuring bucket lifecycle rules for #{bucket.name} on #{@s3_endpoint}"
457+
bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
423458
end
424459
end
425460
end
426461

427-
def get_bucket_lifecycle_rules
462+
def get_bucket_lifecycle_rules(bucket)
428463
begin
429-
@bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
464+
bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
430465
{ id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] }
431466
end
432467
rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
@@ -461,8 +496,8 @@ def check_s3_path_safety(conf)
461496
end
462497
end
463498

464-
def check_apikeys
465-
@bucket.objects(prefix: @path, :max_keys => 1).first
499+
def check_apikeys(bucket)
500+
bucket.objects(prefix: @path, :max_keys => 1).first
466501
rescue Aws::S3::Errors::NoSuchBucket
467502
# ignore NoSuchBucket Error because ensure_bucket checks it.
468503
rescue => e

test/test_out_s3.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ def write(chunk)
4444

4545
private
4646

47-
def ensure_bucket
47+
def ensure_bucket(bucket)
4848
end
4949

50-
def check_apikeys
50+
def check_apikeys(bucket)
5151
end
5252
end.configure(conf)
5353
end
@@ -287,7 +287,7 @@ def write(chunk)
287287

288288
private
289289

290-
def check_apikeys
290+
def check_apikeys(bucket)
291291
end
292292
end.configure(conf)
293293
end
@@ -427,7 +427,7 @@ def setup_mocks(exists_return = false)
427427
mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
428428
@s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
429429
mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
430-
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test",
430+
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket",
431431
client: @s3_client))
432432
@s3_bucket.exists? { exists_return }
433433
@s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket",

0 commit comments

Comments
 (0)