Skip to content

Commit e7bcd36

Browse files
authored
Fixed the issue where CRT-based S3 Transfer Manager still uses Java to read file in uploadDirectory (#5012)
* AWS-CRT based S3 Transfer Manager now relies on CRT to perform file reading for upload directory. * Address sonarcloud issues
1 parent e9e75c9 commit e7bcd36

File tree

5 files changed

+145
-34
lines changed

5 files changed

+145
-34
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "AWS-CRT based S3 Transfer Manager now relies on CRT to perform file reading for upload directory. Related to [#4999](https://github.com/aws/aws-sdk-java-v2/issues/4999)"
6+
}

services-custom/s3-transfer-manager/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,11 @@
221221
<groupId>com.github.tomakehurst</groupId>
222222
<scope>test</scope>
223223
</dependency>
224+
<dependency>
225+
<groupId>org.mockito</groupId>
226+
<artifactId>mockito-junit-jupiter</artifactId>
227+
<scope>test</scope>
228+
</dependency>
224229
</dependencies>
225230

226231
<build>

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
2020
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
2121
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
22-
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
2322

2423
import java.util.concurrent.CompletableFuture;
2524
import java.util.function.Consumer;
@@ -40,21 +39,18 @@
4039
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
4140
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
4241
import software.amazon.awssdk.utils.CompletableFutureUtils;
43-
import software.amazon.awssdk.utils.Logger;
4442
import software.amazon.awssdk.utils.Validate;
4543

4644
/**
4745
* An implementation of {@link S3TransferManager} that uses CRT-based S3 client under the hood.
4846
*/
4947
@SdkInternalApi
50-
class CrtS3TransferManager extends DelegatingS3TransferManager {
51-
private static final Logger log = Logger.loggerFor(S3TransferManager.class);
52-
private static final PauseResumeHelper PAUSE_RESUME_HELPER = new PauseResumeHelper();
48+
class CrtS3TransferManager extends GenericS3TransferManager {
5349
private final S3AsyncClient s3AsyncClient;
5450

5551
CrtS3TransferManager(TransferManagerConfiguration transferConfiguration, S3AsyncClient s3AsyncClient,
5652
boolean isDefaultS3AsyncClient) {
57-
super(new GenericS3TransferManager(transferConfiguration, s3AsyncClient, isDefaultS3AsyncClient));
53+
super(transferConfiguration, s3AsyncClient, isDefaultS3AsyncClient);
5854
this.s3AsyncClient = s3AsyncClient;
5955
}
6056

@@ -70,7 +66,7 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
7066
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable)
7167
.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener());
7268

73-
PutObjectRequest putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservable);
69+
PutObjectRequest putObjectRequest = attachCrtSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservable);
7470

7571
CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();
7672

@@ -99,28 +95,15 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
9995
}
10096

10197
@Override
102-
public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
103-
Validate.paramNotNull(resumableFileUpload, "resumableFileUpload");
104-
105-
boolean fileModified = PAUSE_RESUME_HELPER.fileModified(resumableFileUpload, s3AsyncClient);
106-
boolean noResumeToken = !PAUSE_RESUME_HELPER.hasResumeToken(resumableFileUpload);
107-
108-
if (fileModified || noResumeToken) {
109-
return uploadFile(resumableFileUpload.uploadFileRequest());
110-
}
111-
112-
return doResumeUpload(resumableFileUpload);
113-
}
114-
115-
private FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
98+
FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
11699
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
117100
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
118101
ResumeToken resumeToken = crtResumeToken(resumableFileUpload);
119102

120103
Consumer<SdkHttpExecutionAttributes.Builder> attachResumeToken =
121104
b -> b.put(CRT_PAUSE_RESUME_TOKEN, resumeToken);
122105

123-
PutObjectRequest modifiedPutObjectRequest = attachSdkAttribute(putObjectRequest, attachResumeToken);
106+
PutObjectRequest modifiedPutObjectRequest = attachCrtSdkAttribute(putObjectRequest, attachResumeToken);
124107

125108
return uploadFile(uploadFileRequest.toBuilder()
126109
.putObjectRequest(modifiedPutObjectRequest)
@@ -135,8 +118,8 @@ private static ResumeToken crtResumeToken(ResumableFileUpload resumableFileUploa
135118
.withUploadId(resumableFileUpload.multipartUploadId().orElse(null)));
136119
}
137120

138-
private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
139-
Consumer<SdkHttpExecutionAttributes.Builder> builderMutation) {
121+
private PutObjectRequest attachCrtSdkAttribute(PutObjectRequest putObjectRequest,
122+
Consumer<SdkHttpExecutionAttributes.Builder> builderMutation) {
140123
SdkHttpExecutionAttributes modifiedAttributes =
141124
putObjectRequest.overrideConfiguration().map(o -> o.executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES))
142125
.map(b -> b.toBuilder().applyMutation(builderMutation).build())

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ class GenericS3TransferManager implements S3TransferManager {
124124
}
125125

126126
@Override
127-
public Upload upload(UploadRequest uploadRequest) {
127+
public final Upload upload(UploadRequest uploadRequest) {
128128
Validate.paramNotNull(uploadRequest, "uploadRequest");
129129

130130
AsyncRequestBody requestBody = uploadRequest.requestBody();
@@ -164,6 +164,9 @@ public Upload upload(UploadRequest uploadRequest) {
164164
return new DefaultUpload(returnFuture, progressUpdater.progress());
165165
}
166166

167+
/**
168+
* May be overridden by subclasses to provide customized behavior
169+
*/
167170
@Override
168171
public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
169172
Validate.paramNotNull(uploadFileRequest, "uploadFileRequest");
@@ -215,7 +218,7 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
215218
}
216219

217220
@Override
218-
public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
221+
public final FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
219222
Validate.paramNotNull(resumableFileUpload, "resumableFileUpload");
220223

221224
boolean fileModified = PAUSE_RESUME_HELPER.fileModified(resumableFileUpload, s3AsyncClient);
@@ -233,7 +236,11 @@ private boolean isS3ClientMultipartEnabled() {
233236
return s3AsyncClient instanceof MultipartS3AsyncClient;
234237
}
235238

236-
private FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
239+
240+
/**
241+
* Can be overridden by subclasses to provide different implementation
242+
*/
243+
FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
237244
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
238245
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
239246
S3ResumeToken s3ResumeToken = s3ResumeToken(resumableFileUpload);
@@ -280,7 +287,7 @@ private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
280287
}
281288

282289
@Override
283-
public DirectoryUpload uploadDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
290+
public final DirectoryUpload uploadDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
284291
Validate.paramNotNull(uploadDirectoryRequest, "uploadDirectoryRequest");
285292

286293
try {
@@ -293,7 +300,7 @@ public DirectoryUpload uploadDirectory(UploadDirectoryRequest uploadDirectoryReq
293300
}
294301

295302
@Override
296-
public <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downloadRequest) {
303+
public final <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downloadRequest) {
297304
Validate.paramNotNull(downloadRequest, "downloadRequest");
298305

299306
AsyncResponseTransformer<GetObjectResponse, ResultT> responseTransformer =
@@ -326,7 +333,7 @@ public <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downloadReq
326333
}
327334

328335
@Override
329-
public FileDownload downloadFile(DownloadFileRequest downloadRequest) {
336+
public final FileDownload downloadFile(DownloadFileRequest downloadRequest) {
330337
Validate.paramNotNull(downloadRequest, "downloadFileRequest");
331338

332339
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> responseTransformer =
@@ -367,7 +374,7 @@ private TransferProgressUpdater doDownloadFile(
367374
}
368375

369376
@Override
370-
public FileDownload resumeDownloadFile(ResumableFileDownload resumableFileDownload) {
377+
public final FileDownload resumeDownloadFile(ResumableFileDownload resumableFileDownload) {
371378
Validate.paramNotNull(resumableFileDownload, "resumableFileDownload");
372379
CompletableFuture<CompletedFileDownload> returnFuture = new CompletableFuture<>();
373380
DownloadFileRequest originalDownloadRequest = resumableFileDownload.downloadFileRequest();
@@ -432,7 +439,7 @@ private static void handleException(CompletableFuture<CompletedFileDownload> ret
432439
}
433440

434441
@Override
435-
public DirectoryDownload downloadDirectory(DownloadDirectoryRequest downloadDirectoryRequest) {
442+
public final DirectoryDownload downloadDirectory(DownloadDirectoryRequest downloadDirectoryRequest) {
436443
Validate.paramNotNull(downloadDirectoryRequest, "downloadDirectoryRequest");
437444

438445
try {
@@ -445,7 +452,7 @@ public DirectoryDownload downloadDirectory(DownloadDirectoryRequest downloadDire
445452
}
446453

447454
@Override
448-
public Copy copy(CopyRequest copyRequest) {
455+
public final Copy copy(CopyRequest copyRequest) {
449456
Validate.paramNotNull(copyRequest, "copyRequest");
450457

451458
CompletableFuture<CompletedCopy> returnFuture = new CompletableFuture<>();
@@ -476,7 +483,7 @@ public Copy copy(CopyRequest copyRequest) {
476483
}
477484

478485
@Override
479-
public void close() {
486+
public final void close() {
480487
if (isDefaultS3AsyncClient) {
481488
IoUtils.closeQuietly(s3AsyncClient, log.logger());
482489
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3.internal;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.Mockito.verify;
21+
import static org.mockito.Mockito.when;
22+
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
23+
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
24+
25+
import com.google.common.jimfs.Jimfs;
26+
import java.io.IOException;
27+
import java.nio.file.FileSystem;
28+
import java.nio.file.Files;
29+
import java.nio.file.Path;
30+
import java.util.concurrent.CompletableFuture;
31+
import org.apache.commons.lang3.RandomStringUtils;
32+
import org.junit.jupiter.api.AfterAll;
33+
import org.junit.jupiter.api.BeforeAll;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.junit.jupiter.api.extension.ExtendWith;
37+
import org.mockito.ArgumentCaptor;
38+
import org.mockito.Mock;
39+
import org.mockito.junit.jupiter.MockitoExtension;
40+
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
41+
import software.amazon.awssdk.services.s3.S3AsyncClient;
42+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
43+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
44+
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
45+
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
46+
47+
@ExtendWith(MockitoExtension.class)
48+
public class CrtS3TransferManagerTest {
49+
50+
@Mock
51+
private S3AsyncClient s3AsyncClient;
52+
53+
private static Path localDirectory;
54+
private static FileSystem jimfs;
55+
private CrtS3TransferManager transferManager;
56+
57+
@BeforeAll
58+
public static void setUp() throws IOException {
59+
jimfs = Jimfs.newFileSystem();
60+
localDirectory = jimfs.getPath("test");
61+
Files.createDirectory(localDirectory);
62+
Files.write(jimfs.getPath("test", "test.txt"), RandomStringUtils.randomAscii(1024).getBytes());
63+
}
64+
65+
@BeforeEach
66+
public void setUpPerMethod() {
67+
transferManager = new CrtS3TransferManager(TransferManagerConfiguration.builder().build(),
68+
s3AsyncClient, false);
69+
}
70+
71+
@AfterAll
72+
public static void tearDown() throws IOException {
73+
jimfs.close();
74+
}
75+
76+
@Test
77+
void uploadDirectory_shouldUseCrtUploadFile() {
78+
when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(Path.class))).thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build()));
79+
transferManager.uploadDirectory(UploadDirectoryRequest.builder().bucket("TEST").source(localDirectory).build())
80+
.completionFuture()
81+
.join();
82+
83+
verifyCrtInRequestAttributes();
84+
}
85+
86+
@Test
87+
void uploadFile_shouldUseCrtUploadFile() {
88+
when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(Path.class))).thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build()));
89+
transferManager.uploadFile(UploadFileRequest.builder()
90+
.putObjectRequest(PutObjectRequest.builder().bucket("test").key("test").build())
91+
.source(localDirectory.resolve("test.txt"))
92+
.build())
93+
.completionFuture()
94+
.join();
95+
96+
verifyCrtInRequestAttributes();
97+
}
98+
99+
private void verifyCrtInRequestAttributes() {
100+
ArgumentCaptor<PutObjectRequest> requestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
101+
102+
verify(s3AsyncClient).putObject(requestArgumentCaptor.capture(), ArgumentCaptor.forClass(Path.class).capture());
103+
104+
PutObjectRequest actual = requestArgumentCaptor.getValue();
105+
assertThat(actual.overrideConfiguration()).isPresent();
106+
SdkHttpExecutionAttributes attribute = actual.overrideConfiguration().get().executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES);
107+
assertThat(attribute).isNotNull();
108+
assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull();
109+
}
110+
}

0 commit comments

Comments
 (0)