Skip to content

Commit 270c0f7

Browse files
committed
Donot do buffer the response using BufferedHttpEntity since it might cause memory issue, this behaviour is same as Apache4.x
1 parent 5f33ad9 commit 270c0f7

File tree

3 files changed

+208
-30
lines changed

3 files changed

+208
-30
lines changed

http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/Apache5HttpClient.java

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import static software.amazon.awssdk.http.apache5.internal.conn.ClientConnectionRequestFactory.THREAD_LOCAL_REQUEST_METRIC_COLLECTOR;
2424
import static software.amazon.awssdk.utils.NumericUtils.saturatedCast;
2525

26+
import java.io.ByteArrayInputStream;
2627
import java.io.IOException;
28+
import java.io.InputStream;
2729
import java.net.InetAddress;
2830
import java.security.KeyManagementException;
2931
import java.security.NoSuchAlgorithmException;
@@ -56,10 +58,11 @@
5658
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
5759
import org.apache.hc.core5.http.ClassicHttpResponse;
5860
import org.apache.hc.core5.http.Header;
61+
import org.apache.hc.core5.http.HttpEntity;
62+
import org.apache.hc.core5.http.HttpResponse;
5963
import org.apache.hc.core5.http.config.Registry;
6064
import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
6165
import org.apache.hc.core5.http.io.SocketConfig;
62-
import org.apache.hc.core5.http.io.entity.BufferedHttpEntity;
6366
import org.apache.hc.core5.io.CloseMode;
6467
import org.apache.hc.core5.pool.PoolStats;
6568
import org.apache.hc.core5.ssl.SSLInitializationException;
@@ -80,6 +83,7 @@
8083
import software.amazon.awssdk.http.apache5.internal.DefaultConfiguration;
8184
import software.amazon.awssdk.http.apache5.internal.SdkProxyRoutePlanner;
8285
import software.amazon.awssdk.http.apache5.internal.conn.ClientConnectionManagerFactory;
86+
import software.amazon.awssdk.http.apache5.internal.conn.ConnectionAwareInputStream;
8387
import software.amazon.awssdk.http.apache5.internal.conn.IdleConnectionReaper;
8488
import software.amazon.awssdk.http.apache5.internal.conn.SdkConnectionKeepAliveStrategy;
8589
import software.amazon.awssdk.http.apache5.internal.conn.SdkTlsSocketFactory;
@@ -90,6 +94,7 @@
9094
import software.amazon.awssdk.metrics.MetricCollector;
9195
import software.amazon.awssdk.metrics.NoOpMetricCollector;
9296
import software.amazon.awssdk.utils.AttributeMap;
97+
import software.amazon.awssdk.utils.IoUtils;
9398
import software.amazon.awssdk.utils.Logger;
9499
import software.amazon.awssdk.utils.Validate;
95100

@@ -262,20 +267,14 @@ private HttpExecuteResponse execute(HttpUriRequestBase apacheRequest, MetricColl
262267
HttpClientContext localRequestContext = Apache5Utils.newClientContext(requestConfig.proxyConfiguration());
263268
THREAD_LOCAL_REQUEST_METRIC_COLLECTOR.set(metricCollector);
264269
try {
265-
return httpClient.execute(apacheRequest, localRequestContext, response -> {
266-
267-
// TODO : This is required since Apache5 closes streams immediately, check memory impacts because of this.
268-
if (response.getEntity() != null) {
269-
response.setEntity(new BufferedHttpEntity(response.getEntity()));
270-
}
271-
return createResponse(response, apacheRequest);
272-
});
270+
HttpResponse httpResponse = httpClient.execute(apacheRequest, localRequestContext);
271+
// Create a connection-aware input stream that closes the response when closed
272+
return createResponse(httpResponse, apacheRequest);
273273
} finally {
274274
THREAD_LOCAL_REQUEST_METRIC_COLLECTOR.remove();
275275
}
276276
}
277277

278-
279278
private HttpUriRequestBase toApacheRequest(HttpExecuteRequest request) {
280279
return apacheHttpRequestFactory.create(request, requestConfig);
281280
}
@@ -288,7 +287,7 @@ private HttpUriRequestBase toApacheRequest(HttpExecuteRequest request) {
288287
* @throws IOException If there were any problems getting any response information from the
289288
* HttpClient method object.
290289
*/
291-
private HttpExecuteResponse createResponse(ClassicHttpResponse apacheHttpResponse,
290+
private HttpExecuteResponse createResponse(HttpResponse apacheHttpResponse,
292291
HttpUriRequestBase apacheRequest) throws IOException {
293292
SdkHttpResponse.Builder responseBuilder =
294293
SdkHttpResponse.builder()
@@ -302,17 +301,50 @@ private HttpExecuteResponse createResponse(ClassicHttpResponse apacheHttpRespons
302301
responseBuilder.appendHeader(header.getName(), header.getValue());
303302

304303
}
305-
306-
AbortableInputStream responseBody = apacheHttpResponse.getEntity() != null ?
307-
toAbortableInputStream(apacheHttpResponse, apacheRequest) : null;
308-
304+
AbortableInputStream responseBody = getResponseBody(apacheHttpResponse, apacheRequest);
309305
return HttpExecuteResponse.builder().response(responseBuilder.build()).responseBody(responseBody).build();
310306

311307
}
312308

313-
private AbortableInputStream toAbortableInputStream(ClassicHttpResponse apacheHttpResponse,
314-
HttpUriRequestBase apacheRequest) throws IOException {
315-
return AbortableInputStream.create(apacheHttpResponse.getEntity().getContent(), apacheRequest::abort);
309+
private AbortableInputStream getResponseBody(HttpResponse apacheHttpResponse,
310+
HttpUriRequestBase apacheRequest) throws IOException {
311+
AbortableInputStream responseBody = null;
312+
if (apacheHttpResponse instanceof ClassicHttpResponse) {
313+
ClassicHttpResponse classicResponse = (ClassicHttpResponse) apacheHttpResponse;
314+
HttpEntity entity = classicResponse.getEntity();
315+
if (entity != null) {
316+
if (entity.getContentLength() == 0) {
317+
// Close immediately for empty responses
318+
classicResponse.close();
319+
responseBody = AbortableInputStream.create(new ByteArrayInputStream(new byte[0]));
320+
} else {
321+
responseBody = createConnectionAwareStream(classicResponse, apacheRequest);
322+
}
323+
} else {
324+
// No entity, close the response immediately
325+
classicResponse.close();
326+
}
327+
}
328+
return responseBody;
329+
}
330+
331+
private AbortableInputStream createConnectionAwareStream(ClassicHttpResponse apacheResponse,
332+
HttpUriRequestBase apacheRequest) throws IOException {
333+
InputStream entityStream = null;
334+
try {
335+
entityStream = apacheResponse.getEntity().getContent();
336+
return AbortableInputStream.create(
337+
new ConnectionAwareInputStream(entityStream, apacheResponse),
338+
() -> {
339+
apacheRequest.abort();
340+
IoUtils.closeQuietlyV2(apacheResponse, log);
341+
}
342+
);
343+
} catch (IOException e) {
344+
// Ensure response is closed on error
345+
IoUtils.closeQuietlyV2(apacheResponse, log);
346+
throw e;
347+
}
316348
}
317349

318350
private Apache5HttpRequestConfig createRequestConfig(DefaultBuilder builder,

http-clients/apache5-client/src/test/java/software/amazon/awssdk/http/apache5/MetricReportingTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,8 @@ public class MetricReportingTest {
6464
@Before
6565
public void methodSetup() throws IOException {
6666

67-
ClassicHttpResponse httpResponse = new BasicClassicHttpResponse(200, "OK");
68-
when(mockHttpClient.execute(any(HttpUriRequest.class), any(HttpContext.class), any(HttpClientResponseHandler.class)))
69-
.thenAnswer(invocation -> {
70-
HttpClientResponseHandler<HttpExecuteResponse> handler = invocation.getArgument(2);
71-
return handler.handleResponse(httpResponse);
72-
});
67+
when(mockHttpClient.execute(any(HttpUriRequest.class), any(HttpContext.class)))
68+
.thenReturn(new BasicClassicHttpResponse(200, "OK"));
7369

7470
when(mockHttpClient.getHttpClientConnectionManager()).thenReturn(cm);
7571

test/http-client-tests/src/main/java/software/amazon/awssdk/http/SdkHttpClientTestSuite.java

Lines changed: 156 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import static com.github.tomakehurst.wiremock.client.WireMock.any;
2121
import static com.github.tomakehurst.wiremock.client.WireMock.containing;
2222
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
23+
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
2324
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
2425
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
2526
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
2627
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
2728
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
2829
import static org.assertj.core.api.Assertions.assertThat;
2930
import static org.assertj.core.api.Assertions.assertThatThrownBy;
31+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
3032

3133
import com.github.tomakehurst.wiremock.WireMockServer;
3234
import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder;
@@ -236,6 +238,147 @@ public void doesNotRetryOn429StatusCode() throws Exception {
236238
}
237239
}
238240

241+
@Test
242+
public void handlesNoContentResponse() throws Exception {
243+
SdkHttpClient client = createSdkHttpClient();
244+
245+
mockServer.stubFor(any(urlPathEqualTo("/no-content"))
246+
.willReturn(aResponse()
247+
.withStatus(204)));
248+
249+
SdkHttpFullRequest req = mockSdkRequest("http://localhost:" + mockServer.port() + "/no-content",
250+
SdkHttpMethod.DELETE);
251+
HttpExecuteResponse rsp = client.prepareRequest(HttpExecuteRequest.builder()
252+
.request(req)
253+
.build())
254+
.call();
255+
256+
assertThat(rsp.httpResponse().statusCode()).isEqualTo(204);
257+
assertThat(rsp.responseBody()).isEmpty();
258+
}
259+
260+
@Test
261+
public void handlesLargeResponseBody() throws Exception {
262+
SdkHttpClient client = createSdkHttpClient();
263+
// Create a large response body (1MB)
264+
byte[] largeBody = new byte[1024 * 1024];
265+
for (int i = 0; i < largeBody.length; i++) {
266+
largeBody[i] = (byte) (i % 256);
267+
}
268+
mockServer.stubFor(any(urlPathEqualTo("/large"))
269+
.willReturn(aResponse()
270+
.withStatus(200)
271+
.withBody(largeBody)));
272+
273+
SdkHttpFullRequest req = mockSdkRequest("http://localhost:" + mockServer.port() + "/large", SdkHttpMethod.GET);
274+
HttpExecuteResponse rsp = client.prepareRequest(HttpExecuteRequest.builder()
275+
.request(req)
276+
.build())
277+
.call();
278+
279+
assertThat(rsp.httpResponse().statusCode()).isEqualTo(200);
280+
assertThat(rsp.responseBody()).isPresent();
281+
282+
// Read the entire response and verify
283+
byte[] readBuffer = IoUtils.toByteArray(rsp.responseBody().get());
284+
assertThat(readBuffer).isEqualTo(largeBody);
285+
}
286+
287+
@Test
288+
public void testAbortResponseStream() throws Exception {
289+
SdkHttpClient client = createSdkHttpClient();
290+
291+
mockServer.stubFor(any(urlPathEqualTo("/streaming"))
292+
.willReturn(aResponse()
293+
.withStatus(200)
294+
.withBody("This is a streaming response that should be aborted")));
295+
296+
SdkHttpFullRequest req = mockSdkRequest("http://localhost:" + mockServer.port() + "/streaming", SdkHttpMethod.POST);
297+
ExecutableHttpRequest executableRequest =
298+
client.prepareRequest(HttpExecuteRequest.builder()
299+
.request(req)
300+
.contentStreamProvider(req.contentStreamProvider().orElse(null))
301+
.build());
302+
HttpExecuteResponse rsp = executableRequest.call();
303+
304+
assertThat(rsp.httpResponse().statusCode()).isEqualTo(200);
305+
assertThat(rsp.responseBody()).isPresent();
306+
307+
// Verify the stream is abortable
308+
AbortableInputStream stream = rsp.responseBody().get();
309+
assertThat(stream).isInstanceOf(AbortableInputStream.class);
310+
311+
// Read a few bytes
312+
byte[] buffer = new byte[10];
313+
int bytesRead = stream.read(buffer);
314+
assertThat(bytesRead).isGreaterThan(0);
315+
assertThatCode(() -> stream.abort()).doesNotThrowAnyException();
316+
stream.close();
317+
}
318+
319+
@Test
320+
public void handlesMultipleSequentialRequests() throws Exception {
321+
SdkHttpClient client = createSdkHttpClient();
322+
323+
mockServer.stubFor(any(urlPathEqualTo("/sequential"))
324+
.willReturn(aResponse()
325+
.withStatus(200)
326+
.withBody("Response body")));
327+
328+
// Execute multiple requests sequentially
329+
for (int i = 0; i < 5; i++) {
330+
SdkHttpFullRequest req = mockSdkRequest("http://localhost:" + mockServer.port() + "/sequential", SdkHttpMethod.GET);
331+
HttpExecuteResponse rsp = client.prepareRequest(HttpExecuteRequest.builder()
332+
.request(req)
333+
.build())
334+
.call();
335+
336+
assertThat(rsp.httpResponse().statusCode()).isEqualTo(200);
337+
assertThat(IoUtils.toUtf8String(rsp.responseBody().orElse(null))).isEqualTo("Response body");
338+
}
339+
mockServer.verify(5, getRequestedFor(urlEqualTo("/sequential")));
340+
}
341+
342+
@Test
343+
public void handlesVariousContentLengths() throws Exception {
344+
SdkHttpClient client = createSdkHttpClient();
345+
int[] contentLengths = {0, 1, 100, 1024, 65536};
346+
347+
for (int length : contentLengths) {
348+
String path = "/content-length-" + length;
349+
byte[] body = new byte[length];
350+
for (int i = 0; i < length; i++) {
351+
body[i] = (byte) ('A' + (i % 26));
352+
}
353+
354+
mockServer.stubFor(any(urlPathEqualTo(path))
355+
.willReturn(aResponse()
356+
.withStatus(200)
357+
.withHeader("Content-Length", String.valueOf(length))
358+
.withBody(body)));
359+
360+
SdkHttpFullRequest req = mockSdkRequest("http://localhost:" + mockServer.port() + path, SdkHttpMethod.GET);
361+
HttpExecuteResponse rsp = client.prepareRequest(HttpExecuteRequest.builder()
362+
.request(req)
363+
.build())
364+
.call();
365+
366+
assertThat(rsp.httpResponse().statusCode()).isEqualTo(200);
367+
368+
if (length == 0) {
369+
// Empty body should still have a response body present, but EOF immediately
370+
if (rsp.responseBody().isPresent()) {
371+
assertThat(rsp.responseBody().get().read()).isEqualTo(-1);
372+
}
373+
} else {
374+
assertThat(rsp.responseBody()).isPresent();
375+
byte[] readBody = IoUtils.toByteArray(rsp.responseBody().get());
376+
assertThat(readBody).isEqualTo(body);
377+
}
378+
}
379+
}
380+
381+
239382
private void validateStatusCodeWithRetryCheck(SdkHttpClient client,
240383
int expectedStatusCode,
241384
int expectedRequestCount) throws IOException {
@@ -294,15 +437,13 @@ protected void testForResponseCodeUsingHttps(SdkHttpClient client, int returnCod
294437
.orElse(null))
295438
.build())
296439
.call();
297-
298440
validateResponse(rsp, returnCode, sdkHttpMethod);
299441
}
300442

301443
protected void stubForMockRequest(int returnCode) {
302444
ResponseDefinitionBuilder responseBuilder = aResponse().withStatus(returnCode)
303445
.withHeader("Some-Header", "With Value")
304446
.withBody("hello");
305-
306447
if (returnCode >= 300 && returnCode <= 399) {
307448
responseBuilder.withHeader("Location", "Some New Location");
308449
}
@@ -316,7 +457,6 @@ private void validateResponse(HttpExecuteResponse response, int returnCode, SdkH
316457
RequestPatternBuilder patternBuilder = RequestPatternBuilder.newRequestPattern(requestMethod, urlMatching("/"))
317458
.withHeader("Host", containing("localhost"))
318459
.withHeader("User-Agent", equalTo("hello-world!"));
319-
320460
if (method == SdkHttpMethod.HEAD) {
321461
patternBuilder.withRequestBody(absent());
322462
} else {
@@ -359,13 +499,23 @@ private static SdkHttpFullRequest.Builder mockSdkRequestBuilder(String uriString
359499
return requestBuilder;
360500
}
361501

362-
363502
protected SdkHttpFullRequest mockSdkRequest(String uriString, SdkHttpMethod method) {
364-
SdkHttpFullRequest.Builder requestBuilder = mockSdkRequestBuilder(uriString, method);
365-
if (method != SdkHttpMethod.HEAD) {
503+
URI uri = URI.create(uriString);
504+
SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder()
505+
.uri(uri)
506+
.method(method)
507+
.putHeader("Host", uri.getHost())
508+
.putHeader("User-Agent", "hello-world!");
509+
510+
// Only add body for methods that typically have a body
511+
if (method != SdkHttpMethod.HEAD && method != SdkHttpMethod.GET && method != SdkHttpMethod.DELETE) {
366512
byte[] content = "Body".getBytes(StandardCharsets.UTF_8);
367513
requestBuilder.putHeader("Content-Length", Integer.toString(content.length));
368514
requestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(content));
515+
} else if (method == SdkHttpMethod.GET || method == SdkHttpMethod.DELETE) {
516+
// For GET and DELETE, explicitly set Content-Length to 0 or don't set it at all
517+
// Some clients like AWS CRT are strict about this
518+
requestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]));
369519
}
370520

371521
return requestBuilder.build();

0 commit comments

Comments
 (0)