Skip to content

Commit 5cd9607

Browse files
committed
Allowing multiple s3 bucket names in the input s3 plugin
Added queue url to sqs section to override calling get_queue_url Changed the default tag to include the bucket name Signed-off-by: abdullahzen <[email protected]>
1 parent 26ce74e commit 5cd9607

File tree

1 file changed

+37
-11
lines changed

1 file changed

+37
-11
lines changed

lib/fluent/plugin/in_s3.rb

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ def initialize
7676
desc "Profile name. Default to 'default' or ENV['AWS_PROFILE']"
7777
config_param :profile_name, :string, default: nil
7878
end
79-
desc "S3 bucket name"
80-
config_param :s3_bucket, :string
79+
desc "S3 bucket name(s) separated by commas in case of multiple bucket names"
80+
config_param :s3_buckets, :string
8181
desc "S3 region name"
8282
config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
8383
desc "Use 's3_region' instead"
@@ -96,6 +96,8 @@ def initialize
9696
config_param :queue_name, :string, default: nil
9797
desc "SQS Owner Account ID"
9898
config_param :queue_owner_aws_account_id, :string, default: nil
99+
desc "SQS queue url, when passed it'll not get the queue URL by name & account ID"
100+
config_param :queue_url, :string, default: nil
99101
desc "Use 's3_region' instead"
100102
config_param :endpoint, :string, default: nil
101103
desc "Skip message deletion"
@@ -106,6 +108,7 @@ def initialize
106108
config_param :retry_error_interval, :integer, default: 300
107109
end
108110

111+
# Default tag will include input.s3.bucket_name ###Check the function process(body)
109112
desc "Tag string"
110113
config_param :tag, :string, default: "input.s3"
111114

@@ -145,16 +148,29 @@ def start
145148
s3_client = create_s3_client
146149
log.debug("Succeeded to create S3 client")
147150
@s3 = Aws::S3::Resource.new(client: s3_client)
148-
@bucket = @s3.bucket(@s3_bucket)
149-
150-
raise "#{@bucket.name} is not found." unless @bucket.exists?
151+
@buckets = {}
152+
if (@s3_buckets.include?(","))
153+
splitted_buckets = @s3_buckets.split(',')
154+
splitted_buckets.each do | bucket |
155+
@buckets[bucket] = @s3.bucket(bucket)
156+
raise "#{bucket} is not found." unless @buckets[bucket].exists?
157+
end
158+
else
159+
@buckets[bucket] = @s3.bucket(@s3_buckets)
160+
raise "#{bucket} is not found." unless @buckets[bucket].exists?
161+
end
151162

152163
check_apikeys if @check_apikey_on_start
153164

154165
sqs_client = create_sqs_client
155166
log.debug("Succeeded to create SQS client")
156-
response = sqs_client.get_queue_url(queue_name: @sqs.queue_name, queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id)
157-
sqs_queue_url = response.queue_url
167+
sqs_queue_url = nil
168+
if (@sqs.queue_url.nil?)
169+
response = sqs_client.get_queue_url(queue_name: @sqs.queue_name, queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id)
170+
sqs_queue_url = response.queue_url
171+
else
172+
sqs_queue_url = @sqs.queue_url
173+
end
158174
log.debug("Succeeded to get SQS queue URL")
159175

160176
@poller = Aws::SQS::QueuePoller.new(sqs_queue_url, client: sqs_client)
@@ -279,29 +295,39 @@ def create_sqs_client
279295
end
280296

281297
def check_apikeys
282-
@bucket.objects.first
283-
log.debug("Succeeded to verify API keys")
298+
@buckets.each do | bucket_name, bucket_object |
299+
bucket_object.objects.first
300+
log.debug("Succeeded to verify API keys for bucket #{bucket_name}")
301+
end
284302
rescue => e
285303
raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}"
286304
end
287305

288306
def process(body)
289307
s3 = body["Records"].first["s3"]
290308
raw_key = s3["object"]["key"]
309+
raw_bucket_name = s3["bucket"]["name"]
291310
key = CGI.unescape(raw_key)
292311

293-
io = @bucket.object(key).get.body
312+
if (!@buckets.key?(raw_bucket_name))
313+
raise "S3 bucket name: #{raw_bucket_name} returned from SQS was not provided in the input configuration as one of the s3 fluentd sources."
314+
end
315+
316+
io = @buckets[raw_bucket_name].object(key).get.body
294317
content = @extractor.extract(io)
295318
es = Fluent::MultiEventStream.new
296319
content.each_line do |line|
297320
@parser.parse(line) do |time, record|
298321
if @add_object_metadata
299-
record['s3_bucket'] = @s3_bucket
322+
record['s3_bucket'] = raw_bucket_name
300323
record['s3_key'] = raw_key
301324
end
302325
es.add(time, record)
303326
end
304327
end
328+
if (@tag == "input.s3")
329+
@tag = "input.s3.#{raw_bucket_name}"
330+
end
305331
router.emit_stream(@tag, es)
306332
end
307333

0 commit comments

Comments
 (0)