diff --git a/.changes/next-release/bugfix-AWSCRTAsyncHTTPClient-a045823.json b/.changes/next-release/bugfix-AWSCRTAsyncHTTPClient-a045823.json new file mode 100644 index 000000000000..f54b5fa0302e --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTAsyncHTTPClient-a045823.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT Async HTTP Client", + "contributor": "", + "description": "Fixed potential connection leak issue when SDK failed to convert the SDK request to CRT request" +} diff --git a/.changes/next-release/bugfix-AWSCRTHTTPAsyncClient-88e9d07.json b/.changes/next-release/bugfix-AWSCRTHTTPAsyncClient-88e9d07.json new file mode 100644 index 000000000000..7ad23e59bb11 --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTHTTPAsyncClient-88e9d07.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT Async HTTP Client", + "contributor": "", + "description": "Fixed the issue where AWS CRT HTTP client was eagerly buffering data before the underlying CRT component was able to handle it" +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java index 4db35956e8d4..4ede09d7ee3e 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java @@ -88,12 +88,12 @@ private void executeRequest(CrtAsyncRequestContext executionContext, CompletableFuture requestFuture, HttpClientConnection crtConn, AsyncExecuteRequest asyncRequest) { - HttpRequest crtRequest = CrtRequestAdapter.toAsyncCrtRequest(executionContext); - HttpStreamResponseHandler crtResponseHandler = - CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler()); - // Submit the request on the connection try { + HttpRequest crtRequest = CrtRequestAdapter.toAsyncCrtRequest(executionContext); + HttpStreamResponseHandler crtResponseHandler = + CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler()); + crtConn.makeRequest(crtRequest, crtResponseHandler).activate(); } catch (HttpException e) { Throwable toThrow = wrapWithIoExceptionIfRetryable(e); diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java index 79ee03ad5067..6fa64d8a011d 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.http.crt.internal.request; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.http.HttpRequestBodyStream; import software.amazon.awssdk.http.async.SdkHttpContentPublisher; @@ -26,15 +27,19 @@ final class CrtRequestBodyAdapter implements HttpRequestBodyStream { private final SdkHttpContentPublisher requestPublisher; private final ByteBufferStoringSubscriber requestBodySubscriber; + private final AtomicBoolean subscribed = new AtomicBoolean(false); CrtRequestBodyAdapter(SdkHttpContentPublisher requestPublisher, long readLimit) { this.requestPublisher = requestPublisher; this.requestBodySubscriber = new ByteBufferStoringSubscriber(readLimit); - requestPublisher.subscribe(requestBodySubscriber); } @Override public boolean sendRequestBody(ByteBuffer bodyBytesOut) { + if (subscribed.compareAndSet(false, true)) { + requestPublisher.subscribe(requestBodySubscriber); + } + return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM; } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java index a83bd27ec2e6..feaf9db3d472 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutorTest.java @@ -105,6 +105,29 @@ public void acquireConnectionThrowException_shouldInvokeOnError() { assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class); } + @Test + public void invalidRequest_requestConversionThrowError_shouldInvokeOnError() { + CrtAsyncRequestContext context = CrtAsyncRequestContext.builder() + .crtConnPool(connectionManager) + .request(AsyncExecuteRequest.builder() + .responseHandler(responseHandler) + .build()) + .build(); + CompletableFuture completableFuture = new CompletableFuture<>(); + + Mockito.when(connectionManager.acquireConnection()).thenReturn(completableFuture); + completableFuture.complete(httpClientConnection); + + CompletableFuture executeFuture = requestExecutor.execute(context); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Exception.class); + Mockito.verify(responseHandler).onError(argumentCaptor.capture()); + + Exception actualException = argumentCaptor.getValue(); + assertThat(actualException).isInstanceOf(NullPointerException.class); + assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class); + } + @Test public void executeAsyncRequest_CrtRuntimeException_shouldInvokeOnError() { CrtRuntimeException exception = new CrtRuntimeException("");