Skip to content

Commit e48a0f6

Browse files
committed
Use CRT response file in getObject
1 parent 3cdd323 commit e48a0f6

File tree

8 files changed

+288
-9
lines changed

8 files changed

+288
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.crt;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.function.Consumer;
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
import software.amazon.awssdk.annotations.SdkInternalApi;
24+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
25+
import software.amazon.awssdk.core.async.SdkPublisher;
26+
import software.amazon.awssdk.utils.Logger;
27+
28+
/**
29+
* When the CRT Response File option is used in a request, the body is streamed directly to the file.
30+
* This transformer is effectively a no-op transformer that waits for the stream to complete and then
31+
* completes the future with the response.
32+
*
33+
* @param <ResponseT> Pojo response type.
34+
*/
35+
@SdkInternalApi
36+
public final class CrtResponseFileResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT,
37+
ResponseT> {
38+
39+
private static final Logger log = Logger.loggerFor(CrtResponseFileResponseTransformer.class);
40+
41+
private volatile CompletableFuture<Void> cf;
42+
private volatile ResponseT response;
43+
44+
@Override
45+
public CompletableFuture<ResponseT> prepare() {
46+
cf = new CompletableFuture<>();
47+
return cf.thenApply(ignored -> response);
48+
}
49+
50+
@Override
51+
public void onResponse(ResponseT response) {
52+
this.response = response;
53+
}
54+
55+
@Override
56+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
57+
publisher.subscribe(new OnCompleteSubscriber(cf, this::exceptionOccurred));
58+
}
59+
60+
@Override
61+
public void exceptionOccurred(Throwable throwable) {
62+
if (cf != null) {
63+
cf.completeExceptionally(throwable);
64+
} else {
65+
log.warn(() -> "An exception occurred before the call to prepare() was able to instantiate the CompletableFuture."
66+
+ "The future cannot be completed exceptionally because it is null");
67+
68+
}
69+
}
70+
71+
private static final class OnCompleteSubscriber implements Subscriber<ByteBuffer> {
72+
73+
private final CompletableFuture<Void> future;
74+
private final Consumer<Throwable> onErrorMethod;
75+
private Subscription subscription;
76+
77+
private OnCompleteSubscriber(CompletableFuture<Void> future, Consumer<Throwable> onErrorMethod) {
78+
this.future = future;
79+
this.onErrorMethod = onErrorMethod;
80+
}
81+
82+
@Override
83+
public void onSubscribe(Subscription s) {
84+
if (this.subscription != null) {
85+
s.cancel();
86+
return;
87+
}
88+
this.subscription = s;
89+
// Request the first chunk to start producing content
90+
s.request(1);
91+
}
92+
93+
@Override
94+
public void onNext(ByteBuffer byteBuffer) {
95+
}
96+
97+
@Override
98+
public void onError(Throwable throwable) {
99+
onErrorMethod.accept(throwable);
100+
}
101+
102+
@Override
103+
public void onComplete() {
104+
future.complete(null);
105+
}
106+
}
107+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
4343
import software.amazon.awssdk.awscore.retry.AwsRetryStrategy;
4444
import software.amazon.awssdk.core.SdkRequest;
45+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
4546
import software.amazon.awssdk.core.checksums.ChecksumValidation;
4647
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
4748
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
@@ -57,6 +58,7 @@
5758
import software.amazon.awssdk.core.signer.NoOpSigner;
5859
import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions;
5960
import software.amazon.awssdk.crt.io.StandardRetryOptions;
61+
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
6062
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
6163
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
6264
import software.amazon.awssdk.identity.spi.IdentityProvider;
@@ -72,6 +74,8 @@
7274
import software.amazon.awssdk.services.s3.internal.s3express.S3ExpressUtils;
7375
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
7476
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
77+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
78+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
7579
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
7680
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
7781
import software.amazon.awssdk.utils.CollectionUtils;
@@ -80,6 +84,9 @@
8084
@SdkInternalApi
8185
public final class DefaultS3CrtAsyncClient extends DelegatingS3AsyncClient implements S3CrtAsyncClient {
8286
public static final ExecutionAttribute<Path> OBJECT_FILE_PATH = new ExecutionAttribute<>("objectFilePath");
87+
public static final ExecutionAttribute<Path> RESPONSE_FILE_PATH = new ExecutionAttribute<>("responseFilePath");
88+
public static final ExecutionAttribute<S3MetaRequestOptions.ResponseFileOption> RESPONSE_FILE_OPTION =
89+
new ExecutionAttribute<>("responseFileOption");
8390
private static final String CRT_CLIENT_CLASSPATH = "software.amazon.awssdk.crt.s3.S3Client";
8491
private final CopyObjectHelper copyObjectHelper;
8592

@@ -106,6 +113,22 @@ public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObject
106113
new CrtContentLengthOnlyAsyncFileRequestBody(sourcePath));
107114
}
108115

116+
@Override
117+
public CompletableFuture<GetObjectResponse> getObject(GetObjectRequest getObjectRequest, Path destinationPath) {
118+
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> responseTransformer =
119+
new CrtResponseFileResponseTransformer<>();
120+
121+
AwsRequestOverrideConfiguration overrideConfig =
122+
getObjectRequest.overrideConfiguration()
123+
.map(config -> config.toBuilder().putExecutionAttribute(RESPONSE_FILE_PATH, destinationPath))
124+
.orElseGet(() -> AwsRequestOverrideConfiguration.builder()
125+
.putExecutionAttribute(RESPONSE_FILE_PATH,
126+
destinationPath))
127+
.build();
128+
129+
return getObject(getObjectRequest.toBuilder().overrideConfiguration(overrideConfig).build(), responseTransformer);
130+
}
131+
109132
@Override
110133
public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyObjectRequest) {
111134
return copyObjectHelper.copyObject(copyObjectRequest);
@@ -243,7 +266,7 @@ public DefaultS3CrtClientBuilder credentialsProvider(AwsCredentialsProvider cred
243266

244267
@Override
245268
public DefaultS3CrtClientBuilder credentialsProvider(
246-
IdentityProvider<? extends AwsCredentialsIdentity> credentialsProvider) {
269+
IdentityProvider<? extends AwsCredentialsIdentity> credentialsProvider) {
247270
this.credentialsProvider = credentialsProvider;
248271
return this;
249272
}
@@ -396,6 +419,10 @@ public void afterMarshalling(Context.AfterMarshalling context,
396419
executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_CHECKSUM_CALCULATION))
397420
.put(RESPONSE_CHECKSUM_VALIDATION,
398421
executionAttributes.getAttribute(SdkInternalExecutionAttribute.RESPONSE_CHECKSUM_VALIDATION))
422+
.put(S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_PATH,
423+
executionAttributes.getAttribute(RESPONSE_FILE_PATH))
424+
.put(S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_OPTION,
425+
executionAttributes.getAttribute(RESPONSE_FILE_OPTION))
399426
.build();
400427

401428
// We rely on CRT to perform checksum validation, disable SDK flexible checksum implementation

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
2525
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.REQUEST_CHECKSUM_CALCULATION;
2626
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.RESPONSE_CHECKSUM_VALIDATION;
27+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_OPTION;
28+
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_PATH;
2729
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_NAME;
2830
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
2931
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.USE_S3_EXPRESS_AUTH;
@@ -135,11 +137,6 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
135137
HttpRequest httpRequest = toCrtRequest(asyncRequest);
136138
SdkHttpExecutionAttributes httpExecutionAttributes = asyncRequest.httpExecutionAttributes();
137139
CompletableFuture<S3MetaRequestWrapper> s3MetaRequestFuture = new CompletableFuture<>();
138-
S3CrtResponseHandlerAdapter responseHandler =
139-
new S3CrtResponseHandlerAdapter(executeFuture,
140-
asyncRequest.responseHandler(),
141-
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
142-
s3MetaRequestFuture);
143140

144141
String operationName = asyncRequest.httpExecutionAttributes().getAttribute(OPERATION_NAME);
145142
S3MetaRequestOptions.MetaRequestType requestType = requestType(operationName);
@@ -156,6 +153,16 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
156153
ChecksumConfig checksumConfig = checksumConfig(httpChecksum, requestType, requestChecksumCalculation,
157154
responseChecksumValidation);
158155

156+
Path responseFilePath = httpExecutionAttributes.getAttribute(RESPONSE_FILE_PATH);
157+
S3MetaRequestOptions.ResponseFileOption responseFileOption = httpExecutionAttributes.getAttribute(RESPONSE_FILE_OPTION);
158+
159+
S3CrtResponseHandlerAdapter responseHandler =
160+
new S3CrtResponseHandlerAdapter(
161+
executeFuture,
162+
asyncRequest.responseHandler(),
163+
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
164+
s3MetaRequestFuture);
165+
159166
URI endpoint = getEndpoint(uri);
160167

161168
AwsSigningConfig signingConfig = awsSigningConfig(signingRegion, httpExecutionAttributes);
@@ -169,7 +176,12 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
169176
.withResumeToken(resumeToken)
170177
.withOperationName(operationName)
171178
.withRequestFilePath(requestFilePath)
172-
.withSigningConfig(signingConfig);
179+
.withSigningConfig(signingConfig)
180+
.withResponseFilePath(responseFilePath);
181+
182+
if (responseFileOption != null) {
183+
requestOptions = requestOptions.withResponseFileOption(responseFileOption);
184+
}
173185

174186
try {
175187
S3MetaRequestWrapper requestWrapper = new S3MetaRequestWrapper(crtS3Client.makeMetaRequest(requestOptions));

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
public final class S3CrtResponseHandlerAdapter implements S3MetaRequestResponseHandler {
5151
private static final Logger log = Logger.loggerFor(S3CrtResponseHandlerAdapter.class);
5252
private static final Duration META_REQUEST_TIMEOUT = Duration.ofSeconds(10);
53+
private static final int CRT_SUCCESSFUL_CANCEL = 14374;
54+
5355
private final CompletableFuture<Void> resultFuture;
5456
private final SdkAsyncHttpResponseHandler responseHandler;
5557

@@ -152,6 +154,8 @@ public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long ob
152154
public void onFinished(S3FinishedResponseContext context) {
153155
int crtCode = context.getErrorCode();
154156
log.debug(() -> "Request finished with code: " + crtCode);
157+
initiateResponseHandling(initialHeadersResponse.build());
158+
155159
if (crtCode != CRT.AWS_CRT_SUCCESS) {
156160
handleError(context);
157161
} else {
@@ -182,11 +186,29 @@ private void handleError(S3FinishedResponseContext context) {
182186

183187
if (isServiceError(responseStatus) && errorPayload != null) {
184188
handleServiceError(responseStatus, headers, errorPayload);
189+
} else if (crtCode == CRT_SUCCESSFUL_CANCEL) {
190+
handleSuccessfulCancel(context, crtCode);
185191
} else {
186192
handleIoError(context, crtCode);
187193
}
188194
}
189195

196+
private void handleSuccessfulCancel(S3FinishedResponseContext context, int crtCode) {
197+
if (!responseHandlingInitiated) {
198+
responseHandlingInitiated = true;
199+
responseHandler.onHeaders(initialHeadersResponse.build());
200+
}
201+
202+
Throwable cause = context.getCause();
203+
204+
// TODO: Potentially subclass this exception
205+
SdkClientException sdkClientException =
206+
SdkClientException.create("Failed to send the request: " +
207+
CRT.awsErrorString(crtCode), cause);
208+
failResponseHandlerAndFuture(sdkClientException);
209+
notifyResponsePublisherErrorIfNeeded(sdkClientException);
210+
}
211+
190212
private void handleIoError(S3FinishedResponseContext context, int crtCode) {
191213
Throwable cause = context.getCause();
192214

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
2222
import software.amazon.awssdk.core.interceptor.trait.HttpChecksum;
2323
import software.amazon.awssdk.crt.s3.ResumeToken;
24+
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
2425
import software.amazon.awssdk.http.SdkHttpExecutionAttribute;
2526
import software.amazon.awssdk.regions.Region;
2627

@@ -57,6 +58,12 @@ public final class S3InternalSdkHttpExecutionAttribute<T> extends SdkHttpExecuti
5758
public static final S3InternalSdkHttpExecutionAttribute<ResponseChecksumValidation> RESPONSE_CHECKSUM_VALIDATION =
5859
new S3InternalSdkHttpExecutionAttribute<>(ResponseChecksumValidation.class);
5960

61+
public static final S3InternalSdkHttpExecutionAttribute<Path> RESPONSE_FILE_PATH =
62+
new S3InternalSdkHttpExecutionAttribute<>(Path.class);
63+
64+
public static final S3InternalSdkHttpExecutionAttribute<S3MetaRequestOptions.ResponseFileOption> RESPONSE_FILE_OPTION =
65+
new S3InternalSdkHttpExecutionAttribute<>(S3MetaRequestOptions.ResponseFileOption.class);
66+
6067
private S3InternalSdkHttpExecutionAttribute(Class<T> valueClass) {
6168
super(valueClass);
6269
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3MetaRequestPauseObservable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,11 @@ public void subscribe(S3MetaRequestWrapper request) {
4545
public ResumeToken pause() {
4646
return pause.apply(request);
4747
}
48+
49+
public void cancel() {
50+
if (request != null) {
51+
request.cancel();
52+
}
53+
}
4854
}
4955

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.internal.crt;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ExecutionException;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import org.mockito.Mockito;
27+
import software.amazon.awssdk.core.SdkResponse;
28+
import software.amazon.awssdk.core.async.AsyncRequestBody;
29+
import software.amazon.awssdk.core.async.SdkPublisher;
30+
31+
public class CrtResponseFileResponseTransformerTest {
32+
private CrtResponseFileResponseTransformer<SdkResponse> transformer;
33+
private SdkResponse response;
34+
private SdkPublisher<ByteBuffer> publisher;
35+
36+
@BeforeEach
37+
public void setUp() throws Exception {
38+
transformer = new CrtResponseFileResponseTransformer<>();
39+
response = Mockito.mock(SdkResponse.class);
40+
publisher = AsyncRequestBody.fromString("");
41+
}
42+
43+
@Test
44+
void successfulResponseAndStream_returnsResponsePublisher() throws Exception {
45+
CompletableFuture<SdkResponse> responseFuture = transformer.prepare();
46+
transformer.onResponse(response);
47+
assertThat(responseFuture.isDone()).isFalse();
48+
transformer.onStream(publisher);
49+
assertThat(responseFuture.isDone()).isTrue();
50+
SdkResponse returnedResponse = responseFuture.get();
51+
assertThat(returnedResponse).isEqualTo(response);
52+
}
53+
54+
@Test
55+
void failedResponse_completesExceptionally() {
56+
CompletableFuture<SdkResponse> responseFuture = transformer.prepare();
57+
assertThat(responseFuture.isDone()).isFalse();
58+
transformer.exceptionOccurred(new RuntimeException("Intentional exception for testing purposes - before response."));
59+
assertThat(responseFuture.isDone()).isTrue();
60+
assertThatThrownBy(responseFuture::get)
61+
.isInstanceOf(ExecutionException.class)
62+
.hasCauseInstanceOf(RuntimeException.class);
63+
}
64+
65+
@Test
66+
void failedStream_completesExceptionally() {
67+
CompletableFuture<SdkResponse> responseFuture = transformer.prepare();
68+
transformer.onResponse(response);
69+
assertThat(responseFuture.isDone()).isFalse();
70+
transformer.exceptionOccurred(new RuntimeException("Intentional exception for testing purposes - after response."));
71+
assertThat(responseFuture.isDone()).isTrue();
72+
assertThatThrownBy(responseFuture::get)
73+
.isInstanceOf(ExecutionException.class)
74+
.hasCauseInstanceOf(RuntimeException.class);
75+
}
76+
77+
}

0 commit comments

Comments
 (0)