Skip to content

Use CRT response file in low level CRT S3AsyncClient for getObject #6289

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSS3-233f74c.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS S3",
"contributor": "",
"description": "Add support for using CRT's response file in the CRT based S3AsyncClient - CRT will directly write to the file when calling getObject with a Path."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crt;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.Logger;

/**
* When the CRT Response File option is used in a request, the body is streamed directly to the file.
* The S3CrtResponseHandlerAdapter in this case will never receive a response body but will call onStream
* when the request is complete with a publisher that will complete immediately.
* This transformer is effectively a no-op transformer that waits for the stream to complete and then
* completes the future with the response.
*
* @param <ResponseT> Pojo response type.
*/
@SdkInternalApi
public final class CrtResponseFileResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT,
ResponseT> {

private static final Logger log = Logger.loggerFor(CrtResponseFileResponseTransformer.class);

private volatile CompletableFuture<Void> cf;
private volatile ResponseT response;

@Override
public CompletableFuture<ResponseT> prepare() {
cf = new CompletableFuture<>();
return cf.thenApply(ignored -> response);
}

@Override
public void onResponse(ResponseT response) {
this.response = response;
}

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
publisher.subscribe(new OnCompleteSubscriber(cf, this::exceptionOccurred));
}

@Override
public void exceptionOccurred(Throwable throwable) {
if (cf != null) {
cf.completeExceptionally(throwable);
} else {
log.warn(() -> "An exception occurred before the call to prepare() was able to instantiate the CompletableFuture. "
+ "The future cannot be completed exceptionally because it is null");
}
}

private static final class OnCompleteSubscriber implements Subscriber<ByteBuffer> {

private final CompletableFuture<Void> future;
private final Consumer<Throwable> onErrorMethod;
private Subscription subscription;

private OnCompleteSubscriber(CompletableFuture<Void> future, Consumer<Throwable> onErrorMethod) {
this.future = future;
this.onErrorMethod = onErrorMethod;
}

@Override
public void onSubscribe(Subscription s) {
if (this.subscription != null) {
s.cancel();
return;
}
this.subscription = s;
// do not request data from the subscription since body is written directly to file
}

@Override
public void onNext(ByteBuffer byteBuffer) {
// The response body is streamed directly to the file - this method should never be called.
// ensure the future is completed exceptionally if this occurs
onErrorMethod.accept(new IllegalStateException("OnCompleteSubscriber received unexpected call to onNext."));
}

@Override
public void onError(Throwable throwable) {
onErrorMethod.accept(throwable);
}

@Override
public void onComplete() {
future.complete(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.awscore.retry.AwsRetryStrategy;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.checksums.ChecksumValidation;
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
Expand All @@ -57,6 +58,7 @@
import software.amazon.awssdk.core.signer.NoOpSigner;
import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions;
import software.amazon.awssdk.crt.io.StandardRetryOptions;
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.identity.spi.IdentityProvider;
Expand All @@ -72,6 +74,8 @@
import software.amazon.awssdk.services.s3.internal.s3express.S3ExpressUtils;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.utils.CollectionUtils;
Expand All @@ -80,6 +84,9 @@
@SdkInternalApi
public final class DefaultS3CrtAsyncClient extends DelegatingS3AsyncClient implements S3CrtAsyncClient {
public static final ExecutionAttribute<Path> OBJECT_FILE_PATH = new ExecutionAttribute<>("objectFilePath");
public static final ExecutionAttribute<Path> RESPONSE_FILE_PATH = new ExecutionAttribute<>("responseFilePath");
public static final ExecutionAttribute<S3MetaRequestOptions.ResponseFileOption> RESPONSE_FILE_OPTION =
new ExecutionAttribute<>("responseFileOption");
private static final String CRT_CLIENT_CLASSPATH = "software.amazon.awssdk.crt.s3.S3Client";
private final CopyObjectHelper copyObjectHelper;

Expand All @@ -106,6 +113,22 @@ public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObject
new CrtContentLengthOnlyAsyncFileRequestBody(sourcePath));
}

@Override
public CompletableFuture<GetObjectResponse> getObject(GetObjectRequest getObjectRequest, Path destinationPath) {
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> responseTransformer =
new CrtResponseFileResponseTransformer<>();

AwsRequestOverrideConfiguration overrideConfig =
getObjectRequest.overrideConfiguration()
.map(config -> config.toBuilder().putExecutionAttribute(RESPONSE_FILE_PATH, destinationPath))
.orElseGet(() -> AwsRequestOverrideConfiguration.builder()
.putExecutionAttribute(RESPONSE_FILE_PATH,
destinationPath))
.build();

return getObject(getObjectRequest.toBuilder().overrideConfiguration(overrideConfig).build(), responseTransformer);
}

@Override
public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyObjectRequest) {
return copyObjectHelper.copyObject(copyObjectRequest);
Expand Down Expand Up @@ -243,7 +266,7 @@ public DefaultS3CrtClientBuilder credentialsProvider(AwsCredentialsProvider cred

@Override
public DefaultS3CrtClientBuilder credentialsProvider(
IdentityProvider<? extends AwsCredentialsIdentity> credentialsProvider) {
IdentityProvider<? extends AwsCredentialsIdentity> credentialsProvider) {
this.credentialsProvider = credentialsProvider;
return this;
}
Expand Down Expand Up @@ -396,6 +419,10 @@ public void afterMarshalling(Context.AfterMarshalling context,
executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_CHECKSUM_CALCULATION))
.put(RESPONSE_CHECKSUM_VALIDATION,
executionAttributes.getAttribute(SdkInternalExecutionAttribute.RESPONSE_CHECKSUM_VALIDATION))
.put(S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_PATH,
executionAttributes.getAttribute(RESPONSE_FILE_PATH))
.put(S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_OPTION,
executionAttributes.getAttribute(RESPONSE_FILE_OPTION))
.build();

// We rely on CRT to perform checksum validation, disable SDK flexible checksum implementation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.REQUEST_CHECKSUM_CALCULATION;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.RESPONSE_CHECKSUM_VALIDATION;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_OPTION;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.RESPONSE_FILE_PATH;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_NAME;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.USE_S3_EXPRESS_AUTH;
Expand Down Expand Up @@ -135,11 +137,6 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
HttpRequest httpRequest = toCrtRequest(asyncRequest);
SdkHttpExecutionAttributes httpExecutionAttributes = asyncRequest.httpExecutionAttributes();
CompletableFuture<S3MetaRequestWrapper> s3MetaRequestFuture = new CompletableFuture<>();
S3CrtResponseHandlerAdapter responseHandler =
new S3CrtResponseHandlerAdapter(executeFuture,
asyncRequest.responseHandler(),
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
s3MetaRequestFuture);

String operationName = asyncRequest.httpExecutionAttributes().getAttribute(OPERATION_NAME);
S3MetaRequestOptions.MetaRequestType requestType = requestType(operationName);
Expand All @@ -156,6 +153,16 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
ChecksumConfig checksumConfig = checksumConfig(httpChecksum, requestType, requestChecksumCalculation,
responseChecksumValidation);

Path responseFilePath = httpExecutionAttributes.getAttribute(RESPONSE_FILE_PATH);
S3MetaRequestOptions.ResponseFileOption responseFileOption = httpExecutionAttributes.getAttribute(RESPONSE_FILE_OPTION);

S3CrtResponseHandlerAdapter responseHandler =
new S3CrtResponseHandlerAdapter(
executeFuture,
asyncRequest.responseHandler(),
httpExecutionAttributes.getAttribute(CRT_PROGRESS_LISTENER),
s3MetaRequestFuture);

URI endpoint = getEndpoint(uri);

AwsSigningConfig signingConfig = awsSigningConfig(signingRegion, httpExecutionAttributes);
Expand All @@ -169,7 +176,12 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
.withResumeToken(resumeToken)
.withOperationName(operationName)
.withRequestFilePath(requestFilePath)
.withSigningConfig(signingConfig);
.withSigningConfig(signingConfig)
.withResponseFilePath(responseFilePath);

if (responseFileOption != null) {
requestOptions = requestOptions.withResponseFileOption(responseFileOption);
}

try {
S3MetaRequestWrapper requestWrapper = new S3MetaRequestWrapper(crtS3Client.makeMetaRequest(requestOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.interceptor.trait.HttpChecksum;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.crt.s3.S3MetaRequestOptions;
import software.amazon.awssdk.http.SdkHttpExecutionAttribute;
import software.amazon.awssdk.regions.Region;

Expand Down Expand Up @@ -57,6 +58,12 @@ public final class S3InternalSdkHttpExecutionAttribute<T> extends SdkHttpExecuti
public static final S3InternalSdkHttpExecutionAttribute<ResponseChecksumValidation> RESPONSE_CHECKSUM_VALIDATION =
new S3InternalSdkHttpExecutionAttribute<>(ResponseChecksumValidation.class);

public static final S3InternalSdkHttpExecutionAttribute<Path> RESPONSE_FILE_PATH =
new S3InternalSdkHttpExecutionAttribute<>(Path.class);

public static final S3InternalSdkHttpExecutionAttribute<S3MetaRequestOptions.ResponseFileOption> RESPONSE_FILE_OPTION =
new S3InternalSdkHttpExecutionAttribute<>(S3MetaRequestOptions.ResponseFileOption.class);

private S3InternalSdkHttpExecutionAttribute(Class<T> valueClass) {
super(valueClass);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crt;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.SdkPublisher;

public class CrtResponseFileResponseTransformerTest {
private CrtResponseFileResponseTransformer<SdkResponse> transformer;
private SdkResponse response;
private MockCrtPublisher publisher;

@BeforeEach
public void setUp() throws Exception {
transformer = new CrtResponseFileResponseTransformer<>();
response = Mockito.mock(SdkResponse.class);
publisher = new MockCrtPublisher();
}

@Test
void successfulResponseAndStream_returnsResponsePublisher() throws Exception {
CompletableFuture<SdkResponse> responseFuture = transformer.prepare();
transformer.onResponse(response);
assertThat(responseFuture.isDone()).isFalse();
transformer.onStream(publisher);
publisher.complete();
assertThat(responseFuture.isDone()).isTrue();
SdkResponse returnedResponse = responseFuture.get();
assertThat(returnedResponse).isEqualTo(response);
}

@Test
void failedResponse_completesExceptionally() {
CompletableFuture<SdkResponse> responseFuture = transformer.prepare();
assertThat(responseFuture.isDone()).isFalse();
transformer.exceptionOccurred(new RuntimeException("Intentional exception for testing purposes - before response."));
assertThat(responseFuture.isDone()).isTrue();
assertThatThrownBy(responseFuture::get)
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(RuntimeException.class);
}

@Test
void failedStream_completesExceptionally() {
CompletableFuture<SdkResponse> responseFuture = transformer.prepare();
transformer.onResponse(response);
assertThat(responseFuture.isDone()).isFalse();
transformer.exceptionOccurred(new RuntimeException("Intentional exception for testing purposes - after response."));
assertThat(responseFuture.isDone()).isTrue();
assertThatThrownBy(responseFuture::get)
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(RuntimeException.class);
}

private static class MockCrtPublisher implements SdkPublisher<ByteBuffer> {
private Subscriber<? super ByteBuffer> subscriber;
@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
subscriber = s;
}

public void complete() {
subscriber.onComplete();
}
}

}
Loading
Loading