Skip to content

Commit 4460429

Browse files
authored
Fixed the issue where closed connection exception thrown from AWS CRT… (#4816)
* Fixed the issue where closed connection exception thrown from AWS CRT sync HTTP client was not retried. * Remove unused import
1 parent 43e3185 commit 4460429

File tree

8 files changed

+504
-308
lines changed

8 files changed

+504
-308
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS CRT HTTP Client",
4+
"contributor": "",
5+
"description": "Fixed the issue where `AWS_ERROR_HTTP_CONNECTION_CLOSED` was not retried by the SDK."
6+
}

http-clients/aws-crt-client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@
169169
<version>${commons-codec.verion}</version>
170170
<scope>test</scope>
171171
</dependency>
172+
<dependency>
173+
<groupId>org.mockito</groupId>
174+
<artifactId>mockito-junit-jupiter</artifactId>
175+
<scope>test</scope>
176+
</dependency>
172177
</dependencies>
173178

174179
<build>

http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/AwsCrtAsyncHttpClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
2929
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
3030
import software.amazon.awssdk.http.crt.internal.CrtAsyncRequestContext;
31-
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
31+
import software.amazon.awssdk.http.crt.internal.CrtAsyncRequestExecutor;
3232
import software.amazon.awssdk.utils.AttributeMap;
3333

3434
/**
@@ -98,7 +98,7 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
9898
.request(asyncRequest)
9999
.build();
100100

101-
return new CrtRequestExecutor().execute(context);
101+
return new CrtAsyncRequestExecutor().execute(context);
102102
}
103103
}
104104

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.crt.internal;
17+
18+
import static software.amazon.awssdk.http.crt.internal.CrtUtils.reportMetrics;
19+
import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapConnectionFailureException;
20+
import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable;
21+
22+
import java.io.IOException;
23+
import java.util.concurrent.CompletableFuture;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.crt.CrtRuntimeException;
26+
import software.amazon.awssdk.crt.http.HttpClientConnection;
27+
import software.amazon.awssdk.crt.http.HttpException;
28+
import software.amazon.awssdk.crt.http.HttpRequest;
29+
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
30+
import software.amazon.awssdk.http.SdkCancellationException;
31+
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
32+
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
33+
import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
34+
import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
35+
import software.amazon.awssdk.metrics.MetricCollector;
36+
import software.amazon.awssdk.metrics.NoOpMetricCollector;
37+
import software.amazon.awssdk.utils.Logger;
38+
39+
@SdkInternalApi
40+
public final class CrtAsyncRequestExecutor {
41+
42+
private static final Logger log = Logger.loggerFor(CrtAsyncRequestExecutor.class);
43+
44+
public CompletableFuture<Void> execute(CrtAsyncRequestContext executionContext) {
45+
// go ahead and get a reference to the metricCollector since multiple futures will
46+
// need it regardless.
47+
MetricCollector metricCollector = executionContext.metricCollector();
48+
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
49+
50+
long acquireStartTime = 0;
51+
52+
if (shouldPublishMetrics) {
53+
// go ahead and get acquireStartTime for the concurrency timer as early as possible,
54+
// so it's as accurate as possible, but only do it in a branch since clock_gettime()
55+
// results in a full sys call barrier (multiple mutexes and a hw interrupt).
56+
acquireStartTime = System.nanoTime();
57+
}
58+
59+
CompletableFuture<Void> requestFuture = createAsyncExecutionFuture(executionContext.sdkRequest());
60+
61+
// When a Connection is ready from the Connection Pool, schedule the Request on the connection
62+
CompletableFuture<HttpClientConnection> httpClientConnectionCompletableFuture =
63+
executionContext.crtConnPool().acquireConnection();
64+
65+
long finalAcquireStartTime = acquireStartTime;
66+
67+
httpClientConnectionCompletableFuture.whenComplete((crtConn, throwable) -> {
68+
AsyncExecuteRequest asyncRequest = executionContext.sdkRequest();
69+
70+
if (shouldPublishMetrics) {
71+
reportMetrics(executionContext.crtConnPool(), metricCollector, finalAcquireStartTime);
72+
}
73+
74+
// If we didn't get a connection for some reason, fail the request
75+
if (throwable != null) {
76+
Throwable toThrow = wrapConnectionFailureException(throwable);
77+
reportAsyncFailure(crtConn, toThrow, requestFuture, asyncRequest.responseHandler());
78+
return;
79+
}
80+
81+
executeRequest(executionContext, requestFuture, crtConn, asyncRequest);
82+
});
83+
84+
return requestFuture;
85+
}
86+
87+
private void executeRequest(CrtAsyncRequestContext executionContext,
88+
CompletableFuture<Void> requestFuture,
89+
HttpClientConnection crtConn,
90+
AsyncExecuteRequest asyncRequest) {
91+
HttpRequest crtRequest = CrtRequestAdapter.toAsyncCrtRequest(executionContext);
92+
HttpStreamResponseHandler crtResponseHandler =
93+
CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler());
94+
95+
// Submit the request on the connection
96+
try {
97+
crtConn.makeRequest(crtRequest, crtResponseHandler).activate();
98+
} catch (HttpException e) {
99+
Throwable toThrow = wrapWithIoExceptionIfRetryable(e);
100+
reportAsyncFailure(crtConn,
101+
toThrow,
102+
requestFuture,
103+
asyncRequest.responseHandler());
104+
} catch (IllegalStateException | CrtRuntimeException e) {
105+
// CRT throws IllegalStateException if the connection is closed
106+
reportAsyncFailure(crtConn, new IOException("An exception occurred when making the request", e),
107+
requestFuture,
108+
asyncRequest.responseHandler());
109+
} catch (Throwable throwable) {
110+
reportAsyncFailure(crtConn, throwable,
111+
requestFuture,
112+
asyncRequest.responseHandler());
113+
}
114+
}
115+
116+
/**
117+
* Create the execution future and set up the cancellation logic.
118+
* @return The created execution future.
119+
*/
120+
private CompletableFuture<Void> createAsyncExecutionFuture(AsyncExecuteRequest request) {
121+
CompletableFuture<Void> future = new CompletableFuture<>();
122+
123+
future.whenComplete((r, t) -> {
124+
if (t == null) {
125+
return;
126+
}
127+
128+
// TODO: Aborting request once it's supported in CRT
129+
if (future.isCancelled()) {
130+
request.responseHandler().onError(new SdkCancellationException("The request was cancelled"));
131+
}
132+
});
133+
134+
return future;
135+
}
136+
137+
/**
138+
* Notify the provided response handler and future of the failure.
139+
*/
140+
private void reportAsyncFailure(HttpClientConnection crtConn,
141+
Throwable cause,
142+
CompletableFuture<Void> executeFuture,
143+
SdkAsyncHttpResponseHandler responseHandler) {
144+
if (crtConn != null) {
145+
crtConn.close();
146+
}
147+
148+
try {
149+
responseHandler.onError(cause);
150+
} catch (Exception e) {
151+
log.error(() -> "SdkAsyncHttpResponseHandler " + responseHandler + " threw an exception in onError. It will be "
152+
+ "ignored.", e);
153+
}
154+
executeFuture.completeExceptionally(cause);
155+
}
156+
}

0 commit comments

Comments
 (0)