Skip to content

Commit beab8a4

Browse files
committed
Use bucket based on placeholder
Signed-off-by: rmontenegroo <[email protected]>
1 parent 140de4e commit beab8a4

File tree

2 files changed

+31
-29
lines changed

2 files changed

+31
-29
lines changed

lib/fluent/plugin/out_s3.rb

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,13 @@ def start
252252
s3_client = Aws::S3::Client.new(options)
253253
@s3 = Aws::S3::Resource.new(client: s3_client)
254254

255+
if not @s3_bucket =~ /\$\{.*\}/
256+
@bucket = @s3.bucket(@s3_bucket)
257+
check_apikeys(@bucket) if @check_apikey_on_start
258+
ensure_bucket(@bucket) if @check_bucket
259+
ensure_bucket_lifecycle(@bucket)
260+
end
261+
255262
super
256263
end
257264

@@ -270,41 +277,36 @@ def write(chunk)
270277
else
271278
@time_slice_with_tz.call(metadata.timekey)
272279
end
273-
280+
274281
bucket_name = nil
275282

276-
if @s3_bucket.match(/\$\{.*\}/)
277-
278-
bucket_name = extract_placeholders(@s3_bucket, chunk)
279-
280-
if bucket_name.match(/\$\{.*\}/)
281-
282-
log.warn "Trying to use @s3_bucket_fallback as a fallback bucket name"
283-
283+
if @s3_bucket =~ /\$\{.*\}/
284+
@s3_bucket.scan(/\$\{([^\$\{\}]+)\}/) do |placeholder|
285+
placeholder = placeholder.join
286+
if (not chunk.metadata.variables) or (not chunk.metadata.variables.keys.include?(placeholder.to_sym))
287+
log.warn "There is no placeholder '#{placeholder}'"
284288
if @s3_bucket_fallback
285-
286289
bucket_name = @s3_bucket_fallback
287-
290+
log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
291+
break
288292
else
289-
290-
raise "It was possible to extract_placeholder from @s3_bucket and there is no @s3_bucket_fallback set"
291-
293+
raise "It was possible to extract placeholder '#{placeholder}' from chunk and @s3_bucket_fallback is not set."
292294
end
293-
295+
end
294296
end
295-
296-
else
297297

298-
bucket_name = @s3_bucket
298+
if not bucket_name
299+
bucket_name = extract_placeholders(@s3_bucket, chunk)
300+
end
299301

302+
bucket = @s3.bucket(bucket_name)
303+
check_apikeys(bucket) if @check_apikey_on_start
304+
ensure_bucket(bucket) if @check_bucket
305+
ensure_bucket_lifecycle(bucket)
306+
else
307+
bucket = @bucket
300308
end
301309

302-
bucket = @s3.bucket(bucket_name)
303-
304-
check_apikeys(bucket) if @check_apikey_on_start
305-
ensure_bucket(bucket) if @check_bucket
306-
ensure_bucket_lifecycle(bucket)
307-
308310
if @check_object
309311
begin
310312
@values_for_s3_object_chunk[chunk.unique_id] ||= {
@@ -465,7 +467,7 @@ def get_bucket_lifecycle_rules(bucket)
465467
rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
466468
[]
467469
end
468-
end
470+
end
469471

470472
def process_s3_object_key_format
471473
%W(%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp}).each { |ph|

test/test_out_s3.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ def write(chunk)
4444

4545
private
4646

47-
def ensure_bucket
47+
def ensure_bucket(bucket)
4848
end
4949

50-
def check_apikeys
50+
def check_apikeys(bucket)
5151
end
5252
end.configure(conf)
5353
end
@@ -287,7 +287,7 @@ def write(chunk)
287287

288288
private
289289

290-
def check_apikeys
290+
def check_apikeys(bucket)
291291
end
292292
end.configure(conf)
293293
end
@@ -427,7 +427,7 @@ def setup_mocks(exists_return = false)
427427
mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
428428
@s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
429429
mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
430-
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test",
430+
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket",
431431
client: @s3_client))
432432
@s3_bucket.exists? { exists_return }
433433
@s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket",

0 commit comments

Comments
 (0)