Skip to content

Commit d945a0c

Browse files
committed
Refactors
1 parent edd142e commit d945a0c

File tree

3 files changed

+20
-35
lines changed

3 files changed

+20
-35
lines changed

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
2020
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
2121
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
22-
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;
2322

2423
import java.util.concurrent.CompletableFuture;
2524
import java.util.function.Consumer;
@@ -70,36 +69,14 @@ public final Upload upload(UploadRequest uploadRequest) {
7069
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest,
7170
requestBody.contentLength().orElse(null));
7271
progressUpdater.transferInitiated();
73-
// requestBody = progressUpdater.wrapRequestBody(requestBody);
7472
progressUpdater.registerCompletion(returnFuture);
7573

76-
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
77-
78-
Consumer<SdkHttpExecutionAttributes.Builder> attachObservable =
79-
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable)
80-
.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener());
81-
82-
PutObjectRequest putObjectRequest = attachCrtSdkAttribute(uploadRequest.putObjectRequest(), attachObservable);
74+
Consumer<SdkHttpExecutionAttributes.Builder> attachProgress =
75+
b -> b.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener());
8376

84-
progressUpdater.transferInitiated();
85-
progressUpdater.registerCompletion(returnFuture);
77+
PutObjectRequest putObjectRequest = attachCrtSdkAttribute(uploadRequest.putObjectRequest(), attachProgress);
8678

87-
try {
88-
assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload");
89-
90-
CompletableFuture<PutObjectResponse> future =
91-
s3AsyncClient.putObject(putObjectRequest, requestBody);
92-
93-
// Forward upload cancellation to future
94-
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
95-
96-
CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture,
97-
r -> CompletedUpload.builder()
98-
.response(r)
99-
.build());
100-
} catch (Throwable throwable) {
101-
returnFuture.completeExceptionally(throwable);
102-
}
79+
doUpload(putObjectRequest, requestBody, returnFuture);
10380

10481
return new DefaultUpload(returnFuture, progressUpdater.progress());
10582
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,24 +148,31 @@ public Upload upload(UploadRequest uploadRequest) {
148148
putObjectRequest = attachSdkAttribute(uploadRequest.putObjectRequest(), attachProgressListener);
149149
}
150150

151+
doUpload(putObjectRequest, requestBody, returnFuture);
152+
153+
return new DefaultUpload(returnFuture, progressUpdater.progress());
154+
}
155+
156+
protected void doUpload(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody,
157+
CompletableFuture<CompletedUpload> returnFuture) {
151158
try {
152-
assertNotUnsupportedArn(uploadRequest.putObjectRequest().bucket(), "upload");
159+
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");
153160

154161
CompletableFuture<PutObjectResponse> future =
155162
s3AsyncClient.putObject(putObjectRequest, requestBody);
156163

157164
// Forward upload cancellation to future
158165
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
159166

160-
CompletableFutureUtils.forwardTransformedResultTo(future, returnFuture,
161-
r -> CompletedUpload.builder()
162-
.response(r)
163-
.build());
167+
CompletableFutureUtils.forwardTransformedResultTo(
168+
future,
169+
returnFuture,
170+
r -> CompletedUpload.builder()
171+
.response(r)
172+
.build());
164173
} catch (Throwable throwable) {
165174
returnFuture.completeExceptionally(throwable);
166175
}
167-
168-
return new DefaultUpload(returnFuture, progressUpdater.progress());
169176
}
170177

171178
/**

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManagerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.Mockito.verify;
2121
import static org.mockito.Mockito.when;
2222
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
23+
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
2324
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
2425

2526
import com.google.common.jimfs.Jimfs;
@@ -105,6 +106,6 @@ private void verifyCrtInRequestAttributes() {
105106
assertThat(actual.overrideConfiguration()).isPresent();
106107
SdkHttpExecutionAttributes attribute = actual.overrideConfiguration().get().executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES);
107108
assertThat(attribute).isNotNull();
108-
assertThat(attribute.getAttribute(METAREQUEST_PAUSE_OBSERVABLE)).isNotNull();
109+
assertThat(attribute.getAttribute(CRT_PROGRESS_LISTENER)).isNotNull();
109110
}
110111
}

0 commit comments

Comments
 (0)