Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AmazonS3-09114f5.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT-based S3 Client",
"contributor": "",
"description": "Fixed an issue in the AWS CRT-based S3 client where a GetObject request with `AsyncResponseTransformer#toBlockingInputStream` may hang if request failed mid streaming"
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ private S3MetaRequestWrapper s3MetaRequest() {

@Override
public void onResponseHeaders(int statusCode, HttpHeader[] headers) {
log.debug(() -> "Received response header with status code " + statusCode);
// Note, we cannot call responseHandler.onHeaders() here because the response status code and headers may not represent
// whether the request has succeeded or not (e.g. if this is for a HeadObject call that CRT calls under the hood). We
// need to rely on onResponseBody/onFinished being called to determine this.
Expand Down Expand Up @@ -150,6 +151,7 @@ public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long ob
@Override
public void onFinished(S3FinishedResponseContext context) {
int crtCode = context.getErrorCode();
log.debug(() -> "Request finished with code: " + crtCode);
if (crtCode != CRT.AWS_CRT_SUCCESS) {
handleError(context);
} else {
Expand Down Expand Up @@ -192,6 +194,19 @@ private void handleIoError(S3FinishedResponseContext context, int crtCode) {
SdkClientException.create("Failed to send the request: " +
CRT.awsErrorString(crtCode), cause);
failResponseHandlerAndFuture(sdkClientException);
notifyResponsePublisherErrorIfNeeded(sdkClientException);
}

private void notifyResponsePublisherErrorIfNeeded(Throwable error) {
if (responseHandlingInitiated) {
responsePublisher.error(error).handle((ignore, throwable) -> {
if (throwable != null) {
log.warn(() -> "Exception thrown in responsePublisher#error, ignoring", throwable);
return null;
}
return null;
});
}
}

private void handleServiceError(int responseStatus, HttpHeader[] headers, byte[] errorPayload) {
Expand All @@ -204,6 +219,7 @@ private void handleServiceError(int responseStatus, HttpHeader[] headers, byte[]
SdkClientException.create("Request failed during the transfer due to an error returned from S3");
s3Exception.addSuppressed(sdkClientException);
failResponseHandlerAndFuture(s3Exception);
notifyResponsePublisherErrorIfNeeded(s3Exception);
} else {
initiateResponseHandling(errorResponse.build());
onErrorResponseComplete(errorPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static software.amazon.awssdk.core.http.HttpResponseHandler.X_AMZN_REQUEST_ID_HEADER_ALTERNATE;
import static software.amazon.awssdk.core.http.HttpResponseHandler.X_AMZ_ID_2_HEADER;

import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.http.HttpHeader;
import com.github.tomakehurst.wiremock.http.HttpHeaders;
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
Expand All @@ -46,7 +47,9 @@
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.Log;
import software.amazon.awssdk.regions.Region;
Expand Down Expand Up @@ -149,6 +152,77 @@ public void requestFailedMidway_shouldThrowException() {
});
}

@Test
public void toBlockingInputStream_requestFailedMidwayDueToServerError_shouldThrowException() {
HttpHeaders httpHeaders = new HttpHeaders(new HttpHeader("content-length", "12345676"),
new HttpHeader("etag", E_TAG));
stubFor(head(anyUrl()).willReturn(aResponse().withStatus(200)
.withHeaders(httpHeaders)));

stubFor(get(anyUrl())
.inScenario("SucceedThenFail")
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo("first request")
.willReturn(aResponse()
.withStatus(200)
.withBody("helloworld".getBytes(StandardCharsets.UTF_8))));

stubFor(get(anyUrl())
.inScenario("SucceedThenFail")
.whenScenarioStateIs("first request")
.willSetStateTo("second request")
.willReturn(aResponse()
.withStatus(404)
.withHeader(X_AMZ_ID_2_HEADER, "foo")
.withHeader(X_AMZN_REQUEST_ID_HEADER_ALTERNATE, "bar")
.withBody("".getBytes(StandardCharsets.UTF_8))));
ResponseInputStream<GetObjectResponse> stream = s3AsyncClient.getObject(
r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBlockingInputStream())
.join();
byte[] buffer = new byte[1024 * 8];
assertThatThrownBy(() -> stream.read(buffer, 0, buffer.length))
.satisfies(throwable -> {
assertThat(throwable).isInstanceOf(S3Exception.class);
S3Exception s3Exception = (S3Exception) throwable;
assertThat(s3Exception.statusCode()).isEqualTo(404);
assertThat(s3Exception.extendedRequestId()).isEqualTo("foo");
assertThat(s3Exception.requestId()).isEqualTo("bar");
});
}

@Test
public void toBlockingInputStream_requestFailedMidwayDueToIoError_shouldThrowException() {
HttpHeaders httpHeaders = new HttpHeaders(new HttpHeader("content-length", "12345676"),
new HttpHeader("etag", E_TAG));
stubFor(head(anyUrl()).willReturn(aResponse().withStatus(200)
.withHeaders(httpHeaders)));

stubFor(get(anyUrl())
.inScenario("SucceedThenFail")
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo("first request")
.willReturn(aResponse()
.withStatus(200)
.withBody("helloworld".getBytes(StandardCharsets.UTF_8))));

stubFor(get(anyUrl())
.inScenario("SucceedThenFail")
.whenScenarioStateIs("first request")
.willSetStateTo("second request")
.willReturn(aResponse()
.withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
ResponseInputStream<GetObjectResponse> stream = s3AsyncClient.getObject(
r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBlockingInputStream())
.join();
byte[] buffer = new byte[1024 * 8];
assertThatThrownBy(() -> stream.read(buffer, 0, buffer.length))
.satisfies(throwable -> {
assertThat(throwable).isInstanceOf(SdkClientException.class);
SdkClientException exception = (SdkClientException) throwable;
assertThat(exception.getMessage()).contains("Failed to send the request");
});
}

@Test
void overrideResponseCompletionExecutor_shouldCompleteWithCustomExecutor(WireMockRuntimeInfo wiremock) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.async.DrainingSubscriber;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.crt.http.HttpHeader;
Expand Down Expand Up @@ -173,14 +174,33 @@ public void requestFailedMidwayDueToServerError_shouldCompleteFutureWithS3Except
when(errorContext.getErrorHeaders()).thenReturn(headers.toArray(new HttpHeader[0]));

responseHandlerAdapter.onFinished(errorContext);
Throwable actualException = sdkResponseHandler.error;
assertThat(actualException).isInstanceOf(S3Exception.class);
Throwable exceptionFromResponseHandler = sdkResponseHandler.error;
Throwable exceptionFromSubscriber = sdkResponseHandler.subscriber.error;

assertThat(((S3Exception) actualException).statusCode()).isEqualTo(404);
assertThat(((S3Exception) actualException).requestId()).isEqualTo("1234");
assertThat(((S3Exception) actualException).extendedRequestId()).isEqualTo("5678");
assertThat(exceptionFromResponseHandler).isInstanceOf(S3Exception.class);
assertThat(((S3Exception) exceptionFromResponseHandler).statusCode()).isEqualTo(404);
assertThat(((S3Exception) exceptionFromResponseHandler).requestId()).isEqualTo("1234");
assertThat(((S3Exception) exceptionFromResponseHandler).extendedRequestId()).isEqualTo("5678");
assertThat(exceptionFromResponseHandler).isEqualTo(exceptionFromSubscriber);

assertThatThrownBy(() -> future.join()).hasRootCause(actualException);
assertThatThrownBy(() -> future.join()).hasRootCause(exceptionFromResponseHandler);
assertThat(future).isCompletedExceptionally();
verify(s3MetaRequest).close();
}

@Test
public void requestFailedMidwayDueToIoError_shouldInvokeOnError() {
responseHandlerAdapter.onResponseHeaders(200, new HttpHeader[0]);
responseHandlerAdapter.onResponseBody(ByteBuffer.wrap("helloworld".getBytes(StandardCharsets.UTF_8)), 0, 0);

S3FinishedResponseContext errorContext = stubResponseContext(1079, 0, "".getBytes());
responseHandlerAdapter.onFinished(errorContext);
Throwable exceptionFromResponseHandler = sdkResponseHandler.error;
Throwable exceptionFromSubscriber = sdkResponseHandler.subscriber.error;

assertThat(exceptionFromResponseHandler).isEqualTo(exceptionFromSubscriber);
assertThat(exceptionFromResponseHandler).isInstanceOf(SdkClientException.class);
assertThatThrownBy(() -> future.join()).hasRootCause(exceptionFromResponseHandler);
assertThat(future).isCompletedExceptionally();
verify(s3MetaRequest).close();
}
Expand Down Expand Up @@ -217,19 +237,30 @@ private void stubOnResponseBody() {
private static class TestResponseHandler implements SdkAsyncHttpResponseHandler {
private SdkHttpResponse sdkHttpResponse;
private Throwable error;
private TestSubscriber subscriber = new TestSubscriber();

@Override
public void onHeaders(SdkHttpResponse headers) {
this.sdkHttpResponse = headers;
}

@Override
public void onStream(Publisher<ByteBuffer> stream) {
stream.subscribe(new DrainingSubscriber<>());
stream.subscribe(subscriber);
}

@Override
public void onError(Throwable error) {
this.error = error;
}
}

private static class TestSubscriber extends DrainingSubscriber {
private Throwable error;
@Override
public void onError(Throwable throwable) {
error = throwable;
super.onError(throwable);
}
}
}
Loading
Loading