Skip to content

Commit 4631ceb

Browse files
dlvenablewandna-amazon
authored andcommitted
Enable cross-region writes in the S3 sink. (opensearch-project#6323)
Signed-off-by: David Venable <dlv@amazon.com> Signed-off-by: Nathan Wand <wandna@amazon.com>
1 parent a37b576 commit 4631ceb

File tree

2 files changed

+14
-58
lines changed

2 files changed

+14
-58
lines changed

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactory.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.sink.s3;
@@ -16,33 +20,23 @@
1620
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
1721
import software.amazon.awssdk.services.s3.S3AsyncClient;
1822
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
19-
import software.amazon.awssdk.services.s3.S3Client;
2023

2124
public final class ClientFactory {
2225
private ClientFactory() { }
2326

24-
static S3Client createS3Client(final S3SinkConfig s3SinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
25-
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(s3SinkConfig.getAwsAuthenticationOptions());
26-
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);
27-
28-
return S3Client.builder()
29-
.region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion())
30-
.credentialsProvider(awsCredentialsProvider)
31-
.overrideConfiguration(createOverrideConfiguration(s3SinkConfig)).build();
32-
}
33-
3427
static S3AsyncClient createS3AsyncClient(final S3SinkConfig s3SinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
3528
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(s3SinkConfig.getAwsAuthenticationOptions());
3629
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);
3730

38-
S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder()
31+
final S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder()
3932
.region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion())
33+
.crossRegionAccessEnabled(true)
4034
.credentialsProvider(awsCredentialsProvider)
4135
.overrideConfiguration(createOverrideConfiguration(s3SinkConfig));
4236

4337
if (s3SinkConfig.getClientOptions() != null) {
4438
final ClientOptions clientOptions = s3SinkConfig.getClientOptions();
45-
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
39+
final SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
4640
.connectionAcquisitionTimeout(clientOptions.getAcquireTimeout())
4741
.maxConcurrency(clientOptions.getMaxConnections())
4842
.build();

data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ClientFactoryTest.java

Lines changed: 7 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
48
*/
59

610
package org.opensearch.dataprepper.plugins.sink.s3;
@@ -25,8 +29,6 @@
2529
import software.amazon.awssdk.services.s3.S3AsyncClient;
2630
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
2731
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
28-
import software.amazon.awssdk.services.s3.S3Client;
29-
import software.amazon.awssdk.services.s3.S3ClientBuilder;
3032

3133
import java.time.Duration;
3234
import java.util.Map;
@@ -62,14 +64,14 @@ void setUp() {
6264
@Test
6365
void createS3AsyncClient_with_real_S3AsyncClient() {
6466
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
65-
final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
67+
final S3AsyncClient s3Client = ClientFactory.createS3AsyncClient(s3SinkConfig, awsCredentialsSupplier);
6668

6769
assertThat(s3Client, notNullValue());
6870
}
6971

7072
@ParameterizedTest
7173
@ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"})
72-
void createS3Client_provides_correct_inputs(final String regionString) {
74+
void createS3AsyncClient_with_client_options_returns_expected_client(final String regionString) {
7375
final Region region = Region.of(regionString);
7476
final String stsRoleArn = UUID.randomUUID().toString();
7577
final String externalId = UUID.randomUUID().toString();
@@ -82,49 +84,9 @@ void createS3Client_provides_correct_inputs(final String regionString) {
8284
final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class);
8385
when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider);
8486

85-
final S3ClientBuilder s3ClientBuilder = mock(S3ClientBuilder.class);
86-
when(s3ClientBuilder.region(region)).thenReturn(s3ClientBuilder);
87-
when(s3ClientBuilder.credentialsProvider(any())).thenReturn(s3ClientBuilder);
88-
when(s3ClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(s3ClientBuilder);
89-
try(final MockedStatic<S3Client> s3ClientMockedStatic = mockStatic(S3Client.class)) {
90-
s3ClientMockedStatic.when(S3Client::builder)
91-
.thenReturn(s3ClientBuilder);
92-
ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
93-
}
94-
95-
final ArgumentCaptor<AwsCredentialsProvider> credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class);
96-
verify(s3ClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture());
97-
98-
final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue();
99-
100-
assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider));
101-
102-
final ArgumentCaptor<AwsCredentialsOptions> optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class);
103-
verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture());
104-
105-
final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue();
106-
assertThat(actualCredentialsOptions.getRegion(), equalTo(region));
107-
assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn));
108-
assertThat(actualCredentialsOptions.getStsExternalId(), equalTo(externalId));
109-
assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides));
110-
}
111-
112-
@Test
113-
void createS3AsyncClient_with_client_options_returns_expected_client() {
114-
final Region region = Region.of("us-east-1");
115-
final String stsRoleArn = UUID.randomUUID().toString();
116-
final String externalId = UUID.randomUUID().toString();
117-
final Map<String, String> stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
118-
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region);
119-
when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn);
120-
when(awsAuthenticationOptions.getAwsStsExternalId()).thenReturn(externalId);
121-
when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides);
122-
123-
final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class);
124-
when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider);
125-
12687
final S3AsyncClientBuilder s3AsyncClientBuilder = mock(S3AsyncClientBuilder.class);
12788
when(s3AsyncClientBuilder.region(region)).thenReturn(s3AsyncClientBuilder);
89+
when(s3AsyncClientBuilder.crossRegionAccessEnabled(true)).thenReturn(s3AsyncClientBuilder);
12890
when(s3AsyncClientBuilder.credentialsProvider(any())).thenReturn(s3AsyncClientBuilder);
12991
when(s3AsyncClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(s3AsyncClientBuilder);
13092

0 commit comments

Comments
 (0)