Skip to content

Commit 028a559

Browse files
committed
resolved merge conflicts with master
Signed-off-by: riquemon <[email protected]>
2 parents 7daaf27 + b3210b7 commit 028a559

File tree

6 files changed

+95
-23
lines changed

6 files changed

+95
-23
lines changed

ChangeLog

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
Release 1.3.3 - 2020/06/25
2+
3+
* Allow fips/gov included endpoint
4+
* Support sts_region parameter
5+
6+
Release 1.3.2 - 2020/05/18
7+
8+
* out_s3: Show warning message for object conflict case.
9+
10+
Release 1.3.1 - 2020/04/15
11+
12+
* out_s3: Support S3 Dual-Stack Endpoints in output plugin via enable_dual_stack parameter
13+
114
Release 1.3.0 - 2020/02/10
215

316
* in_s3/out_s3: Support AssumeRoleWebIdentityCredentials via `web_identity_credentials` section for EKS.

README.md

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,13 @@ We must setup SQS queue and S3 event notification before use this plugin.
3131

3232
Simply use RubyGems:
3333

34-
$ gem install fluent-plugin-s3 -v "~> 0.8" --no-document # for fluentd v0.12 or later
35-
$ gem install fluent-plugin-s3 -v 1.0.0 --no-document # for fluentd v1.0 or later
34+
# install latest version
35+
$ gem install fluent-plugin-s3 --no-document # for fluentd v1.0 or later
36+
# If you need to install specifiv version, use -v option
37+
$ gem install fluent-plugin-s3 -v 1.3.0 --no-document
38+
# For v0.12. This is for old v0.12 users. Don't use v0.12 for new deployment
39+
$ gem install fluent-plugin-s3 -v "~> 0.8" --no-document # for fluentd v0.12
40+
3641

3742
## Configuration: credentials
3843

@@ -301,6 +306,10 @@ See also AWS article: [Working with Regions](https://aws.amazon.com/blogs/develo
301306

302307
Enable [S3 Transfer Acceleration](https://docs.aws.amazon.com/AmazonS3/latest/dev/transfer-acceleration.html) for uploads. **IMPORTANT**: For this to work, you must first enable this feature on your destination S3 bucket.
303308

309+
**enable_dual_stack**
310+
311+
Enable [Amazon S3 Dual-Stack Endpoints](https://docs.aws.amazon.com/AmazonS3/latest/dev/dual-stack-endpoints.html) for uploads. Will make it possible to use either IPv4 or IPv6 when connecting to S3.
312+
304313
**use_bundled_cert**
305314

306315
For cases where the default SSL certificate is unavailable (e.g. Windows), you can set this option to true in order to use the AWS SDK bundled certificate. Default is false.
@@ -422,16 +431,16 @@ Change one line format in the S3 object. Supported formats are "out_file",
422431

423432

424433
At this format, "time" and "tag" are omitted. But you can set these
425-
information to the record by setting "include_tag_key" / "tag_key" and
426-
"include_time_key" / "time_key" option. If you set following configuration in
434+
information to the record by setting `<inject>` option. If you set following configuration in
427435
S3 output:
428436

429437
# v1
430438
<format>
431439
@type json
432-
include_time_key true
433-
time_key log_time # default is time
434440
</format>
441+
<inject>
442+
time_key log_time
443+
</inject>
435444
# v0.12
436445
@format json
437446
include_time_key true
@@ -441,15 +450,14 @@ then the record has log_time field.
441450

442451
{"log_time":"time string",...}
443452

453+
See also [official Inject Section article](https://docs.fluentd.org/configuration/inject-section).
454+
444455
* ltsv
445456

446457
key1:value1\tkey2:value2
447458
key1:value1\tkey2:value2
448459
...
449460

450-
451-
"ltsv" format also accepts "include_xxx" related options. See "json" section.
452-
453461
* single_value
454462

455463

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.3.0
1+
1.3.3

lib/fluent/plugin/in_s3.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,11 @@ def initialize
118118
def configure(conf)
119119
super
120120

121-
if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
121+
if @s3_endpoint && (@s3_endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| @s3_endpoint.include?(e) })
122122
raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
123123
end
124124

125-
if @sqs.endpoint && @sqs.endpoint.end_with?('amazonaws.com')
125+
if @sqs.endpoint && (@sqs.endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| @sqs.endpoint.include?(e) })
126126
raise Fluent::ConfigError, "sqs/endpoint parameter is not supported for SQS, use s3_region instead. This parameter is for SQS compatible services"
127127
end
128128

lib/fluent/plugin/out_s3.rb

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ def initialize
3939
config_param :duration_seconds, :integer, default: nil
4040
desc "A unique identifier that is used by third parties when assuming roles in their customers' accounts."
4141
config_param :external_id, :string, default: nil, secret: true
42+
desc "The region of the STS endpoint to use."
43+
config_param :sts_region, :string, default: nil
44+
desc "A http proxy url for requests to aws sts service"
45+
config_param :sts_http_proxy, :string, default: nil, secret: true
46+
desc "A url for a regional sts api endpoint, the default is global"
47+
config_param :sts_endpoint_url, :string, default: nil ## STS_region
4248
end
4349
# See the following link for additional params that could be added:
4450
# https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/STS/Client.html#assume_role_with_web_identity-instance_method
@@ -53,10 +59,8 @@ def initialize
5359
config_param :policy, :string, default: nil
5460
desc "The duration, in seconds, of the role session (900-43200)"
5561
config_param :duration_seconds, :integer, default: nil
56-
desc "A http proxy url for requests to aws sts service"
57-
config_param :sts_http_proxy, :string, default: nil, secret: true
58-
desc "A url for a regional sts api endpoint, the default is global"
59-
config_param :sts_endpoint_url, :string, default: nil
62+
desc "The region of the STS endpoint to use."
63+
config_param :sts_region, :string, default: nil
6064
end
6165
config_section :instance_profile_credentials, multi: false do
6266
desc "Number of times to retry when retrieving credentials"
@@ -88,6 +92,8 @@ def initialize
8892
config_param :s3_endpoint, :string, default: nil
8993
desc "If true, S3 Transfer Acceleration will be enabled for uploads. IMPORTANT: You must first enable this feature on your destination S3 bucket"
9094
config_param :enable_transfer_acceleration, :bool, default: false
95+
desc "If true, use Amazon S3 Dual-Stack Endpoints. Will make it possible to use either IPv4 or IPv6 when connecting to S3."
96+
config_param :enable_dual_stack, :bool, default: false
9197
desc "If false, the certificate of endpoint will not be verified"
9298
config_param :ssl_verify_peer, :bool, :default => true
9399
desc "The format of S3 object keys"
@@ -173,7 +179,7 @@ def configure(conf)
173179

174180
Aws.use_bundled_cert! if @use_bundled_cert
175181

176-
if @s3_endpoint && @s3_endpoint.end_with?('amazonaws.com')
182+
if @s3_endpoint && (@s3_endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| @s3_endpoint.include?(e) })
177183
raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
178184
end
179185

@@ -211,6 +217,8 @@ def configure(conf)
211217
end
212218
end
213219

220+
check_s3_path_safety(conf)
221+
214222
# For backward compatibility
215223
# TODO: Remove time_slice_format when end of support compat_parameters
216224
@configured_time_slice_format = conf['time_slice_format']
@@ -227,6 +235,7 @@ def start
227235
options[:region] = @s3_region if @s3_region
228236
options[:endpoint] = @s3_endpoint if @s3_endpoint
229237
options[:use_accelerate_endpoint] = @enable_transfer_acceleration
238+
options[:use_dualstack_endpoint] = @enable_dual_stack
230239
options[:http_proxy] = @proxy_uri if @proxy_uri
231240
options[:force_path_style] = @force_path_style
232241
options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
@@ -452,6 +461,16 @@ def process_s3_object_key_format
452461
}
453462
end
454463

464+
def check_s3_path_safety(conf)
465+
unless conf.has_key?('s3_object_key_format')
466+
log.warn "The default value of s3_object_key_format will use ${chunk_id} instead of %{index} to avoid object conflict in v2"
467+
end
468+
469+
if (@buffer_config.flush_thread_count > 1) && ['${chunk_id}', '%{uuid_flush}'].none? { |key| @s3_object_key_format.include?(key) }
470+
log.warn "No ${chunk_id} or %{uuid_flush} in s3_object_key_format with multiple flush threads. Recommend to set ${chunk_id} or %{uuid_flush} to avoid data lost by object conflict"
471+
end
472+
end
473+
455474
def check_apikeys
456475
@bucket.objects(prefix: @path, :max_keys => 1).first
457476
rescue Aws::S3::Errors::NoSuchBucket
@@ -475,6 +494,7 @@ def setup_credentials
475494
credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
476495
credentials_options[:external_id] = c.external_id if c.external_id
477496
credentials_options[:sts_endpoint_url] = c.sts_endpoint_url if c.sts_endpoint_url
497+
credentials_options[:sts_http_proxy] = c.sts_http_proxy if c.sts_http_proxy
478498
if c.sts_http_proxy and c.sts_endpoint_url
479499
credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url)
480500
elsif @region and c.sts_http_proxy
@@ -485,11 +505,10 @@ def setup_credentials
485505
credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy)
486506
elsif c.sts_endpoint_url
487507
credentials_options[:client] = Aws::STS::Client.new(endpoint: c.sts_endpoint_url)
488-
elsif @region
489-
opt = @s3_region ? { region: @s3_region } : {}
490-
opt[:http_proxy] = c.sts_http_proxy if c.sts_http_proxy
491-
opt[:endpoint_url] = c.sts_endpoint_url if c.sts_endpoint_url
492-
credentials_options[:client] = Aws::STS::Client.new(**opt)
508+
elsif c.sts_region
509+
credentials_options[:client] = Aws::STS::Client.new(region: c.sts_region)
510+
elsif @s3_region
511+
credentials_options[:client] = Aws::STS::Client.new(region: @s3_region)
493512
end
494513
options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
495514
when @web_identity_credentials
@@ -499,7 +518,9 @@ def setup_credentials
499518
credentials_options[:web_identity_token_file] = c.web_identity_token_file
500519
credentials_options[:policy] = c.policy if c.policy
501520
credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
502-
if @s3_region
521+
if c.sts_region
522+
credentials_options[:client] = Aws::STS::Client.new(:region => c.sts_region)
523+
elsif @s3_region
503524
credentials_options[:client] = Aws::STS::Client.new(:region => @s3_region)
504525
end
505526
options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(credentials_options)

test/test_out_s3.rb

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,36 @@ def test_web_identity_credentials
593593
assert_equal(expected_credentials, credentials)
594594
end
595595

596+
def test_web_identity_credentials_with_sts_region
597+
expected_credentials = Aws::Credentials.new("test_key", "test_secret")
598+
sts_client = Aws::STS::Client.new(region: 'us-east-1')
599+
mock(Aws::STS::Client).new(region: 'us-east-1'){ sts_client }
600+
mock(Aws::AssumeRoleWebIdentityCredentials).new(
601+
role_arn: "test_arn",
602+
role_session_name: "test_session",
603+
web_identity_token_file: "test_file",
604+
client: sts_client
605+
){
606+
expected_credentials
607+
}
608+
609+
config = CONFIG_TIME_SLICE.split("\n").reject{|x| x =~ /.+aws_.+/}.join("\n")
610+
config += %[
611+
s3_region us-west-2
612+
<web_identity_credentials>
613+
role_arn test_arn
614+
role_session_name test_session
615+
web_identity_token_file test_file
616+
sts_region us-east-1
617+
</web_identity_credentials>
618+
]
619+
d = create_time_sliced_driver(config)
620+
assert_nothing_raised { d.run {} }
621+
client = d.instance.instance_variable_get(:@s3).client
622+
credentials = client.config.credentials
623+
assert_equal(expected_credentials, credentials)
624+
end
625+
596626
def test_instance_profile_credentials
597627
expected_credentials = Aws::Credentials.new("test_key", "test_secret")
598628
mock(Aws::InstanceProfileCredentials).new({}).returns(expected_credentials)

0 commit comments

Comments
 (0)