Skip to content

Commit b5151b4

Browse files
committed
Use bucket based on placeholder
Signed-off-by: rmontenegroo <[email protected]>
1 parent 26ce74e commit b5151b4

File tree

4 files changed

+79
-25
lines changed

4 files changed

+79
-25
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@ vendor
1111
.ruby-version
1212

1313
test/tmp/
14+
15+
docker/

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.5.0
1+
1.5.0

lib/fluent/plugin/out_s3.rb

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ def initialize
8787
config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead"
8888
desc "S3 bucket name"
8989
config_param :s3_bucket, :string
90+
desc "Set bucket name fallback if fails fetching from placeholders"
91+
config_param :s3_bucket_fallback, :string, :default => nil
9092
desc "S3 region name"
9193
config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
9294
desc "Use 's3_region' instead"
@@ -249,11 +251,14 @@ def start
249251

250252
s3_client = Aws::S3::Client.new(options)
251253
@s3 = Aws::S3::Resource.new(client: s3_client)
252-
@bucket = @s3.bucket(@s3_bucket)
253254

254-
check_apikeys if @check_apikey_on_start
255-
ensure_bucket if @check_bucket
256-
ensure_bucket_lifecycle
255+
@tag_placeholders = get_placeholders_tag(@s3_bucket)
256+
@keys_placeholders = get_placeholders_keys(@s3_bucket)
257+
@time_placeholders = ext_get_placeholders_time(@s3_bucket)
258+
259+
if @tag_placeholders.empty? && @keys_placeholders.empty? && @time_placeholders.empty?
260+
@bucket = create_bucket(@s3_bucket)
261+
end
257262

258263
super
259264
end
@@ -263,7 +268,28 @@ def format(tag, time, record)
263268
@formatter.format(tag, time, r)
264269
end
265270

271+
def create_bucket(name)
272+
bucket = @s3.bucket(name)
273+
check_apikeys(bucket) if @check_apikey_on_start
274+
ensure_bucket(bucket) if @check_bucket
275+
ensure_bucket_lifecycle(bucket)
276+
bucket
277+
end
278+
279+
def use_fallback(placeholder)
280+
if !@s3_bucket_fallback
281+
raise "It was not possible to extract '#{placeholder}' placeholder from chunk and @s3_bucket_fallback is not set."
282+
end
283+
log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
284+
@s3_bucket_fallback
285+
end
286+
287+
def ext_get_placeholders_time(str)
288+
output = [ "%S", "%M", "%H", "%d", "%m", "%Y" ].select { |tp| str.include? tp }
289+
end
290+
266291
def write(chunk)
292+
267293
i = 0
268294
metadata = chunk.metadata
269295
previous_path = nil
@@ -273,6 +299,31 @@ def write(chunk)
273299
@time_slice_with_tz.call(metadata.timekey)
274300
end
275301

302+
bucket = @bucket ? @bucket : nil
303+
304+
if (!bucket) && (!@tag_placeholders.empty?)
305+
if (!chunk.metadata.tag) || ((@tag_placeholders.max + 1) > chunk.metadata.tag.split('.').length)
306+
bucket = create_bucket(use_fallback("tag"))
307+
end
308+
end
309+
310+
if !bucket
311+
@keys_placeholders.each do |placeholder|
312+
if (!chunk.metadata.variables) || (!chunk.metadata.variables.keys.include?(placeholder.to_sym))
313+
bucket = create_bucket(use_fallback(placeholder))
314+
break
315+
end
316+
end
317+
end
318+
319+
if (!bucket) && (!chunk.metadata.timekey) && @time_placeholders
320+
bucket = create_bucket(use_fallback("time"))
321+
end
322+
323+
if !bucket
324+
bucket = create_bucket(extract_placeholders(@s3_bucket, chunk))
325+
end
326+
276327
if @check_object
277328
begin
278329
@values_for_s3_object_chunk[chunk.unique_id] ||= {
@@ -304,7 +355,7 @@ def write(chunk)
304355

305356
i += 1
306357
previous_path = s3path
307-
end while @bucket.object(s3path).exists?
358+
end while bucket.object(s3path).exists?
308359
else
309360
if @localtime
310361
hms_slicer = Time.now.strftime("%H%M%S")
@@ -362,18 +413,19 @@ def write(chunk)
362413
put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
363414
end
364415
end
365-
@bucket.object(s3path).put(put_options)
416+
bucket.object(s3path).put(put_options)
366417

367418
@values_for_s3_object_chunk.delete(chunk.unique_id)
368419

369420
if @warn_for_delay
370421
if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
371-
log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
422+
log.warn "out_s3: delayed events were put to s3://#{bucket.name}/#{s3path}"
372423
end
373424
end
374425
ensure
375426
tmp.close(true) rescue nil
376427
end
428+
377429
end
378430

379431
private
@@ -399,34 +451,34 @@ def timekey_to_timeformat(timekey)
399451
end
400452
end
401453

402-
def ensure_bucket
403-
if !@bucket.exists?
454+
def ensure_bucket(bucket)
455+
if !bucket.exists?
404456
if @auto_create_bucket
405-
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
406-
@s3.create_bucket(bucket: @s3_bucket)
457+
log.info "Creating bucket #{bucket.name} on #{@s3_endpoint}"
458+
@s3.create_bucket(bucket: bucket.name)
407459
else
408-
raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
460+
raise "The specified bucket does not exist: bucket = #{bucket.name}"
409461
end
410462
end
411463
end
412464

413-
def ensure_bucket_lifecycle
465+
def ensure_bucket_lifecycle(bucket)
414466
unless @bucket_lifecycle_rules.empty?
415-
old_rules = get_bucket_lifecycle_rules
467+
old_rules = get_bucket_lifecycle_rules(bucket)
416468
new_rules = @bucket_lifecycle_rules.sort_by { |rule| rule.id }.map do |rule|
417469
{ id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
418470
end
419471

420472
unless old_rules == new_rules
421-
log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
422-
@bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
473+
log.info "Configuring bucket lifecycle rules for #{bucket.name} on #{@s3_endpoint}"
474+
bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
423475
end
424476
end
425477
end
426478

427-
def get_bucket_lifecycle_rules
479+
def get_bucket_lifecycle_rules(bucket)
428480
begin
429-
@bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
481+
bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
430482
{ id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] }
431483
end
432484
rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
@@ -461,8 +513,8 @@ def check_s3_path_safety(conf)
461513
end
462514
end
463515

464-
def check_apikeys
465-
@bucket.objects(prefix: @path, :max_keys => 1).first
516+
def check_apikeys(bucket)
517+
bucket.objects(prefix: @path, :max_keys => 1).first
466518
rescue Aws::S3::Errors::NoSuchBucket
467519
# ignore NoSuchBucket Error because ensure_bucket checks it.
468520
rescue => e

test/test_out_s3.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ def write(chunk)
4444

4545
private
4646

47-
def ensure_bucket
47+
def ensure_bucket(bucket)
4848
end
4949

50-
def check_apikeys
50+
def check_apikeys(bucket)
5151
end
5252
end.configure(conf)
5353
end
@@ -287,7 +287,7 @@ def write(chunk)
287287

288288
private
289289

290-
def check_apikeys
290+
def check_apikeys(bucket)
291291
end
292292
end.configure(conf)
293293
end
@@ -427,7 +427,7 @@ def setup_mocks(exists_return = false)
427427
mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
428428
@s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
429429
mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
430-
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test",
430+
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket",
431431
client: @s3_client))
432432
@s3_bucket.exists? { exists_return }
433433
@s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket",

0 commit comments

Comments
 (0)