Skip to content

Commit f83ee8c

Browse files
feat: add the V2 Async S3 Client (#4396)
* feat: add the V2 Async S3 Client * apply formatting
1 parent 77bf119 commit f83ee8c

File tree

5 files changed

+57
-8
lines changed

5 files changed

+57
-8
lines changed

backend/common/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,10 @@
198198
<groupId>com.amazonaws</groupId>
199199
<artifactId>aws-java-sdk-sts</artifactId>
200200
</dependency>
201+
<dependency>
202+
<groupId>software.amazon.awssdk</groupId>
203+
<artifactId>s3</artifactId>
204+
</dependency>
201205
<!-- https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage -->
202206
<dependency>
203207
<groupId>com.google.cloud</groupId>

backend/common/src/main/java/ai/verta/modeldb/common/artifactStore/storageservice/s3/RefCountedS3Client.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,35 @@
55
import java.util.concurrent.atomic.AtomicInteger;
66
import org.apache.logging.log4j.LogManager;
77
import org.apache.logging.log4j.Logger;
8+
import software.amazon.awssdk.services.s3.S3AsyncClient;
89

910
public class RefCountedS3Client implements AutoCloseable {
1011
private static final Logger LOGGER = LogManager.getLogger(RefCountedS3Client.class);
1112
private final AWSCredentials credentials;
1213
private final AmazonS3 s3Client;
14+
private final S3AsyncClient asyncClient;
1315
private final AtomicInteger referenceCounter;
1416

15-
RefCountedS3Client(AWSCredentials credentials, AmazonS3 client, AtomicInteger counter) {
17+
RefCountedS3Client(
18+
AWSCredentials credentials,
19+
AmazonS3 client,
20+
S3AsyncClient asyncClient,
21+
AtomicInteger counter) {
1622
this.credentials = credentials;
17-
s3Client = client;
18-
referenceCounter = counter;
19-
referenceCounter.incrementAndGet();
23+
this.s3Client = client;
24+
this.asyncClient = asyncClient;
25+
this.referenceCounter = counter;
26+
this.referenceCounter.incrementAndGet();
2027
}
2128

2229
public AmazonS3 getClient() {
2330
return s3Client;
2431
}
2532

33+
public S3AsyncClient getAsyncClient() {
34+
return asyncClient;
35+
}
36+
2637
public AWSCredentials getCredentials() {
2738
return credentials;
2839
}
@@ -34,6 +45,9 @@ public void close() {
3445
LOGGER.debug("shutting client down");
3546
s3Client.shutdown();
3647
}
48+
if (asyncClient != null) {
49+
asyncClient.close();
50+
}
3751
}
3852
}
3953
}

backend/common/src/main/java/ai/verta/modeldb/common/artifactStore/storageservice/s3/RefreshS3ClientCron.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.UUID;
1717
import org.apache.logging.log4j.LogManager;
1818
import org.apache.logging.log4j.Logger;
19+
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
1920

2021
public class RefreshS3ClientCron implements Runnable {
2122
private static final Logger LOGGER = LogManager.getLogger(RefreshS3ClientCron.class);
@@ -57,7 +58,13 @@ private void createAndRefreshNewClient(BasicSessionCredentials awsCredentials) {
5758
newS3Client.doesBucketExistV2(bucketName);
5859
LOGGER.trace("New S3 Client created");
5960

60-
s3Client.refreshS3Client(awsCredentials, newS3Client);
61+
s3Client.refreshS3Client(
62+
awsCredentials,
63+
newS3Client,
64+
AwsSessionCredentials.create(
65+
awsCredentials.getAWSAccessKeyId(),
66+
awsCredentials.getAWSSecretKey(),
67+
awsCredentials.getSessionToken()));
6168
}
6269

6370
private BasicSessionCredentials getBasicSessionCredentials() throws IOException {

backend/common/src/main/java/ai/verta/modeldb/common/artifactStore/storageservice/s3/S3Client.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
import java.util.function.Supplier;
1515
import org.apache.logging.log4j.LogManager;
1616
import org.apache.logging.log4j.Logger;
17+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
18+
import software.amazon.awssdk.auth.credentials.AwsCredentials;
19+
import software.amazon.awssdk.regions.Region;
20+
import software.amazon.awssdk.services.s3.S3AsyncClient;
1721

1822
// S3Client provides a wrapper to the regular AmazonS3 object. The goal is to ensure that the
1923
// AmazonS3 object is
@@ -33,8 +37,10 @@ public class S3Client {
3337
private final S3Config s3Config;
3438

3539
private volatile AmazonS3 s3Client;
40+
private volatile S3AsyncClient asyncClient;
3641
private volatile AtomicInteger referenceCounter;
3742
private volatile AWSCredentials awsCredentials;
43+
private volatile AwsCredentials v2Credentials;
3844

3945
public S3Client(S3Config s3Config) throws ModelDBException {
4046
this.s3Config = s3Config;
@@ -89,7 +95,7 @@ private AmazonS3 buildEnvironmentClient(Regions awsRegion) {
8995
*/
9096
private void scheduleRefresh(Supplier<AmazonS3> s3) {
9197
CommonUtils.scheduleTask(
92-
() -> refreshS3Client(awsCredentials, s3.get()),
98+
() -> refreshS3Client(awsCredentials, s3.get(), v2Credentials),
9399
0L,
94100
s3Config.getRefreshIntervalSeconds(),
95101
TimeUnit.SECONDS);
@@ -98,6 +104,7 @@ private void scheduleRefresh(Supplier<AmazonS3> s3) {
98104
private void initializeMinioClient(
99105
String cloudAccessKey, String cloudSecretKey, Regions awsRegion, String minioEndpoint) {
100106
this.awsCredentials = new BasicAWSCredentials(cloudAccessKey, cloudSecretKey);
107+
this.v2Credentials = AwsBasicCredentials.create(cloudAccessKey, cloudSecretKey);
101108
var clientConfiguration = new ClientConfiguration(defaultClientConfig);
102109
clientConfiguration.setSignerOverride("VertaSignOverrideS3Signer");
103110
SignerFactory.registerSigner("VertaSignOverrideS3Signer", SignOverrideS3Signer.class);
@@ -120,6 +127,7 @@ private AmazonS3 buildMinioClient(
120127
private void initializeS3ClientWithAccessKey(
121128
String cloudAccessKey, String cloudSecretKey, Regions awsRegion) {
122129
this.awsCredentials = new BasicAWSCredentials(cloudAccessKey, cloudSecretKey);
130+
this.v2Credentials = AwsBasicCredentials.create(cloudAccessKey, cloudSecretKey);
123131
this.s3Client = buildAccessKeyClient(awsRegion);
124132
scheduleRefresh(() -> buildAccessKeyClient(awsRegion));
125133
}
@@ -133,7 +141,7 @@ private AmazonS3 buildAccessKeyClient(Regions awsRegion) {
133141
}
134142

135143
public RefCountedS3Client getRefCountedClient() {
136-
return new RefCountedS3Client(awsCredentials, s3Client, referenceCounter);
144+
return new RefCountedS3Client(awsCredentials, s3Client, asyncClient, referenceCounter);
137145
}
138146

139147
private void initializeWithWebIdentity(Regions awsRegion) {
@@ -152,7 +160,13 @@ private void initializeWithWebIdentity(Regions awsRegion) {
152160
TimeUnit.SECONDS);
153161
}
154162

155-
void refreshS3Client(AWSCredentials awsCredentials, AmazonS3 s3Client) {
163+
void refreshS3Client(
164+
AWSCredentials awsCredentials, AmazonS3 s3Client, AwsCredentials v2Credentials) {
165+
var s3AsyncClient =
166+
S3AsyncClient.builder()
167+
.credentialsProvider(() -> v2Credentials)
168+
.region(Region.of(s3Config.getAwsRegion()))
169+
.build();
156170
// Once we get to this point, we know that we have a good new s3 client, so it's time to swap
157171
// it. No fail can happen now
158172
LOGGER.debug("Replacing S3 Client");
@@ -163,7 +177,9 @@ void refreshS3Client(AWSCredentials awsCredentials, AmazonS3 s3Client) {
163177
// Swap the references
164178
this.referenceCounter = new AtomicInteger(1);
165179
this.awsCredentials = awsCredentials;
180+
this.v2Credentials = v2Credentials;
166181
this.s3Client = s3Client;
182+
this.asyncClient = s3AsyncClient;
167183
LOGGER.debug("S3 Client replaced");
168184
// At the end of the try, the reference counter will be decremented again and shutdown will
169185
// be called

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
<prometheus.version>0.16.0</prometheus.version>
3131
<protobuf.version>3.25.2</protobuf.version>
3232
<aws.version>1.12.656</aws.version>
33+
<aws.java.sdk.v2.version>2.25.1</aws.java.sdk.v2.version>
3334
<jackson.version>2.16.1</jackson.version>
3435
<os-maven-plugin.version>1.7.1</os-maven-plugin.version>
3536
<otel.core.version>1.32.0</otel.core.version>
@@ -258,6 +259,13 @@
258259
<artifactId>aws-java-sdk-sts</artifactId>
259260
<version>${aws.version}</version>
260261
</dependency>
262+
<dependency>
263+
<groupId>software.amazon.awssdk</groupId>
264+
<artifactId>bom</artifactId>
265+
<version>${aws.java.sdk.v2.version}</version>
266+
<type>pom</type>
267+
<scope>import</scope>
268+
</dependency>
261269

262270

263271
<!-- https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage -->

0 commit comments

Comments
 (0)