Skip to content

Commit c69893c

Browse files
committed
Use bucket based on placeholder
Signed-off-by: rmontenegroo <[email protected]>
1 parent 26ce74e commit c69893c

File tree

4 files changed

+108
-25
lines changed

4 files changed

+108
-25
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@ vendor
1111
.ruby-version
1212

1313
test/tmp/
14+
15+
docker/

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.5.0
1+
1.5.0

lib/fluent/plugin/out_s3.rb

Lines changed: 101 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ def initialize
8787
config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead"
8888
desc "S3 bucket name"
8989
config_param :s3_bucket, :string
90+
desc "Set bucket name fallback if fails fetching from placeholders"
91+
config_param :s3_bucket_fallback, :string, :default => nil
9092
desc "S3 region name"
9193
config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
9294
desc "Use 's3_region' instead"
@@ -249,11 +251,14 @@ def start
249251

250252
s3_client = Aws::S3::Client.new(options)
251253
@s3 = Aws::S3::Resource.new(client: s3_client)
252-
@bucket = @s3.bucket(@s3_bucket)
253254

254-
check_apikeys if @check_apikey_on_start
255-
ensure_bucket if @check_bucket
256-
ensure_bucket_lifecycle
255+
@tag_placeholders = get_placeholders_tag(@s3_bucket)
256+
@keys_placeholders = get_placeholders_keys(@s3_bucket)
257+
@time_placeholders = ext_get_placeholders_time(@s3_bucket)
258+
259+
if @tag_placeholders.empty? and @keys_placeholders.empty? and @time_placeholders.empty?
260+
@bucket = create_bucket(@s3_bucket)
261+
end
257262

258263
super
259264
end
@@ -263,7 +268,28 @@ def format(tag, time, record)
263268
@formatter.format(tag, time, r)
264269
end
265270

271+
def create_bucket(name)
272+
bucket = @s3.bucket(name)
273+
check_apikeys(bucket) if @check_apikey_on_start
274+
ensure_bucket(bucket) if @check_bucket
275+
ensure_bucket_lifecycle(bucket)
276+
bucket
277+
end
278+
279+
def use_fallback(placeholder)
280+
if not @s3_bucket_fallback
281+
raise "It was not possible to extract '#{placeholder}' placeholder from chunk and @s3_bucket_fallback is not set."
282+
end
283+
log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
284+
@s3_bucket_fallback
285+
end
286+
287+
def ext_get_placeholders_time(str)
288+
output = [ "%S", "%M", "%H", "%d", "%m", "%Y" ].select { |tp| str.include? tp }
289+
end
290+
266291
def write(chunk)
292+
267293
i = 0
268294
metadata = chunk.metadata
269295
previous_path = nil
@@ -272,6 +298,60 @@ def write(chunk)
272298
else
273299
@time_slice_with_tz.call(metadata.timekey)
274300
end
301+
302+
bucket_name = nil
303+
304+
if @s3_bucket =~ /\$\{.*\}/
305+
@s3_bucket.scan(/\$\{([^\$\{\}]+)\}/) do |placeholder|
306+
placeholder = placeholder.join
307+
if (not chunk.metadata.variables) or (not chunk.metadata.variables.keys.include?(placeholder.to_sym))
308+
log.warn "There is no placeholder '#{placeholder}'"
309+
if @s3_bucket_fallback
310+
bucket_name = @s3_bucket_fallback
311+
log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
312+
break
313+
else
314+
raise "It was possible to extract placeholder '#{placeholder}' from chunk and @s3_bucket_fallback is not set."
315+
end
316+
end
317+
end
318+
319+
if not bucket_name
320+
bucket_name = extract_placeholders(@s3_bucket, chunk)
321+
end
322+
323+
bucket = @s3.bucket(bucket_name)
324+
check_apikeys(bucket) if @check_apikey_on_start
325+
ensure_bucket(bucket) if @check_bucket
326+
ensure_bucket_lifecycle(bucket)
327+
else
328+
bucket = @bucket
329+
end
330+
331+
bucket = @bucket ? @bucket : nil
332+
333+
if (not bucket) and (not @tag_placeholders.empty?)
334+
if (not chunk.metadata.tag) or ((@tag_placeholders.max + 1) > chunk.metadata.tag.split('.').length)
335+
bucket = create_bucket(use_fallback("tag"))
336+
end
337+
end
338+
339+
if not bucket
340+
@keys_placeholders.each do |placeholder|
341+
if (not chunk.metadata.variables) or (not chunk.metadata.variables.keys.include?(placeholder.to_sym))
342+
bucket = create_bucket(use_fallback(placeholder))
343+
break
344+
end
345+
end
346+
end
347+
348+
if (not bucket) and (not chunk.metadata.timekey) and @time_placeholders
349+
bucket = create_bucket(use_fallback("time"))
350+
end
351+
352+
if not bucket
353+
bucket = create_bucket(extract_placeholders(@s3_bucket, chunk))
354+
end
275355

276356
if @check_object
277357
begin
@@ -304,7 +384,7 @@ def write(chunk)
304384

305385
i += 1
306386
previous_path = s3path
307-
end while @bucket.object(s3path).exists?
387+
end while bucket.object(s3path).exists?
308388
else
309389
if @localtime
310390
hms_slicer = Time.now.strftime("%H%M%S")
@@ -362,18 +442,19 @@ def write(chunk)
362442
put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
363443
end
364444
end
365-
@bucket.object(s3path).put(put_options)
445+
bucket.object(s3path).put(put_options)
366446

367447
@values_for_s3_object_chunk.delete(chunk.unique_id)
368448

369449
if @warn_for_delay
370450
if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
371-
log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
451+
log.warn "out_s3: delayed events were put to s3://#{bucket.name}/#{s3path}"
372452
end
373453
end
374454
ensure
375455
tmp.close(true) rescue nil
376456
end
457+
377458
end
378459

379460
private
@@ -399,34 +480,34 @@ def timekey_to_timeformat(timekey)
399480
end
400481
end
401482

402-
def ensure_bucket
403-
if !@bucket.exists?
483+
def ensure_bucket(bucket)
484+
if !bucket.exists?
404485
if @auto_create_bucket
405-
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
406-
@s3.create_bucket(bucket: @s3_bucket)
486+
log.info "Creating bucket #{bucket.name} on #{@s3_endpoint}"
487+
@s3.create_bucket(bucket: bucket.name)
407488
else
408-
raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
489+
raise "The specified bucket does not exist: bucket = #{bucket.name}"
409490
end
410491
end
411492
end
412493

413-
def ensure_bucket_lifecycle
494+
def ensure_bucket_lifecycle(bucket)
414495
unless @bucket_lifecycle_rules.empty?
415-
old_rules = get_bucket_lifecycle_rules
496+
old_rules = get_bucket_lifecycle_rules(bucket)
416497
new_rules = @bucket_lifecycle_rules.sort_by { |rule| rule.id }.map do |rule|
417498
{ id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
418499
end
419500

420501
unless old_rules == new_rules
421-
log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
422-
@bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
502+
log.info "Configuring bucket lifecycle rules for #{bucket.name} on #{@s3_endpoint}"
503+
bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
423504
end
424505
end
425506
end
426507

427-
def get_bucket_lifecycle_rules
508+
def get_bucket_lifecycle_rules(bucket)
428509
begin
429-
@bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
510+
bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
430511
{ id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] }
431512
end
432513
rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
@@ -461,8 +542,8 @@ def check_s3_path_safety(conf)
461542
end
462543
end
463544

464-
def check_apikeys
465-
@bucket.objects(prefix: @path, :max_keys => 1).first
545+
def check_apikeys(bucket)
546+
bucket.objects(prefix: @path, :max_keys => 1).first
466547
rescue Aws::S3::Errors::NoSuchBucket
467548
# ignore NoSuchBucket Error because ensure_bucket checks it.
468549
rescue => e

test/test_out_s3.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ def write(chunk)
4444

4545
private
4646

47-
def ensure_bucket
47+
def ensure_bucket(bucket)
4848
end
4949

50-
def check_apikeys
50+
def check_apikeys(bucket)
5151
end
5252
end.configure(conf)
5353
end
@@ -287,7 +287,7 @@ def write(chunk)
287287

288288
private
289289

290-
def check_apikeys
290+
def check_apikeys(bucket)
291291
end
292292
end.configure(conf)
293293
end
@@ -427,7 +427,7 @@ def setup_mocks(exists_return = false)
427427
mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
428428
@s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
429429
mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
430-
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test",
430+
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket",
431431
client: @s3_client))
432432
@s3_bucket.exists? { exists_return }
433433
@s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket",

0 commit comments

Comments
 (0)