Skip to content

Commit 2120ac6

Browse files
authored
Use CRT response file in low level CRT S3AsyncClient for getObject (#6289)
* Use CRT response file in getObject * Undo cancel changes not required for low level client only * Add changelog + fix warning usage * PR cleanups
1 parent dbaaa34 commit 2120ac6

File tree

8 files changed

+284
-10
lines changed

8 files changed

+284
-10
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS S3",
4+
"contributor": "",
5+
"description": "Add support for using CRT's response file in the CRT based S3AsyncClient - CRT will directly write to the file when calling getObject with a Path."
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.crt;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.function.Consumer;
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
import software.amazon.awssdk.annotations.SdkInternalApi;
24+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
25+
import software.amazon.awssdk.core.async.SdkPublisher;
26+
import software.amazon.awssdk.utils.Logger;
27+
28+
/**
29+
* When the CRT Response File option is used in a request, the body is streamed directly to the file.
30+
* The S3CrtResponseHandlerAdapter in this case will never receive a response body but will call onStream
31+
* when the request is complete with a publisher that will complete immediately.
32+
* This transformer is effectively a no-op transformer that waits for the stream to complete and then
33+
* completes the future with the response.
34+
*
35+
* @param <ResponseT> Pojo response type.
36+
*/
37+
@SdkInternalApi
38+
public final class CrtResponseFileResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT,
39+
ResponseT> {
40+
41+
private static final Logger log = Logger.loggerFor(CrtResponseFileResponseTransformer.class);
42+
43+
private volatile CompletableFuture<Void> cf;
44+
private volatile ResponseT response;
45+
46+
@Override
47+
public CompletableFuture<ResponseT> prepare() {
48+
cf = new CompletableFuture<>();
49+
return cf.thenApply(ignored -> response);
50+
}
51+
52+
@Override
53+
public void onResponse(ResponseT response) {
54+
this.response = response;
55+
}
56+
57+
@Override
58+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
59+
publisher.subscribe(new OnCompleteSubscriber(cf, this::exceptionOccurred));
60+
}
61+
62+
@Override
63+
public void exceptionOccurred(Throwable throwable) {
64+
if (cf != null) {
65+
cf.completeExceptionally(throwable);
66+
} else {
67+
log.warn(() -> "An exception occurred before the call to prepare() was able to instantiate the CompletableFuture. "
68+
+ "The future cannot be completed exceptionally because it is null");
69+
}
70+
}
71+
72+
private static final class OnCompleteSubscriber implements Subscriber<ByteBuffer> {
73+
74+
private final CompletableFuture<Void> future;
75+
private final Consumer<Throwable> onErrorMethod;
76+
private Subscription subscription;
77+
78+
private OnCompleteSubscriber(CompletableFuture<Void> future, Consumer<Throwable> onErrorMethod) {
79+
this.future = future;
80+
this.onErrorMethod = onErrorMethod;
81+
}
82+
83+
@Override
84+
public void onSubscribe(Subscription s) {
85+
if (this.subscription != null) {
86+
s.cancel();
87+
return;
88+
}
89+
this.subscription = s;
90+
// do not request data from the subscription since body is written directly to file
91+
}
92+
93+
@Override
94+
public void onNext(ByteBuffer byteBuffer) {
95+
// The response body is streamed directly to the file - this method should never be called.
96+
// ensure the future is completed exceptionally if this occurs
97+
onErrorMethod.accept(new IllegalStateException("OnCompleteSubscriber received unexpected call to onNext."));
98+
}
99+
100+
@Override
101+
public void onError(Throwable throwable) {
102+
onErrorMethod.accept(throwable);
103+
}
104+
105+
@Override
106+
public void onComplete() {
107+
future.complete(null);
108+
}
109+
}
110+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
4343
import software.amazon.awssdk.awscore.retry.AwsRetryStrategy;
4444
import software.amazon.awssdk.core.SdkRequest;
45+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
4546
import software.amazon.awssdk.core.checksums.ChecksumValidation;
4647
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
4748
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
@@ -57,6 +58,7 @@
5758
import software.amazon.awssdk.core.signer.NoOpSigner;
5859
import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions;
5960
import software.amazon.awssdk.crt.io.StandardRetryOptions;
61+
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
6062
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
6163
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
6264
import software.amazon.awssdk.identity.spi.IdentityProvider;
@@ -72,6 +74,8 @@
7274
import software.amazon.awssdk.services.s3.internal.s3express.S3ExpressUtils;
7375
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
7476
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
77+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
78+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
7579
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
7680
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
7781
import software.amazon.awssdk.utils.CollectionUtils;
@@ -80,6 +84,9 @@
8084
@SdkInternalApi
8185
public final class DefaultS3CrtAsyncClient extends DelegatingS3AsyncClient implements S3CrtAsyncClient {
8286
public static final ExecutionAttribute<Path> OBJECT_FILE_PATH = new ExecutionAttribute<>("objectFilePath");
87+
public static final ExecutionAttribute<Path> RESPONSE_FILE_PATH = new ExecutionAttribute<>("responseFilePath");
88+
public static final ExecutionAttribute<S3MetaRequestOptions.ResponseFileOption> RESPONSE_FILE_OPTION =
89+
new ExecutionAttribute<>("responseFileOption");
8390
private static final String CRT_CLIENT_CLASSPATH = "software.amazon.awssdk.crt.s3.S3Client";
8491
private final CopyObjectHelper copyObjectHelper;
8592

@@ -106,6 +113,22 @@ public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObject
106113
new CrtContentLengthOnlyAsyncFileRequestBody(sourcePath));
107114
}
108115

116+
@Override
117+
public CompletableFuture<GetObjectResponse> getObject(GetObjectRequest getObjectRequest, Path destinationPath) {
118+
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> responseTransformer =
119+
new CrtResponseFileResponseTransformer<>();
120+
121+
AwsRequestOverrideConfiguration overrideConfig =
122+
getObjectRequest.overrideConfiguration()
123+
.map(config -> config.toBuilder().putExecutionAttribute(RESPONSE_FILE_PATH, destinationPath))
124+
.orElseGet(() -> AwsRequestOverrideConfiguration.builder()
125+
.putExecutionAttribute(RESPONSE_FILE_PATH,
126+
destinationPath))
127+
.build();
128+
129+
return getObject(getObjectRequest.toBuilder().overrideConfiguration(overrideConfig).build(), responseTransformer);
130+
}
131+
109132
@Override
110133
public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyObjectRequest) {
111134
return copyObjectHelper.copyObject(copyObjectRequest);
@@ -243,7 +266,7 @@ public DefaultS3CrtClientBuilder credentialsProvider(AwsCredentialsProvider cred
243266

244267
@Override
245268
public DefaultS3CrtClientBuilder credentialsProvider(
246-
IdentityProvider<? extends AwsCredentialsIdentity> credentialsProvider) {
269+
IdentityProvider<? extends AwsCredentialsIdentity> credentialsProvider) {
247270
this.credentialsProvider = credentialsProvider;
248271
return this;
249272
}
@@ -396,6 +419,10 @@ public void afterMarshalling(Context.AfterMarshalling context,
396419
executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_CHECKSUM_CALCULATION))
397420
.put(RESPONSE_CHECKSUM_VALIDATION,
398421
executionAttributes.getAttribute(SdkInternalExecutionAttribute.RESPONSE_CHECKSUM_VALIDATION))
422+
.put(S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_PATH,
423+
executionAttributes.getAttribute(RESPONSE_FILE_PATH))
424+
.put(S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_OPTION,
425+
executionAttributes.getAttribute(RESPONSE_FILE_OPTION))
399426
.build();
400427

401428
// We rely on CRT to perform checksum validation, disable SDK flexible checksum implementation

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
2525
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.REQUEST_CHECKSUM_CALCULATION;
2626
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.RESPONSE_CHECKSUM_VALIDATION;
27+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_OPTION;
28+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_PATH;
2729
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_NAME;
2830
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
2931
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.USE_S3_EXPRESS_AUTH;
@@ -135,11 +137,6 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
135137
HttpRequest httpRequest = toCrtRequest(asyncRequest);
136138
SdkHttpExecutionAttributes httpExecutionAttributes = asyncRequest.httpExecutionAttributes();
137139
CompletableFuture<S3MetaRequestWrapper> s3MetaRequestFuture = new CompletableFuture<>();
138-
S3CrtResponseHandlerAdapter responseHandler =
139-
new S3CrtResponseHandlerAdapter(executeFuture,
140-
asyncRequest.responseHandler(),
141-
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
142-
s3MetaRequestFuture);
143140

144141
String operationName = asyncRequest.httpExecutionAttributes().getAttribute(OPERATION_NAME);
145142
S3MetaRequestOptions.MetaRequestType requestType = requestType(operationName);
@@ -156,6 +153,16 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
156153
ChecksumConfig checksumConfig = checksumConfig(httpChecksum, requestType, requestChecksumCalculation,
157154
responseChecksumValidation);
158155

156+
Path responseFilePath = httpExecutionAttributes.getAttribute(RESPONSE_FILE_PATH);
157+
S3MetaRequestOptions.ResponseFileOption responseFileOption = httpExecutionAttributes.getAttribute(RESPONSE_FILE_OPTION);
158+
159+
S3CrtResponseHandlerAdapter responseHandler =
160+
new S3CrtResponseHandlerAdapter(
161+
executeFuture,
162+
asyncRequest.responseHandler(),
163+
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
164+
s3MetaRequestFuture);
165+
159166
URI endpoint = getEndpoint(uri);
160167

161168
AwsSigningConfig signingConfig = awsSigningConfig(signingRegion, httpExecutionAttributes);
@@ -169,7 +176,12 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
169176
.withResumeToken(resumeToken)
170177
.withOperationName(operationName)
171178
.withRequestFilePath(requestFilePath)
172-
.withSigningConfig(signingConfig);
179+
.withSigningConfig(signingConfig)
180+
.withResponseFilePath(responseFilePath);
181+
182+
if (responseFileOption != null) {
183+
requestOptions = requestOptions.withResponseFileOption(responseFileOption);
184+
}
173185

174186
try {
175187
S3MetaRequestWrapper requestWrapper = new S3MetaRequestWrapper(crtS3Client.makeMetaRequest(requestOptions));

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
2222
import software.amazon.awssdk.core.interceptor.trait.HttpChecksum;
2323
import software.amazon.awssdk.crt.s3.ResumeToken;
24+
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
2425
import software.amazon.awssdk.http.SdkHttpExecutionAttribute;
2526
import software.amazon.awssdk.regions.Region;
2627

@@ -57,6 +58,12 @@ public final class S3InternalSdkHttpExecutionAttribute<T> extends SdkHttpExecuti
5758
public static final S3InternalSdkHttpExecutionAttribute<ResponseChecksumValidation> RESPONSE_CHECKSUM_VALIDATION =
5859
new S3InternalSdkHttpExecutionAttribute<>(ResponseChecksumValidation.class);
5960

61+
public static final S3InternalSdkHttpExecutionAttribute<Path> RESPONSE_FILE_PATH =
62+
new S3InternalSdkHttpExecutionAttribute<>(Path.class);
63+
64+
public static final S3InternalSdkHttpExecutionAttribute<S3MetaRequestOptions.ResponseFileOption> RESPONSE_FILE_OPTION =
65+
new S3InternalSdkHttpExecutionAttribute<>(S3MetaRequestOptions.ResponseFileOption.class);
66+
6067
private S3InternalSdkHttpExecutionAttribute(Class<T> valueClass) {
6168
super(valueClass);
6269
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.crt;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ExecutionException;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import org.mockito.Mockito;
27+
import org.reactivestreams.Subscriber;
28+
import software.amazon.awssdk.core.SdkResponse;
29+
import software.amazon.awssdk.core.async.AsyncRequestBody;
30+
import software.amazon.awssdk.core.async.SdkPublisher;
31+
32+
public class CrtResponseFileResponseTransformerTest {
33+
private CrtResponseFileResponseTransformer<SdkResponse> transformer;
34+
private SdkResponse response;
35+
private MockCrtPublisher publisher;
36+
37+
@BeforeEach
38+
public void setUp() throws Exception {
39+
transformer = new CrtResponseFileResponseTransformer<>();
40+
response = Mockito.mock(SdkResponse.class);
41+
publisher = new MockCrtPublisher();
42+
}
43+
44+
@Test
45+
void successfulResponseAndStream_returnsResponsePublisher() throws Exception {
46+
CompletableFuture<SdkResponse> responseFuture = transformer.prepare();
47+
transformer.onResponse(response);
48+
assertThat(responseFuture.isDone()).isFalse();
49+
transformer.onStream(publisher);
50+
publisher.complete();
51+
assertThat(responseFuture.isDone()).isTrue();
52+
SdkResponse returnedResponse = responseFuture.get();
53+
assertThat(returnedResponse).isEqualTo(response);
54+
}
55+
56+
@Test
57+
void failedResponse_completesExceptionally() {
58+
CompletableFuture<SdkResponse> responseFuture = transformer.prepare();
59+
assertThat(responseFuture.isDone()).isFalse();
60+
transformer.exceptionOccurred(new RuntimeException("Intentional exception for testing purposes - before response."));
61+
assertThat(responseFuture.isDone()).isTrue();
62+
assertThatThrownBy(responseFuture::get)
63+
.isInstanceOf(ExecutionException.class)
64+
.hasCauseInstanceOf(RuntimeException.class);
65+
}
66+
67+
@Test
68+
void failedStream_completesExceptionally() {
69+
CompletableFuture<SdkResponse> responseFuture = transformer.prepare();
70+
transformer.onResponse(response);
71+
assertThat(responseFuture.isDone()).isFalse();
72+
transformer.exceptionOccurred(new RuntimeException("Intentional exception for testing purposes - after response."));
73+
assertThat(responseFuture.isDone()).isTrue();
74+
assertThatThrownBy(responseFuture::get)
75+
.isInstanceOf(ExecutionException.class)
76+
.hasCauseInstanceOf(RuntimeException.class);
77+
}
78+
79+
private static class MockCrtPublisher implements SdkPublisher<ByteBuffer> {
80+
private Subscriber<? super ByteBuffer> subscriber;
81+
@Override
82+
public void subscribe(Subscriber<? super ByteBuffer> s) {
83+
subscriber = s;
84+
}
85+
86+
public void complete() {
87+
subscriber.onComplete();
88+
}
89+
}
90+
91+
}

0 commit comments

Comments
 (0)