Skip to content

Commit e6c72a7

Browse files
committed
Use bucket based on placeholder
Signed-off-by: rmontenegroo <[email protected]>
2 parents b0bd8ba + 701911b commit e6c72a7

File tree

1 file changed

+44
-27
lines changed

1 file changed

+44
-27
lines changed

lib/fluent/plugin/out_s3.rb

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,12 @@ def start
252252
s3_client = Aws::S3::Client.new(options)
253253
@s3 = Aws::S3::Resource.new(client: s3_client)
254254

255-
if not @s3_bucket =~ /\$\{.*\}/
256-
@bucket = @s3.bucket(@s3_bucket)
257-
check_apikeys(@bucket) if @check_apikey_on_start
258-
ensure_bucket(@bucket) if @check_bucket
259-
ensure_bucket_lifecycle(@bucket)
255+
@tag_placeholders = get_placeholders_tag(@s3_bucket)
256+
@keys_placeholders = get_placeholders_keys(@s3_bucket)
257+
@time_placeholders = ext_get_placeholders_time(@s3_bucket)
258+
259+
if @tag_placeholders.empty? and @keys_placeholders.empty? and @time_placeholders.empty?
260+
@bucket = create_bucket(@s3_bucket)
260261
end
261262

262263
super
@@ -267,6 +268,26 @@ def format(tag, time, record)
267268
@formatter.format(tag, time, r)
268269
end
269270

271+
def create_bucket(name)
272+
bucket = @s3.bucket(name)
273+
check_apikeys(bucket) if @check_apikey_on_start
274+
ensure_bucket(bucket) if @check_bucket
275+
ensure_bucket_lifecycle(bucket)
276+
bucket
277+
end
278+
279+
def use_fallback(placeholder)
280+
if not @s3_bucket_fallback
281+
raise "It was not possible to extract '#{placeholder}' placeholder from chunk and @s3_bucket_fallback is not set."
282+
end
283+
log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
284+
@s3_bucket_fallback
285+
end
286+
287+
def ext_get_placeholders_time(str)
288+
output = [ "%S", "%M", "%H", "%d", "%m", "%Y" ].select { |tp| str.include? tp }
289+
end
290+
270291
def write(chunk)
271292

272293
i = 0
@@ -277,34 +298,30 @@ def write(chunk)
277298
else
278299
@time_slice_with_tz.call(metadata.timekey)
279300
end
280-
281-
bucket_name = nil
282301

283-
if @s3_bucket =~ /\$\{.*\}/
284-
@s3_bucket.scan(/\$\{([^\$\{\}]+)\}/) do |placeholder|
285-
placeholder = placeholder.join
302+
bucket = @bucket ? @bucket : nil
303+
304+
if (not bucket) and (not @tag_placeholders.empty?)
305+
if (not chunk.metadata.tag) or ((@tag_placeholders.max + 1) > chunk.metadata.tag.split('.').length)
306+
bucket = create_bucket(use_fallback("tag"))
307+
end
308+
end
309+
310+
if not bucket
311+
@keys_placeholders.each do |placeholder|
286312
if (not chunk.metadata.variables) or (not chunk.metadata.variables.keys.include?(placeholder.to_sym))
287-
log.warn "There is no placeholder '#{placeholder}'"
288-
if @s3_bucket_fallback
289-
bucket_name = @s3_bucket_fallback
290-
log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
291-
break
292-
else
293-
raise "It was not possible to extract placeholder '#{placeholder}' from chunk and @s3_bucket_fallback is not set."
294-
end
313+
bucket = create_bucket(use_fallback(placeholder))
314+
break
295315
end
296316
end
317+
end
297318

298-
if not bucket_name
299-
bucket_name = extract_placeholders(@s3_bucket, chunk)
300-
end
319+
if (not bucket) and (not chunk.metadata.timekey) and @time_placeholders
320+
bucket = create_bucket(use_fallback("time"))
321+
end
301322

302-
bucket = @s3.bucket(bucket_name)
303-
check_apikeys(bucket) if @check_apikey_on_start
304-
ensure_bucket(bucket) if @check_bucket
305-
ensure_bucket_lifecycle(bucket)
306-
else
307-
bucket = @bucket
323+
if not bucket
324+
bucket = create_bucket(extract_placeholders(@s3_bucket, chunk))
308325
end
309326

310327
if @check_object

0 commit comments

Comments
 (0)