diff --git a/.builder/actions/localhost_test.py b/.builder/actions/localhost_test.py new file mode 100644 index 000000000..0dd32b385 --- /dev/null +++ b/.builder/actions/localhost_test.py @@ -0,0 +1,19 @@ +import Builder +import sys +import os + + +class LocalhostTest(Builder.Action): + + def run(self, env): + env.shell.setenv('AWS_CRT_MEMORY_TRACING', '2') + actions = [] + if os.system("mvn -Dtest=Http2ClientLocalHostTest test -DredirectTestOutputToFile=true -DforkCount=0 \ + -DrerunFailingTestsCount=5 \ + -Daws.crt.memory.tracing=2 \ + -Daws.crt.debugnative=true \ + -Daws.crt.localhost=true"): + # Failed + actions.append("exit 1") + + return Builder.Script(actions, name='aws-crt-java-test') diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fb59ab1aa..68a24e898 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -182,3 +182,60 @@ jobs: # note: using "@main" because "@${{env.BUILDER_VERSION}}" doesn't work # https://github.com/actions/runner/issues/480 uses: awslabs/aws-crt-builder/.github/actions/check-submodules@main + + + localhost-test-linux: + runs-on: ubuntu-20.04 # latest + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: true + - name: Configure local host + run: | + python3 -m pip install h2 + ls crt/aws-c-http/ + cd crt/aws-c-http/tests/py_localhost/ + python3 server.py & + python3 non_tls_server.py & + - name: Build and test + run: | + python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')" + python builder.pyz localhost-test -p ${{ env.PACKAGE_NAME }} --spec=downstream + + localhost-test-mac: + runs-on: macos-11 # latest + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: true + - name: Configure local host + run: | + python3 -m pip install h2 + cd crt/aws-c-http/tests/py_localhost/ + python3 server.py & + python3 non_tls_server.py & + - name: Build and test + run: | + python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')" + chmod a+x builder + ./builder localhost-test -p ${{ env.PACKAGE_NAME }} --spec=downstream + + localhost-test-win: + runs-on: windows-2022 # latest + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: true + - name: Configure local host + run: | + python -m pip install h2 + - name: Build and test + run: | + cd crt/aws-c-http/tests/py_localhost/ + Start-Process -NoNewWindow python .\server.py + Start-Process -NoNewWindow python .\non_tls_server.py + python -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')" + python builder.pyz localhost-test -p ${{ env.PACKAGE_NAME }} downstream diff --git a/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java new file mode 100644 index 000000000..6f1abc80d --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java @@ -0,0 +1,245 @@ +package software.amazon.awssdk.crt.http; + +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.AsyncCallback; +import software.amazon.awssdk.crt.io.TlsContext; + +import java.util.concurrent.CompletableFuture; +import java.net.URI; +import java.nio.charset.Charset; + +/** + * Manages a Pool of HTTP/2 Streams. Creates and manages HTTP/2 connections + * under the hood. + */ +public class Http2StreamManager extends CrtResource { + + private static final String HTTP = "http"; + private static final String HTTPS = "https"; + private static final int DEFAULT_HTTP_PORT = 80; + private static final int DEFAULT_HTTPS_PORT = 443; + private final static Charset UTF8 = java.nio.charset.StandardCharsets.UTF_8; + + private final URI uri; + private final int port; + private final int maxConnections; + private final int idealConcurrentStreamsPerConnection; + private final int maxConcurrentStreamsPerConnection; + private final CompletableFuture shutdownComplete = new CompletableFuture<>(); + + /** + * Factory function for Http2StreamManager instances + * + * @param options configuration options + * @return a new instance of an Http2StreamManager + */ + public static Http2StreamManager create(Http2StreamManagerOptions options) { + return new Http2StreamManager(options); + } + + private Http2StreamManager(Http2StreamManagerOptions options) { + options.validateOptions(); + + HttpClientConnectionManagerOptions connectionManagerOptions = options.getConnectionManagerOptions(); + URI uri = connectionManagerOptions.getUri(); + ClientBootstrap clientBootstrap = connectionManagerOptions.getClientBootstrap(); + SocketOptions socketOptions = connectionManagerOptions.getSocketOptions(); + boolean useTls = HTTPS.equals(uri.getScheme()); + TlsContext tlsContext = connectionManagerOptions.getTlsContext(); + int maxConnections = connectionManagerOptions.getMaxConnections(); + int port = connectionManagerOptions.getPort(); + if (port == -1) { + port = uri.getPort(); + /* Pick a default port based on the scheme if one wasn't set */ + if (port == -1) { + if (HTTP.equals(uri.getScheme())) { port = DEFAULT_HTTP_PORT; } + if (HTTPS.equals(uri.getScheme())) { port = DEFAULT_HTTPS_PORT; } + } + } + + int maxConcurrentStreamsPerConnection = options.getMaxConcurrentStreamsPerConnection(); + int idealConcurrentStreamsPerConnection = options.getIdealConcurrentStreamsPerConnection(); + + this.uri = uri; + this.port = port; + this.maxConnections = maxConnections; + this.idealConcurrentStreamsPerConnection = idealConcurrentStreamsPerConnection; + this.maxConcurrentStreamsPerConnection = maxConcurrentStreamsPerConnection; + + int proxyConnectionType = 0; + String proxyHost = null; + int proxyPort = 0; + TlsContext proxyTlsContext = null; + int proxyAuthorizationType = 0; + String proxyAuthorizationUsername = null; + String proxyAuthorizationPassword = null; + HttpProxyOptions proxyOptions = connectionManagerOptions.getProxyOptions(); + + if (proxyOptions != null) { + proxyConnectionType = proxyOptions.getConnectionType().getValue(); + proxyHost = proxyOptions.getHost(); + proxyPort = proxyOptions.getPort(); + proxyTlsContext = proxyOptions.getTlsContext(); + proxyAuthorizationType = proxyOptions.getAuthorizationType().getValue(); + proxyAuthorizationUsername = proxyOptions.getAuthorizationUsername(); + proxyAuthorizationPassword = proxyOptions.getAuthorizationPassword(); + } + + HttpMonitoringOptions monitoringOptions = connectionManagerOptions.getMonitoringOptions(); + long monitoringThroughputThresholdInBytesPerSecond = 0; + int monitoringFailureIntervalInSeconds = 0; + if (monitoringOptions != null) { + monitoringThroughputThresholdInBytesPerSecond = monitoringOptions.getMinThroughputBytesPerSecond(); + monitoringFailureIntervalInSeconds = monitoringOptions.getAllowableThroughputFailureIntervalSeconds(); + } + + acquireNativeHandle(http2StreamManagerNew(this, + clientBootstrap.getNativeHandle(), + socketOptions.getNativeHandle(), + useTls ? tlsContext.getNativeHandle() : 0, + Http2ConnectionSetting.marshallSettingsForJNI(options.getInitialSettingsList()), + uri.getHost().getBytes(UTF8), + port, + proxyConnectionType, + proxyHost != null ? proxyHost.getBytes(UTF8) : null, + proxyPort, + proxyTlsContext != null ? proxyTlsContext.getNativeHandle() : 0, + proxyAuthorizationType, + proxyAuthorizationUsername != null ? proxyAuthorizationUsername.getBytes(UTF8) : null, + proxyAuthorizationPassword != null ? proxyAuthorizationPassword.getBytes(UTF8) : null, + connectionManagerOptions.isManualWindowManagement(), + monitoringThroughputThresholdInBytesPerSecond, + monitoringFailureIntervalInSeconds, + maxConnections, + idealConcurrentStreamsPerConnection, + maxConcurrentStreamsPerConnection)); + + /* + * we don't need to add a reference to socketOptions since it's copied during + * connection manager construction + */ + addReferenceTo(clientBootstrap); + if (useTls) { + addReferenceTo(tlsContext); + } + } + + /** + * Request a Http2Stream from StreamManager. + * + * @param request The Request to make to the Server. + * @param streamHandler The Stream Handler to be called from the Native + * EventLoop + * @return A future for a Http2Stream that will be completed when the stream is + * acquired. + */ + public CompletableFuture acquireStream(Http2Request request, + HttpStreamBaseResponseHandler streamHandler) { + + return this.acquireStream((HttpRequestBase) request, streamHandler); + } + + public CompletableFuture acquireStream(HttpRequest request, + HttpStreamBaseResponseHandler streamHandler) { + + return this.acquireStream((HttpRequestBase) request, streamHandler); + } + + public CompletableFuture acquireStream(HttpRequestBase request, + HttpStreamBaseResponseHandler streamHandler) { + CompletableFuture completionFuture = new CompletableFuture<>(); + AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null); + if (isNull()) { + completionFuture.completeExceptionally(new IllegalStateException( + "Http2StreamManager has been closed, can't acquire new streams")); + return completionFuture; + } + try { + http2StreamManagerAcquireStream(this.getNativeHandle(), + request.marshalForJni(), + request.getBodyStream(), + new HttpStreamResponseHandlerNativeAdapter(streamHandler), + acquireStreamCompleted); + } catch (CrtRuntimeException ex) { + completionFuture.completeExceptionally(ex); + } + return completionFuture; + } + + /** + * Called from Native when all Streams from this Stream manager have finished + * and underlying resources like connections opened under the hood has been + * cleaned up + * begin releasing Native Resources that Http2StreamManager depends on. + */ + private void onShutdownComplete() { + releaseReferences(); + + this.shutdownComplete.complete(null); + } + + /** + * Determines whether a resource releases its dependencies at the same time the + * native handle is released or if it waits. + * Resources that wait are responsible for calling releaseReferences() manually. + */ + @Override + protected boolean canReleaseReferencesImmediately() { + return false; + } + + /** + * Closes this Connection Pool and any pending Connection Acquisitions + */ + @Override + protected void releaseNativeHandle() { + if (!isNull()) { + /* + * Release our Native pointer and schedule tasks on the Native Event Loop to + * start sending HTTP/TLS/TCP + * connection shutdown messages to peers for any open Connections. + */ + http2StreamManagerRelease(getNativeHandle()); + } + } + + public CompletableFuture getShutdownCompleteFuture() { + return shutdownComplete; + } + + /******************************************************************************* + * Native methods + ******************************************************************************/ + + private static native long http2StreamManagerNew(Http2StreamManager thisObj, + long client_bootstrap, + long socketOptions, + long tlsContext, + long[] marshalledSettings, + byte[] endpoint, + int port, + int proxyConnectionType, + byte[] proxyHost, + int proxyPort, + long proxyTlsContext, + int proxyAuthorizationType, + byte[] proxyAuthorizationUsername, + byte[] proxyAuthorizationPassword, + boolean isManualWindowManagement, + long monitoringThroughputThresholdInBytesPerSecond, + int monitoringFailureIntervalInSeconds, + int maxConns, + int ideal_concurrent_streams_per_connection, + int max_concurrent_streams_per_connection) throws CrtRuntimeException; + + private static native void http2StreamManagerRelease(long stream_manager) throws CrtRuntimeException; + + private static native void http2StreamManagerAcquireStream(long stream_manager, + byte[] marshalledRequest, + HttpRequestBodyStream bodyStream, + HttpStreamResponseHandlerNativeAdapter responseHandler, + AsyncCallback completedCallback) throws CrtRuntimeException; +} diff --git a/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManagerOptions.java b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManagerOptions.java new file mode 100644 index 000000000..d0c8c985d --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/Http2StreamManagerOptions.java @@ -0,0 +1,159 @@ +package software.amazon.awssdk.crt.http; + +import java.util.List; +import java.util.ArrayList; + +/** + * Contains all the configuration options for a Http2StreamManager + * instance + */ +public class Http2StreamManagerOptions { + public static final int DEFAULT_MAX_WINDOW_SIZE = Integer.MAX_VALUE; + public static final int DEFAULT_MAX = Integer.MAX_VALUE; + public static final int DEFAULT_MAX_CONNECTIONS = 2; + + private HttpClientConnectionManagerOptions connectionManagerOptions; + + private int idealConcurrentStreamsPerConnection = 100; + private boolean connectionManualWindowManagement = false; + private int maxConcurrentStreamsPerConnection = DEFAULT_MAX; + + private List initialSettingsList = new ArrayList(); + + /** + * Default constructor + */ + public Http2StreamManagerOptions() { + } + + /** + * For HTTP/2 stream manager only. + * + * The initial settings for the HTTP/2 connections made by stream manger. + * `Http2ConnectionSettingListBuilder` can help to build the settings list. + * + * To control the initial stream-level flow-control window, set the INITIAL_WINDOW_SIZE setting in the initial settings. + * + * @param initialSettingsList The List of initial settings + * @return this + */ + public Http2StreamManagerOptions withInitialSettingsList(List initialSettingsList) { + this.initialSettingsList.addAll(initialSettingsList); + return this; + } + + /** + * @return The List of initial settings + */ + public List getInitialSettingsList() { + return this.initialSettingsList; + } + + /** + * For HTTP/2 stream manager only. + * + * The ideal number of concurrent streams for a connection. Stream manager will + * try to create a new connection if one connection reaches this number. But, if + * the max connections reaches, manager will reuse connections to create the + * acquired steams as much as possible. + * + * @param idealConcurrentStreamsPerConnection The ideal number of concurrent + * streams for a connection + * @return this + */ + public Http2StreamManagerOptions withIdealConcurrentStreamsPerConnection(int idealConcurrentStreamsPerConnection) { + this.idealConcurrentStreamsPerConnection = idealConcurrentStreamsPerConnection; + return this; + } + + /** + * @return The ideal number of concurrent streams for a connection used for + * manager + */ + public int getIdealConcurrentStreamsPerConnection() { + return idealConcurrentStreamsPerConnection; + } + + /** + * Default is no limit, which will use the limit from the server. 0 will be + * considered as using the default value. + * The real number of concurrent streams per connection will be controlled by + * the minimal value of the setting from other end and the value here. + * + * @param maxConcurrentStreamsPerConnection The max number of concurrent + * streams for a connection + * @return this + */ + public Http2StreamManagerOptions withMaxConcurrentStreamsPerConnection(int maxConcurrentStreamsPerConnection) { + this.maxConcurrentStreamsPerConnection = maxConcurrentStreamsPerConnection; + return this; + } + + /** + * @return The max number of concurrent streams for a connection set for + * manager. + * It could be different than the real limits, which is the minimal set + * for manager and the settings from the other side. + */ + public int getMaxConcurrentStreamsPerConnection() { + return maxConcurrentStreamsPerConnection; + } + + /** + * @return The connection level manual flow control enabled or not. + */ + public boolean isConnectionManualWindowManagement() { + return connectionManualWindowManagement; + } + + /** + * Set to true to manually manage the flow-control window of whole HTTP/2 connection. + * The stream level flow-control window is controlled by the manualWindowManagement in connectionManagerOptions. + * + * @param connectionManualWindowManagement Enable connection level manual flow control or not. + * @return this + */ + public Http2StreamManagerOptions withConnectionManualWindowManagement(boolean connectionManualWindowManagement) { + this.connectionManualWindowManagement = connectionManualWindowManagement; + return this; + } + + /** + * @return The connection manager options for the underlying connection manager. + */ + public HttpClientConnectionManagerOptions getConnectionManagerOptions() { + return connectionManagerOptions; + } + + /** + * The configuration options for the connection manager under the hood. + * It controls the connection specific thing for the stream manager. See `HttpClientConnectionManagerOptions` for details. + * + * Note: + * 1. the windowSize of connection manager will be ignored, as the initial flow-control window size for HTTP/2 stream + * is controlled by the initial settings. + * 2. The expectedHttpVersion will also be ignored. + * + * @param connectionManagerOptions The connection manager options for the underlying connection manager + * @return this + */ + public Http2StreamManagerOptions withConnectionManagerOptions(HttpClientConnectionManagerOptions connectionManagerOptions) { + this.connectionManagerOptions = connectionManagerOptions; + return this; + } + + /** + * Validate the stream manager options are valid to use. Throw exceptions if not. + */ + public void validateOptions() { + connectionManagerOptions.validateOptions(); + if (maxConcurrentStreamsPerConnection <= 0) { + throw new IllegalArgumentException("Max Concurrent Streams Per Connection must be greater than zero."); + } + if (idealConcurrentStreamsPerConnection <= 0 + || idealConcurrentStreamsPerConnection > maxConcurrentStreamsPerConnection) { + throw new IllegalArgumentException( + "Ideal Concurrent Streams Per Connection must be greater than zero and smaller than max."); + } + } +} diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnectionManager.java b/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnectionManager.java index 60a66a15f..666f5ac21 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnectionManager.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnectionManager.java @@ -45,31 +45,15 @@ public static HttpClientConnectionManager create(HttpClientConnectionManagerOpti } private HttpClientConnectionManager(HttpClientConnectionManagerOptions options) { - URI uri = options.getUri(); - if (uri == null) { throw new IllegalArgumentException("URI must not be null"); } - if (uri.getScheme() == null) { throw new IllegalArgumentException("URI does not have a Scheme"); } - if (!HTTP.equals(uri.getScheme()) && !HTTPS.equals(uri.getScheme())) { throw new IllegalArgumentException("URI has unknown Scheme"); } - if (uri.getHost() == null) { throw new IllegalArgumentException("URI does not have a Host name"); } + options.validateOptions(); + URI uri = options.getUri(); ClientBootstrap clientBootstrap = options.getClientBootstrap(); - if (clientBootstrap == null) { throw new IllegalArgumentException("ClientBootstrap must not be null"); } - SocketOptions socketOptions = options.getSocketOptions(); - if (socketOptions == null) { throw new IllegalArgumentException("SocketOptions must not be null"); } - boolean useTls = HTTPS.equals(uri.getScheme()); TlsContext tlsContext = options.getTlsContext(); - if (useTls && tlsContext == null) { throw new IllegalArgumentException("TlsContext must not be null if https is used"); } - int windowSize = options.getWindowSize(); - if (windowSize <= 0) { throw new IllegalArgumentException("Window Size must be greater than zero."); } - - int bufferSize = options.getBufferSize(); - if (bufferSize <= 0) { throw new IllegalArgumentException("Buffer Size must be greater than zero."); } - int maxConnections = options.getMaxConnections(); - if (maxConnections <= 0) { throw new IllegalArgumentException("Max Connections must be greater than zero."); } - int port = options.getPort(); if (port == -1) { port = uri.getPort(); diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnectionManagerOptions.java b/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnectionManagerOptions.java index 4615b8f53..1766bee7a 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnectionManagerOptions.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpClientConnectionManagerOptions.java @@ -31,6 +31,9 @@ public class HttpClientConnectionManagerOptions { private long maxConnectionIdleInMilliseconds = 0; private HttpVersion expectedHttpVersion = HttpVersion.HTTP_1_1; + private static final String HTTP = "http"; + private static final String HTTPS = "https"; + /** * Default constructor */ @@ -99,7 +102,7 @@ public HttpClientConnectionManagerOptions withWindowSize(int windowSize) { public int getWindowSize() { return windowSize; } /** - * Sets the IO buffer size to use for connections in the connection pool + * @deprecated Sets the IO buffer size to use for connections in the connection pool * @param bufferSize Size of I/O buffer per connection * @return this */ @@ -109,11 +112,11 @@ public HttpClientConnectionManagerOptions withBufferSize(int bufferSize) { } /** + * @deprecated * @return the IO buffer size to use for connections in the connection pool */ public int getBufferSize() { return bufferSize; } - /** * Sets the URI to use for connections in the connection pool * @param uri The endpoint URI to connect to @@ -249,4 +252,26 @@ public HttpClientConnectionManagerOptions withMonitoringOptions(HttpMonitoringOp * @return the monitoring options for connections in the connection pool */ public HttpMonitoringOptions getMonitoringOptions() { return monitoringOptions; } + + /** + * Validate the connection manager options are valid to use. Throw exceptions if not. + */ + public void validateOptions() { + URI uri = this.getUri(); + if (uri == null) { throw new IllegalArgumentException("URI must not be null"); } + if (uri.getScheme() == null) { throw new IllegalArgumentException("URI does not have a Scheme"); } + if (!HTTP.equals(uri.getScheme()) && !HTTPS.equals(uri.getScheme())) { throw new IllegalArgumentException("URI has unknown Scheme"); } + if (uri.getHost() == null) { throw new IllegalArgumentException("URI does not have a Host name"); } + + if (clientBootstrap == null) { throw new IllegalArgumentException("ClientBootstrap must not be null"); } + + if (socketOptions == null) { throw new IllegalArgumentException("SocketOptions must not be null"); } + + boolean useTls = HTTPS.equals(uri.getScheme()); + if (useTls && tlsContext == null) { throw new IllegalArgumentException("TlsContext must not be null if https is used"); } + + if (windowSize <= 0) { throw new IllegalArgumentException("Window Size must be greater than zero."); } + + if (maxConnections <= 0) { throw new IllegalArgumentException("Max Connections must be greater than zero."); } + } } diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java new file mode 100644 index 000000000..a85ac6ef8 --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamManager.java @@ -0,0 +1,197 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.amazon.awssdk.crt.http; + +import software.amazon.awssdk.crt.CrtRuntimeException; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Manages a pool for either HTTP/1.1 or HTTP/2 connection. + * For HTTP/1.1, it will grab a connection from HttpClientConnectionManager to + * make request on it, and will return it back until the request finishes. + * For HTTP/2, it will use Http2StreamManager under the hood. + */ +public class HttpStreamManager implements AutoCloseable { + + private HttpClientConnectionManager connectionManager = null; + private Http2StreamManager h2StreamManager = null; + private CompletableFuture shutdownComplete = null; + private AtomicLong shutdownNum = new AtomicLong(0); + private Throwable shutdownCompleteException = null; + + /** + * Factory function for HttpStreamManager instances + * + * @param options configuration options + * @return a new instance of an HttpStreamManager + */ + public static HttpStreamManager create(Http2StreamManagerOptions options) { + return new HttpStreamManager(options); + } + + private HttpStreamManager(Http2StreamManagerOptions options) { + HttpClientConnectionManagerOptions connManagerOptions = options.getConnectionManagerOptions(); + this.connectionManager = HttpClientConnectionManager.create(connManagerOptions); + this.h2StreamManager = Http2StreamManager.create(options); + this.shutdownComplete = new CompletableFuture(); + this.connectionManager.getShutdownCompleteFuture().whenComplete((v, throwable) -> { + if (throwable != null) { + this.shutdownCompleteException = throwable; + } + long shutdownNum = this.shutdownNum.addAndGet(1); + if (shutdownNum == 2) { + /* both connectionManager and the h2StreamManager has been shutdown. */ + if (this.shutdownCompleteException != null) { + this.shutdownComplete.completeExceptionally(this.shutdownCompleteException); + } else { + this.shutdownComplete.complete(null); + } + } + }); + this.h2StreamManager.getShutdownCompleteFuture().whenComplete((v, throwable) -> { + if (throwable != null) { + this.shutdownCompleteException = throwable; + } + long shutdownNum = this.shutdownNum.addAndGet(1); + if (shutdownNum == 2) { + /* both connectionManager and the h2StreamManager has been shutdown. */ + if (this.shutdownCompleteException != null) { + this.shutdownComplete.completeExceptionally(this.shutdownCompleteException); + } else { + this.shutdownComplete.complete(null); + } + } + }); + } + + /** + * Get the protocol version the manager runs on. + */ + public HttpVersion getHttpVersion() throws Exception { + if (this.h2StreamManager != null && this.connectionManager != null) { + try (HttpClientConnection conn = this.connectionManager.acquireConnection().get(30, TimeUnit.SECONDS)) { + if (conn.getVersion() == HttpVersion.HTTP_2) { + /** + * The connection is HTTP/2, close the connectionManager, which made for HTTP/1 + */ + this.connectionManager.releaseConnection(conn); + this.connectionManager.close(); + this.connectionManager = null; + } else { + /** + * The connection is HTTP/1, close the h2StreamManager, which made for HTTP/2 + */ + this.connectionManager.releaseConnection(conn); + this.h2StreamManager.close(); + this.h2StreamManager = null; + } + } + } + if (this.h2StreamManager != null) { + return HttpVersion.HTTP_2; + } + return HttpVersion.HTTP_1_1; + } + + /** + * Request a HttpStream from StreamManager. If the streamManager is made with + * HTTP/2 connection under the hood, it will be Http2Stream. + * + * @param request + * @param streamHandler + * @return A future for a Http2Stream that will be completed when the stream is + * acquired. + * @throws CrtRuntimeException + */ + public CompletableFuture acquireStream(HttpRequestBase request, + HttpStreamBaseResponseHandler streamHandler) { + CompletableFuture completionFuture = new CompletableFuture<>(); + try { + /* + * Try get version first. If we haven't decided the version yet, this will help + * us to decide the version we are running on + */ + this.getHttpVersion(); + } catch (Exception e) { + completionFuture.completeExceptionally(e); + return completionFuture; + } + if (this.h2StreamManager != null) { + this.h2StreamManager.acquireStream(request, streamHandler).whenComplete((stream, throwable) -> { + if (throwable != null) { + completionFuture.completeExceptionally(throwable); + } else { + completionFuture.complete((Http2Stream) stream); + } + }); + return completionFuture; + } + HttpClientConnectionManager connManager = this.connectionManager; + this.connectionManager.acquireConnection().whenComplete((conn, throwable) -> { + if (throwable != null) { + completionFuture.completeExceptionally(throwable); + } else { + try { + HttpStreamBase stream = conn.makeRequest(request, new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + streamHandler.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders); + } + + @Override + public void onResponseHeadersDone(HttpStreamBase stream, int blockType) { + streamHandler.onResponseHeadersDone(stream, blockType); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + return streamHandler.onResponseBody(stream, bodyBytesIn); + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + streamHandler.onResponseComplete(stream, errorCode); + /* Release the connection back */ + connManager.releaseConnection(conn); + } + }); + completionFuture.complete(stream); + /* Active the stream for user */ + try { + stream.activate(); + } catch (CrtRuntimeException e) { + /* If activate failed, complete callback will not be invoked */ + streamHandler.onResponseComplete(stream, e.errorCode); + /* Release the connection back */ + connManager.releaseConnection(conn); + } + } catch (Exception ex) { + connManager.releaseConnection(conn); + completionFuture.completeExceptionally(ex); + } + } + }); + return completionFuture; + } + + public CompletableFuture getShutdownCompleteFuture() { + return shutdownComplete; + } + + @Override + public void close() { + if (this.connectionManager != null) { + this.connectionManager.close(); + } + if (this.h2StreamManager != null) { + this.h2StreamManager.close(); + } + } +} diff --git a/src/native/aws_signing.c b/src/native/aws_signing.c index afc769bce..d9f2b7e9b 100644 --- a/src/native/aws_signing.c +++ b/src/native/aws_signing.c @@ -73,10 +73,6 @@ static void s_cleanup_callback_data(struct s_aws_sign_request_callback_data *cal } if (callback_data->native_request) { - struct aws_input_stream *input_stream = aws_http_message_get_body_stream(callback_data->native_request); - if (input_stream != NULL) { - aws_input_stream_destroy(input_stream); - } aws_http_message_release(callback_data->native_request); } diff --git a/src/native/http2_stream_manager.c b/src/native/http2_stream_manager.c new file mode 100644 index 000000000..92c0e88e4 --- /dev/null +++ b/src/native/http2_stream_manager.c @@ -0,0 +1,410 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include "crt.h" +#include "http_connection_manager.h" +#include "http_request_response.h" +#include "http_request_utils.h" +#include "java_class_ids.h" + +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +/* on 32-bit platforms, casting pointers to longs throws a warning we don't need */ +#if UINTPTR_MAX == 0xffffffff +# if defined(_MSC_VER) +# pragma warning(push) +# pragma warning(disable : 4305) /* 'type cast': truncation from 'jlong' to 'jni_tls_ctx_options *' */ +# else +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wpointer-to-int-cast" +# pragma GCC diagnostic ignored "-Wint-to-pointer-cast" +# endif +#endif + +/* + * Stream manager binding, persists across the lifetime of the native object. + */ +struct aws_http2_stream_manager_binding { + JavaVM *jvm; + jweak java_http2_stream_manager; + struct aws_http2_stream_manager *stream_manager; +}; + +static void s_destroy_manager_binding(struct aws_http2_stream_manager_binding *binding, JNIEnv *env) { + if (binding == NULL) { + return; + } + if (binding->java_http2_stream_manager != NULL) { + (*env)->DeleteWeakGlobalRef(env, binding->java_http2_stream_manager); + } + + aws_mem_release(aws_jni_get_allocator(), binding); +} + +static void s_on_stream_manager_shutdown_complete_callback(void *user_data) { + + struct aws_http2_stream_manager_binding *binding = (struct aws_http2_stream_manager_binding *)user_data; + /********** JNI ENV ACQUIRE **********/ + JNIEnv *env = aws_jni_acquire_thread_env(binding->jvm); + + AWS_LOGF_DEBUG(AWS_LS_HTTP_STREAM_MANAGER, "Java Stream Manager Shutdown Complete"); + jobject java_http2_stream_manager = (*env)->NewLocalRef(env, binding->java_http2_stream_manager); + if (java_http2_stream_manager != NULL) { + (*env)->CallVoidMethod(env, java_http2_stream_manager, http2_stream_manager_properties.onShutdownComplete); + + /* If exception raised from Java callback, but we already closed the stream manager, just move on */ + aws_jni_check_and_clear_exception(env); + + (*env)->DeleteLocalRef(env, java_http2_stream_manager); + } + + /* We're done with this wrapper, free it. */ + s_destroy_manager_binding(binding, env); + aws_jni_release_thread_env(binding->jvm, env); + /********** JNI ENV RELEASE **********/ +} + +JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_http2StreamManagerNew( + JNIEnv *env, + jclass jni_class, + jobject stream_manager_jobject, + jlong jni_client_bootstrap, + jlong jni_socket_options, + jlong jni_tls_ctx, + jlongArray java_marshalled_settings, + jbyteArray jni_endpoint, + jint jni_port, + jint jni_proxy_connection_type, + jbyteArray jni_proxy_host, + jint jni_proxy_port, + jlong jni_proxy_tls_context, + jint jni_proxy_authorization_type, + jbyteArray jni_proxy_authorization_username, + jbyteArray jni_proxy_authorization_password, + jboolean jni_manual_window_management, + jlong jni_monitoring_throughput_threshold_in_bytes_per_second, + jint jni_monitoring_failure_interval_in_seconds, + jint jni_max_conns, + jint jni_ideal_concurrent_streams_per_connection, + jint jni_max_concurrent_streams_per_connection) { + + (void)jni_class; + + struct aws_client_bootstrap *client_bootstrap = (struct aws_client_bootstrap *)jni_client_bootstrap; + struct aws_socket_options *socket_options = (struct aws_socket_options *)jni_socket_options; + struct aws_tls_ctx *tls_ctx = (struct aws_tls_ctx *)jni_tls_ctx; + struct aws_http2_stream_manager_binding *binding = NULL; + struct aws_allocator *allocator = aws_jni_get_allocator(); + + if (!client_bootstrap) { + aws_jni_throw_illegal_argument_exception(env, "ClientBootstrap can't be null"); + return (jlong)NULL; + } + + if (!socket_options) { + aws_jni_throw_illegal_argument_exception(env, "SocketOptions can't be null"); + return (jlong)NULL; + } + + const size_t marshalled_len = (*env)->GetArrayLength(env, java_marshalled_settings); + AWS_ASSERT(marshalled_len % 2 == 0); + + size_t num_initial_settings = marshalled_len / 2; + struct aws_http2_setting *initial_settings = + num_initial_settings ? aws_mem_calloc(allocator, num_initial_settings, sizeof(struct aws_http2_setting)) : NULL; + + jlong *marshalled_settings = (*env)->GetLongArrayElements(env, java_marshalled_settings, NULL); + for (size_t i = 0; i < num_initial_settings; i++) { + jlong id = marshalled_settings[i * 2]; + initial_settings[i].id = (uint32_t)id; + jlong value = marshalled_settings[i * 2 + 1]; + /* We checked the value can fit into uint32_t in Java already */ + initial_settings[i].value = (uint32_t)value; + } + + struct aws_byte_cursor endpoint = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_endpoint); + + if (jni_port <= 0 || 65535 < jni_port) { + aws_jni_throw_illegal_argument_exception(env, "Port must be between 1 and 65535"); + goto cleanup; + } + + if (jni_max_conns <= 0) { + aws_jni_throw_illegal_argument_exception(env, "Max Connections must be > 0"); + goto cleanup; + } + + uint16_t port = (uint16_t)jni_port; + + int use_tls = (jni_tls_ctx != 0); + + struct aws_tls_connection_options tls_conn_options; + AWS_ZERO_STRUCT(tls_conn_options); + + if (use_tls) { + aws_tls_connection_options_init_from_ctx(&tls_conn_options, tls_ctx); + aws_tls_connection_options_set_server_name(&tls_conn_options, allocator, &endpoint); + } + + binding = aws_mem_calloc(allocator, 1, sizeof(struct aws_http2_stream_manager_binding)); + AWS_FATAL_ASSERT(binding); + binding->java_http2_stream_manager = (*env)->NewWeakGlobalRef(env, stream_manager_jobject); + + jint jvmresult = (*env)->GetJavaVM(env, &binding->jvm); + (void)jvmresult; + AWS_FATAL_ASSERT(jvmresult == 0); + + struct aws_http2_stream_manager_options manager_options; + AWS_ZERO_STRUCT(manager_options); + + manager_options.bootstrap = client_bootstrap; + manager_options.initial_settings_array = initial_settings; + manager_options.num_initial_settings = num_initial_settings; + + manager_options.socket_options = socket_options; + manager_options.tls_connection_options = NULL; + manager_options.host = endpoint; + manager_options.port = port; + manager_options.shutdown_complete_callback = &s_on_stream_manager_shutdown_complete_callback; + manager_options.shutdown_complete_user_data = binding; + manager_options.monitoring_options = NULL; + /* TODO: this variable needs to be renamed in aws-c-http. Come back and change it next revision. */ + manager_options.enable_read_back_pressure = jni_manual_window_management; + + manager_options.max_connections = (size_t)jni_max_conns; + manager_options.ideal_concurrent_streams_per_connection = (size_t)jni_ideal_concurrent_streams_per_connection; + manager_options.max_concurrent_streams_per_connection = (size_t)jni_max_concurrent_streams_per_connection; + + if (use_tls) { + manager_options.tls_connection_options = &tls_conn_options; + } + + struct aws_http_connection_monitoring_options monitoring_options; + AWS_ZERO_STRUCT(monitoring_options); + if (jni_monitoring_throughput_threshold_in_bytes_per_second >= 0 && + jni_monitoring_failure_interval_in_seconds >= 2) { + monitoring_options.minimum_throughput_bytes_per_second = + jni_monitoring_throughput_threshold_in_bytes_per_second; + monitoring_options.allowable_throughput_failure_interval_seconds = jni_monitoring_failure_interval_in_seconds; + + manager_options.monitoring_options = &monitoring_options; + } + + struct aws_http_proxy_options proxy_options; + AWS_ZERO_STRUCT(proxy_options); + + struct aws_tls_connection_options proxy_tls_conn_options; + AWS_ZERO_STRUCT(proxy_tls_conn_options); + + aws_http_proxy_options_jni_init( + env, + &proxy_options, + jni_proxy_connection_type, + &proxy_tls_conn_options, + jni_proxy_host, + (uint16_t)jni_proxy_port, + jni_proxy_authorization_username, + jni_proxy_authorization_password, + jni_proxy_authorization_type, + (struct aws_tls_ctx *)jni_proxy_tls_context); + + if (jni_proxy_host != NULL) { + manager_options.proxy_options = &proxy_options; + } + + binding->stream_manager = aws_http2_stream_manager_new(allocator, &manager_options); + if (binding->stream_manager == NULL) { + aws_jni_throw_runtime_exception(env, "Failed to create stream manager: %s", aws_error_str(aws_last_error())); + } + + aws_http_proxy_options_jni_clean_up( + env, &proxy_options, jni_proxy_host, jni_proxy_authorization_username, jni_proxy_authorization_password); + + if (use_tls) { + aws_tls_connection_options_clean_up(&tls_conn_options); + } + +cleanup: + aws_jni_byte_cursor_from_jbyteArray_release(env, jni_endpoint, endpoint); + + if (binding->stream_manager == NULL) { + s_destroy_manager_binding(binding, env); + binding = NULL; + } + + return (jlong)binding; +} + +/* + * Stream manager binding, persists across the lifetime of the native object. + */ +struct aws_sm_acquire_stream_callback_data { + JavaVM *jvm; + struct http_stream_binding *stream_binding; + jobject java_async_callback; +}; + +static void s_cleanup_sm_acquire_stream_callback_data( + struct aws_sm_acquire_stream_callback_data *callback_data, + JNIEnv *env) { + + if (callback_data->java_async_callback) { + (*env)->DeleteGlobalRef(env, callback_data->java_async_callback); + } + aws_mem_release(aws_jni_get_allocator(), callback_data); +} + +static struct aws_sm_acquire_stream_callback_data *s_new_sm_acquire_stream_callback_data( + JNIEnv *env, + struct aws_allocator *allocator, + struct http_stream_binding *stream_binding, + jobject async_callback) { + struct aws_sm_acquire_stream_callback_data *callback_data = + aws_mem_calloc(allocator, 1, sizeof(struct aws_sm_acquire_stream_callback_data)); + + jint jvmresult = (*env)->GetJavaVM(env, &callback_data->jvm); + AWS_FATAL_ASSERT(jvmresult == 0); + callback_data->java_async_callback = async_callback ? (*env)->NewGlobalRef(env, async_callback) : NULL; + AWS_FATAL_ASSERT(callback_data->java_async_callback != NULL); + callback_data->stream_binding = stream_binding; + + return callback_data; +} + +static void s_on_stream_acquired(struct aws_http_stream *stream, int error_code, void *user_data) { + struct aws_sm_acquire_stream_callback_data *callback_data = user_data; + /********** JNI ENV ACQUIRE **********/ + JNIEnv *env = aws_jni_acquire_thread_env(callback_data->jvm); + if (error_code) { + jobject crt_exception = aws_jni_new_crt_exception_from_error_code(env, error_code); + (*env)->CallVoidMethod( + env, callback_data->java_async_callback, async_callback_properties.on_failure, crt_exception); + (*env)->DeleteLocalRef(env, crt_exception); + aws_http_stream_binding_destroy(env, callback_data->stream_binding); + } else { + callback_data->stream_binding->native_stream = stream; + jobject j_http_stream = + aws_java_http_stream_from_native_new(env, callback_data->stream_binding, AWS_HTTP_VERSION_2); + if (!j_http_stream) { + jobject crt_exception = aws_jni_new_crt_exception_from_error_code(env, aws_last_error()); + (*env)->CallVoidMethod( + env, callback_data->java_async_callback, async_callback_properties.on_failure, crt_exception); + (*env)->DeleteLocalRef(env, crt_exception); + aws_http_stream_binding_destroy(env, callback_data->stream_binding); + } else { + /* Stream is activated once we acquired from the Stream Manager */ + aws_atomic_store_int(&callback_data->stream_binding->activated, 1); + callback_data->stream_binding->java_http_stream_base = (*env)->NewGlobalRef(env, j_http_stream); + (*env)->CallVoidMethod( + env, + callback_data->java_async_callback, + async_callback_properties.on_success_with_object, + callback_data->stream_binding->java_http_stream_base); + (*env)->DeleteLocalRef(env, j_http_stream); + } + } + AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env)); + s_cleanup_sm_acquire_stream_callback_data(callback_data, env); + aws_jni_release_thread_env(callback_data->jvm, env); + /********** JNI ENV RELEASE **********/ +} + +JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_http2StreamManagerAcquireStream( + JNIEnv *env, + jclass jni_class, + jlong jni_stream_manager, + jbyteArray marshalled_request, + jobject jni_http_request_body_stream, + jobject jni_http_response_callback_handler, + jobject java_async_callback) { + (void)jni_class; + struct aws_http2_stream_manager_binding *sm_binding = (struct aws_http2_stream_manager_binding *)jni_stream_manager; + struct aws_http2_stream_manager *stream_manager = sm_binding->stream_manager; + + if (!stream_manager) { + aws_jni_throw_illegal_argument_exception(env, "Stream Manager can't be null"); + return; + } + + if (!jni_http_response_callback_handler) { + aws_jni_throw_illegal_argument_exception( + env, "Http2StreamManager.acquireStream: Invalid jni_http_response_callback_handler"); + return; + } + if (!java_async_callback) { + aws_jni_throw_illegal_argument_exception(env, "Http2StreamManager.acquireStream: Invalid async callback"); + return; + } + + struct http_stream_binding *stream_binding = aws_http_stream_binding_alloc(env, jni_http_response_callback_handler); + if (!stream_binding) { + /* Exception already thrown */ + return; + } + + stream_binding->native_request = + aws_http_request_new_from_java_http_request(env, marshalled_request, jni_http_request_body_stream); + if (stream_binding->native_request == NULL) { + /* Exception already thrown */ + aws_http_stream_binding_destroy(env, stream_binding); + return; + } + + struct aws_http_make_request_options request_options = { + .self_size = sizeof(request_options), + .request = stream_binding->native_request, + /* Set Callbacks */ + .on_response_headers = aws_java_http_stream_on_incoming_headers_fn, + .on_response_header_block_done = aws_java_http_stream_on_incoming_header_block_done_fn, + .on_response_body = aws_java_http_stream_on_incoming_body_fn, + .on_complete = aws_java_http_stream_on_stream_complete_fn, + .user_data = stream_binding, + }; + + struct aws_allocator *allocator = aws_jni_get_allocator(); + struct aws_sm_acquire_stream_callback_data *callback_data = + s_new_sm_acquire_stream_callback_data(env, allocator, stream_binding, java_async_callback); + + struct aws_http2_stream_manager_acquire_stream_options acquire_options = { + .options = &request_options, + .callback = s_on_stream_acquired, + .user_data = callback_data, + }; + + aws_http2_stream_manager_acquire_stream(sm_binding->stream_manager, &acquire_options); +} + +JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2StreamManager_http2StreamManagerRelease( + JNIEnv *env, + jclass jni_class, + jlong jni_stream_manager) { + (void)jni_class; + + struct aws_http2_stream_manager_binding *sm_binding = (struct aws_http2_stream_manager_binding *)jni_stream_manager; + struct aws_http2_stream_manager *stream_manager = sm_binding->stream_manager; + + if (!stream_manager) { + aws_jni_throw_runtime_exception(env, "Stream Manager can't be null"); + return; + } + + AWS_LOGF_DEBUG(AWS_LS_HTTP_CONNECTION, "Releasing StreamManager: id: %p", (void *)stream_manager); + aws_http2_stream_manager_release(stream_manager); +} diff --git a/src/native/http_request_response.c b/src/native/http_request_response.c index c2cadbe8d..0355dbf0a 100644 --- a/src/native/http_request_response.c +++ b/src/native/http_request_response.c @@ -7,6 +7,7 @@ #include "crt.h" #include "http_connection_manager.h" +#include "http_request_response.h" #include "http_request_utils.h" #include "java_class_ids.h" @@ -34,7 +35,7 @@ # endif #endif -static jobject s_java_http_stream_from_native_new(JNIEnv *env, void *opaque, enum aws_http_version version) { +jobject aws_java_http_stream_from_native_new(JNIEnv *env, void *opaque, int version) { jlong jni_native_ptr = (jlong)opaque; AWS_ASSERT(jni_native_ptr); jobject stream = NULL; @@ -57,42 +58,24 @@ static jobject s_java_http_stream_from_native_new(JNIEnv *env, void *opaque, enu return stream; } -static void s_java_http_stream_from_native_delete(JNIEnv *env, jobject jHttpStream) { +void aws_java_http_stream_from_native_delete(JNIEnv *env, jobject jHttpStream) { /* Delete our reference to the HttpStream Object from the JVM. */ (*env)->DeleteGlobalRef(env, jHttpStream); } /******************************************************************************* - * http_stream_callback_data - carries around data needed by the various http request + * http_stream_binding - carries around data needed by the various http request * callbacks. ******************************************************************************/ -struct http_stream_callback_data { - JavaVM *jvm; - - // TEMP: Until Java API changes to match "H1B" native HTTP API, - // create aws_http_message and aws_input_stream under the hood. - struct aws_http_message *native_request; - - jobject java_http_response_stream_handler; - jobject java_http_stream_base; - struct aws_http_stream *native_stream; - struct aws_byte_buf headers_buf; - int response_status; - /* - * Unactivated streams must have their callback data destroyed at release time - */ - struct aws_atomic_var activated; -}; - -static void http_stream_callback_destroy(JNIEnv *env, struct http_stream_callback_data *callback) { +void aws_http_stream_binding_destroy(JNIEnv *env, struct http_stream_binding *callback) { if (callback == NULL) { return; } if (callback->java_http_stream_base) { - s_java_http_stream_from_native_delete(env, callback->java_http_stream_base); + aws_java_http_stream_from_native_delete(env, callback->java_http_stream_base); } if (callback->java_http_response_stream_handler != NULL) { @@ -100,23 +83,17 @@ static void http_stream_callback_destroy(JNIEnv *env, struct http_stream_callbac } if (callback->native_request) { - struct aws_input_stream *input_stream = aws_http_message_get_body_stream(callback->native_request); - if (input_stream != NULL) { - aws_input_stream_destroy(input_stream); - } - - aws_http_message_destroy(callback->native_request); + aws_http_message_release(callback->native_request); } - aws_byte_buf_clean_up(&callback->headers_buf); aws_mem_release(aws_jni_get_allocator(), callback); } // If error occurs, A Java exception is thrown and NULL is returned. -static struct http_stream_callback_data *http_stream_callback_alloc(JNIEnv *env, jobject java_callback_handler) { +struct http_stream_binding *aws_http_stream_binding_alloc(JNIEnv *env, jobject java_callback_handler) { struct aws_allocator *allocator = aws_jni_get_allocator(); - struct http_stream_callback_data *callback = aws_mem_calloc(allocator, 1, sizeof(struct http_stream_callback_data)); + struct http_stream_binding *callback = aws_mem_calloc(allocator, 1, sizeof(struct http_stream_binding)); AWS_FATAL_ASSERT(callback); // GetJavaVM() reference doesn't need a NewGlobalRef() call since it's global by default @@ -133,7 +110,7 @@ static struct http_stream_callback_data *http_stream_callback_alloc(JNIEnv *env, return callback; } -static int s_on_incoming_headers_fn( +int aws_java_http_stream_on_incoming_headers_fn( struct aws_http_stream *stream, enum aws_http_header_block block_type, const struct aws_http_header *header_array, @@ -141,7 +118,7 @@ static int s_on_incoming_headers_fn( void *user_data) { (void)block_type; - struct http_stream_callback_data *callback = (struct http_stream_callback_data *)user_data; + struct http_stream_binding *callback = (struct http_stream_binding *)user_data; int resp_status = -1; int err_code = aws_http_stream_get_incoming_response_status(stream, &resp_status); if (err_code != AWS_OP_SUCCESS) { @@ -160,13 +137,13 @@ static int s_on_incoming_headers_fn( return AWS_OP_SUCCESS; } -static int s_on_incoming_header_block_done_fn( +int aws_java_http_stream_on_incoming_header_block_done_fn( struct aws_http_stream *stream, enum aws_http_header_block block_type, void *user_data) { (void)stream; - struct http_stream_callback_data *callback = (struct http_stream_callback_data *)user_data; + struct http_stream_binding *callback = (struct http_stream_binding *)user_data; /********** JNI ENV ACQUIRE **********/ JNIEnv *env = aws_jni_acquire_thread_env(callback->jvm); @@ -222,8 +199,11 @@ static int s_on_incoming_header_block_done_fn( return result; } -static int s_on_incoming_body_fn(struct aws_http_stream *stream, const struct aws_byte_cursor *data, void *user_data) { - struct http_stream_callback_data *callback = (struct http_stream_callback_data *)user_data; +int aws_java_http_stream_on_incoming_body_fn( + struct aws_http_stream *stream, + const struct aws_byte_cursor *data, + void *user_data) { + struct http_stream_binding *callback = (struct http_stream_binding *)user_data; size_t total_window_increment = 0; @@ -275,9 +255,8 @@ static int s_on_incoming_body_fn(struct aws_http_stream *stream, const struct aw return result; } -static void s_on_stream_complete_fn(struct aws_http_stream *stream, int error_code, void *user_data) { - - struct http_stream_callback_data *callback = (struct http_stream_callback_data *)user_data; +void aws_java_http_stream_on_stream_complete_fn(struct aws_http_stream *stream, int error_code, void *user_data) { + struct http_stream_binding *callback = (struct http_stream_binding *)user_data; /********** JNI ENV ACQUIRE **********/ JNIEnv *env = aws_jni_acquire_thread_env(callback->jvm); @@ -301,7 +280,7 @@ static void s_on_stream_complete_fn(struct aws_http_stream *stream, int error_co } JavaVM *jvm = callback->jvm; - http_stream_callback_destroy(env, callback); + aws_http_stream_binding_destroy(env, callback); aws_jni_release_thread_env(jvm, env); /********** JNI ENV RELEASE **********/ } @@ -355,8 +334,7 @@ static jobject s_make_request_general( return (jobject)NULL; } - struct http_stream_callback_data *callback_data = - http_stream_callback_alloc(env, jni_http_response_callback_handler); + struct http_stream_binding *callback_data = aws_http_stream_binding_alloc(env, jni_http_response_callback_handler); if (!callback_data) { /* Exception already thrown */ return (jobject)NULL; @@ -366,7 +344,7 @@ static jobject s_make_request_general( aws_http_request_new_from_java_http_request(env, marshalled_request, jni_http_request_body_stream); if (callback_data->native_request == NULL) { /* Exception already thrown */ - http_stream_callback_destroy(env, callback_data); + aws_http_stream_binding_destroy(env, callback_data); return (jobject)NULL; } @@ -374,10 +352,10 @@ static jobject s_make_request_general( .self_size = sizeof(request_options), .request = callback_data->native_request, /* Set Callbacks */ - .on_response_headers = s_on_incoming_headers_fn, - .on_response_header_block_done = s_on_incoming_header_block_done_fn, - .on_response_body = s_on_incoming_body_fn, - .on_complete = s_on_stream_complete_fn, + .on_response_headers = aws_java_http_stream_on_incoming_headers_fn, + .on_response_header_block_done = aws_java_http_stream_on_incoming_header_block_done_fn, + .on_response_body = aws_java_http_stream_on_incoming_body_fn, + .on_complete = aws_java_http_stream_on_stream_complete_fn, .user_data = callback_data, }; @@ -391,7 +369,7 @@ static jobject s_make_request_general( (void *)native_conn, (void *)callback_data->native_stream); - jHttpStreamBase = s_java_http_stream_from_native_new(env, callback_data, version); + jHttpStreamBase = aws_java_http_stream_from_native_new(env, callback_data, version); } /* Check for errors that might have occurred while holding the lock. */ @@ -399,7 +377,7 @@ static jobject s_make_request_general( /* Failed to create native aws_http_stream. Clean up callback_data. */ AWS_LOGF_ERROR(AWS_LS_HTTP_CONNECTION, "Stream Request Failed. conn: %p", (void *)native_conn); aws_jni_throw_runtime_exception(env, "HttpClientConnection.MakeRequest: Unable to Execute Request"); - http_stream_callback_destroy(env, callback_data); + aws_http_stream_binding_destroy(env, callback_data); return NULL; } else if (!jHttpStreamBase) { /* Failed to create java HttpStream, but did create native aws_http_stream. @@ -449,7 +427,7 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_http_Http2ClientConnec } struct http_stream_chunked_callback_data { - struct http_stream_callback_data *stream_cb_data; + struct http_stream_binding *stream_cb_data; struct aws_byte_buf chunk_data; struct aws_input_stream *chunk_stream; jobject completion_callback; @@ -498,7 +476,7 @@ JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStream_httpStrea jobject completion_callback) { (void)jni_class; - struct http_stream_callback_data *cb_data = (struct http_stream_callback_data *)jni_cb_data; + struct http_stream_binding *cb_data = (struct http_stream_binding *)jni_cb_data; struct aws_http_stream *stream = cb_data->native_stream; struct http_stream_chunked_callback_data *chunked_callback_data = @@ -546,7 +524,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS jobject j_http_stream_base) { (void)jni_class; - struct http_stream_callback_data *cb_data = (struct http_stream_callback_data *)jni_cb_data; + struct http_stream_binding *cb_data = (struct http_stream_binding *)jni_cb_data; struct aws_http_stream *stream = cb_data->native_stream; if (stream == NULL) { @@ -575,7 +553,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS (void)jni_class; - struct http_stream_callback_data *cb_data = (struct http_stream_callback_data *)jni_cb_data; + struct http_stream_binding *cb_data = (struct http_stream_binding *)jni_cb_data; struct aws_http_stream *stream = cb_data->native_stream; if (stream == NULL) { @@ -587,7 +565,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS size_t not_activated = 0; if (aws_atomic_compare_exchange_int(&cb_data->activated, ¬_activated, 1)) { - http_stream_callback_destroy(env, cb_data); + aws_http_stream_binding_destroy(env, cb_data); } } @@ -598,7 +576,7 @@ JNIEXPORT jint JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS (void)jni_class; - struct http_stream_callback_data *cb_data = (struct http_stream_callback_data *)jni_cb_data; + struct http_stream_binding *cb_data = (struct http_stream_binding *)jni_cb_data; struct aws_http_stream *stream = cb_data->native_stream; if (stream == NULL) { @@ -625,7 +603,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS (void)jni_class; - struct http_stream_callback_data *cb_data = (struct http_stream_callback_data *)jni_cb_data; + struct http_stream_binding *cb_data = (struct http_stream_binding *)jni_cb_data; struct aws_http_stream *stream = cb_data->native_stream; if (stream == NULL) { @@ -651,7 +629,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2Stream_http2Str (void)jni_class; - struct http_stream_callback_data *cb_data = (struct http_stream_callback_data *)jni_cb_data; + struct http_stream_binding *cb_data = (struct http_stream_binding *)jni_cb_data; struct aws_http_stream *stream = cb_data->native_stream; if (stream == NULL) { diff --git a/src/native/http_request_response.h b/src/native/http_request_response.h new file mode 100644 index 000000000..057200442 --- /dev/null +++ b/src/native/http_request_response.h @@ -0,0 +1,61 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#ifndef AWS_JNI_CRT_HTTP_REQUEST_RESPONSE_H +#define AWS_JNI_CRT_HTTP_REQUEST_RESPONSE_H + +#include +#include + +struct aws_http_message; +struct aws_http_stream; +struct aws_byte_buf; +struct aws_atomic_var; + +struct http_stream_binding { + JavaVM *jvm; + + // TEMP: Until Java API changes to match "H1B" native HTTP API, + // create aws_http_message and aws_input_stream under the hood. + struct aws_http_message *native_request; + + jobject java_http_response_stream_handler; + jobject java_http_stream_base; + struct aws_http_stream *native_stream; + struct aws_byte_buf headers_buf; + int response_status; + + /* + * Inactivated streams must have their callback data destroyed at release time + */ + struct aws_atomic_var activated; +}; + +jobject aws_java_http_stream_from_native_new(JNIEnv *env, void *opaque, int version); +void aws_java_http_stream_from_native_delete(JNIEnv *env, jobject jHttpStream); + +void aws_http_stream_binding_destroy(JNIEnv *env, struct http_stream_binding *callback); + +// If error occurs, A Java exception is thrown and NULL is returned. +struct http_stream_binding *aws_http_stream_binding_alloc(JNIEnv *env, jobject java_callback_handler); + +/* Default callbacks using binding */ +int aws_java_http_stream_on_incoming_headers_fn( + struct aws_http_stream *stream, + enum aws_http_header_block block_type, + const struct aws_http_header *header_array, + size_t num_headers, + void *user_data); +int aws_java_http_stream_on_incoming_header_block_done_fn( + struct aws_http_stream *stream, + enum aws_http_header_block block_type, + void *user_data); +int aws_java_http_stream_on_incoming_body_fn( + struct aws_http_stream *stream, + const struct aws_byte_cursor *data, + void *user_data); +void aws_java_http_stream_on_stream_complete_fn(struct aws_http_stream *stream, int error_code, void *user_data); + +#endif /* AWS_JNI_CRT_HTTP_REQUEST_RESPONSE_H */ diff --git a/src/native/http_request_utils.c b/src/native/http_request_utils.c index 075c031e2..bdeb91dfa 100644 --- a/src/native/http_request_utils.c +++ b/src/native/http_request_utils.c @@ -386,6 +386,8 @@ int aws_apply_java_http_request_changes_to_native_request( aws_input_stream_new_from_java_http_request_body_stream(aws_jni_get_allocator(), env, jni_body_stream); aws_http_message_set_body_stream(message, body_stream); + /* request controls the lifetime of body stream fully */ + aws_input_stream_release(body_stream); } return result; @@ -428,6 +430,8 @@ struct aws_http_message *aws_http_request_new_from_java_http_request( } aws_http_message_set_body_stream(request, body_stream); + /* request controls the lifetime of body stream fully */ + aws_input_stream_release(body_stream); } return request; diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index 559dac36a..730c2c5c6 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -366,6 +366,16 @@ static void s_cache_http_client_connection_manager(JNIEnv *env) { AWS_FATAL_ASSERT(http_client_connection_manager_properties.onShutdownComplete); } +struct java_http2_stream_manager_properties http2_stream_manager_properties; + +static void s_cache_http2_stream_manager(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/http/Http2StreamManager"); + AWS_FATAL_ASSERT(cls); + + http2_stream_manager_properties.onShutdownComplete = (*env)->GetMethodID(env, cls, "onShutdownComplete", "()V"); + AWS_FATAL_ASSERT(http2_stream_manager_properties.onShutdownComplete); +} + struct java_http_client_connection_properties http_client_connection_properties; static void s_cache_http_client_connection(JNIEnv *env) { @@ -832,6 +842,7 @@ void cache_java_class_ids(JNIEnv *env) { s_cache_client_bootstrap(env); s_cache_tls_context_pkcs11_options(env); s_cache_http_client_connection_manager(env); + s_cache_http2_stream_manager(env); s_cache_http_client_connection(env); s_cache_http_stream(env); s_cache_http2_stream(env); diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index 5967123f2..ccd03f47f 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -164,6 +164,12 @@ struct java_http_client_connection_manager_properties { }; extern struct java_http_client_connection_manager_properties http_client_connection_manager_properties; +/* Http2StreamManager */ +struct java_http2_stream_manager_properties { + jmethodID onShutdownComplete; +}; +extern struct java_http2_stream_manager_properties http2_stream_manager_properties; + /* HttpClientConnection */ struct java_http_client_connection_properties { jclass http_client_connection_class; diff --git a/src/native/s3_client.c b/src/native/s3_client.c index 323106980..9c265240e 100644 --- a/src/native/s3_client.c +++ b/src/native/s3_client.c @@ -428,7 +428,6 @@ static void s_s3_meta_request_callback_cleanup( JNIEnv *env, struct s3_client_make_meta_request_callback_data *callback_data) { if (callback_data) { - aws_input_stream_destroy(callback_data->input_stream); (*env)->DeleteGlobalRef(env, callback_data->java_s3_meta_request); (*env)->DeleteGlobalRef(env, callback_data->java_s3_meta_request_response_handler_native_adapter); aws_mem_release(aws_jni_get_allocator(), callback_data); @@ -484,7 +483,6 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_s3_S3Client_s3ClientMake AWS_FATAL_ASSERT( AWS_OP_SUCCESS == aws_apply_java_http_request_changes_to_native_request( env, jni_marshalled_message_data, jni_http_request_body_stream, request_message)); - callback_data->input_stream = aws_http_message_get_body_stream(request_message); struct aws_uri endpoint; AWS_ZERO_STRUCT(endpoint); diff --git a/src/test/java/software/amazon/awssdk/crt/test/CredentialsProviderTest.java b/src/test/java/software/amazon/awssdk/crt/test/CredentialsProviderTest.java index ffa54979f..15706991c 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/CredentialsProviderTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/CredentialsProviderTest.java @@ -182,7 +182,7 @@ public void testDelegateException() { DelegateCredentialsHandler credentialsHandler = new DelegateCredentialsHandler() { @Override public Credentials getCredentials() { - throw new RuntimeException("hate coding"); + throw new RuntimeException("Some exception. =)"); } }; boolean failed = false; diff --git a/src/test/java/software/amazon/awssdk/crt/test/CrtTestFixture.java b/src/test/java/software/amazon/awssdk/crt/test/CrtTestFixture.java index 3dc46572b..125a682a0 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/CrtTestFixture.java +++ b/src/test/java/software/amazon/awssdk/crt/test/CrtTestFixture.java @@ -109,4 +109,8 @@ protected boolean hasAwsCredentials() { protected void skipIfNetworkUnavailable() { Assume.assumeTrue(System.getProperty("NETWORK_TESTS_DISABLED") == null); } + + protected void skipIfLocalhostUnavailable() { + Assume.assumeTrue(System.getProperty("aws.crt.localhost") != null); + } } diff --git a/src/test/java/software/amazon/awssdk/crt/test/Http2ClientLocalHostTest.java b/src/test/java/software/amazon/awssdk/crt/test/Http2ClientLocalHostTest.java new file mode 100644 index 000000000..f94bf46cc --- /dev/null +++ b/src/test/java/software/amazon/awssdk/crt/test/Http2ClientLocalHostTest.java @@ -0,0 +1,346 @@ + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.amazon.awssdk.crt.test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.http.Http2StreamManager; +import software.amazon.awssdk.crt.http.Http2Request; +import software.amazon.awssdk.crt.http.Http2Stream; +import software.amazon.awssdk.crt.http.Http2StreamManagerOptions; +import software.amazon.awssdk.crt.http.HttpClientConnection; +import software.amazon.awssdk.crt.http.HttpClientConnectionManager; +import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions; +import software.amazon.awssdk.crt.http.HttpHeader; +import software.amazon.awssdk.crt.http.HttpProxyOptions; +import software.amazon.awssdk.crt.http.HttpRequestBodyStream; +import software.amazon.awssdk.crt.http.HttpRequest; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.io.TlsContext; +import software.amazon.awssdk.crt.io.TlsContextOptions; +import software.amazon.awssdk.crt.utils.ByteBufferUtils; +import software.amazon.awssdk.crt.Log; + +public class Http2ClientLocalHostTest extends HttpClientTestFixture { + + private Http2StreamManager createStreamManager(URI uri, int numConnections) { + + try (EventLoopGroup eventLoopGroup = new EventLoopGroup(1); + HostResolver resolver = new HostResolver(eventLoopGroup); + ClientBootstrap bootstrap = new ClientBootstrap(eventLoopGroup, resolver); + SocketOptions sockOpts = new SocketOptions(); + TlsContextOptions tlsOpts = TlsContextOptions.createDefaultClient().withAlpnList("h2") + .withVerifyPeer(false); + TlsContext tlsContext = createHttpClientTlsContext(tlsOpts)) { + Http2StreamManagerOptions options = new Http2StreamManagerOptions(); + HttpClientConnectionManagerOptions connectionManagerOptions = new HttpClientConnectionManagerOptions(); + connectionManagerOptions.withClientBootstrap(bootstrap) + .withSocketOptions(sockOpts) + .withTlsContext(tlsContext) + .withUri(uri) + .withMaxConnections(numConnections); + options.withConnectionManagerOptions(connectionManagerOptions); + + return Http2StreamManager.create(options); + } + } + + private HttpRequestBodyStream createBodyStreamWithLength(long bodyLength) { + final long payloadSize = bodyLength; + final String payloadString = "This is CRT HTTP test."; + + HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { + + private long remainingBody = payloadSize; + + @Override + public boolean sendRequestBody(ByteBuffer outBuffer) { + + byte[] payloadBytes = null; + + try { + payloadBytes = payloadString.getBytes("ASCII"); + } catch (Exception ex) { + System.out.println("Encountered error trying to get payload bytes."); + return true; + } + + while (remainingBody > 0 && outBuffer.remaining() > 0) { + long amtToTransfer = Math.min(remainingBody, (long) outBuffer.remaining()); + amtToTransfer = Math.min(amtToTransfer, (long) payloadBytes.length); + + // Transfer the data + outBuffer.put(payloadBytes, 0, (int) amtToTransfer); + + remainingBody -= amtToTransfer; + } + + return remainingBody == 0; + } + + @Override + public boolean resetPosition() { + return true; + } + + @Override + public long getLength() { + return payloadSize; + } + }; + return payloadStream; + } + + private Http2Request createHttp2Request(String method, URI uri, long bodyLength) { + HttpHeader[] requestHeaders = new HttpHeader[] { + new HttpHeader(":method", method), + new HttpHeader(":path", uri.getPath()), + new HttpHeader(":scheme", uri.getScheme()), + new HttpHeader(":authority", uri.getHost()), + new HttpHeader("content-length", Long.toString(bodyLength)) + }; + HttpRequestBodyStream bodyStream = null; + if (bodyLength > 0) { + bodyStream = createBodyStreamWithLength(bodyLength); + } + Http2Request request = new Http2Request(requestHeaders, bodyStream); + + return request; + } + + @Test + public void testParallelRequestsStress() throws Exception { + skipIfLocalhostUnavailable(); + URI uri = new URI("https://localhost:8443/echo"); + try (Http2StreamManager streamManager = createStreamManager(uri, 100)) { + int numberToAcquire = 500 * 100; + + Http2Request request = createHttp2Request("GET", uri, 0); + List> requestCompleteFutures = new ArrayList<>(); + List> acquireCompleteFutures = new ArrayList<>(); + final AtomicInteger numStreamsFailures = new AtomicInteger(0); + for (int i = 0; i < numberToAcquire; i++) { + final CompletableFuture requestCompleteFuture = new CompletableFuture(); + requestCompleteFutures.add(requestCompleteFuture); + acquireCompleteFutures.add(streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + if (responseStatusCode != 200) { + numStreamsFailures.incrementAndGet(); + } + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + if (errorCode != CRT.AWS_CRT_SUCCESS) { + numStreamsFailures.incrementAndGet(); + } + stream.close(); + requestCompleteFuture.complete(null); + } + })); + } + for (CompletableFuture f : acquireCompleteFutures) { + f.get(30, TimeUnit.SECONDS); + } + // Wait for all Requests to complete + for (CompletableFuture f : requestCompleteFutures) { + f.get(30, TimeUnit.SECONDS); + } + Assert.assertTrue(numStreamsFailures.get() == 0); + } + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testParallelRequestsStressWithBody() throws Exception { + skipIfLocalhostUnavailable(); + URI uri = new URI("https://localhost:8443/uploadTest"); + try (Http2StreamManager streamManager = createStreamManager(uri, 100)) { + int numberToAcquire = 500 * 100; + if (CRT.getOSIdentifier() == "linux") { + /* + * Using Python hyper h2 server frame work, met a weird upload performance issue + * on Linux. Our client against nginx platform has not met the same issue. + * We assume it's because the server framework implementation. + * Use lower number of linux + */ + numberToAcquire = 500; + } + int bodyLength = 2000; + + List> requestCompleteFutures = new ArrayList<>(); + List> acquireCompleteFutures = new ArrayList<>(); + final AtomicInteger numStreamsFailures = new AtomicInteger(0); + for (int i = 0; i < numberToAcquire; i++) { + Http2Request request = createHttp2Request("PUT", uri, bodyLength); + + final CompletableFuture requestCompleteFuture = new CompletableFuture(); + final int expectedLength = bodyLength; + requestCompleteFutures.add(requestCompleteFuture); + acquireCompleteFutures.add(streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + if (responseStatusCode != 200) { + numStreamsFailures.incrementAndGet(); + } + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){ + String bodyString = new String(bodyBytesIn); + int receivedLength = Integer.parseInt(bodyString); + + Assert.assertTrue(receivedLength == expectedLength); + if(receivedLength!=expectedLength) { + numStreamsFailures.incrementAndGet(); + } + return bodyString.length(); + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + if (errorCode != CRT.AWS_CRT_SUCCESS) { + numStreamsFailures.incrementAndGet(); + } + stream.close(); + requestCompleteFuture.complete(null); + } + })); + } + for (CompletableFuture f : acquireCompleteFutures) { + f.get(30, TimeUnit.SECONDS); + } + // Wait for all Requests to complete + for (CompletableFuture f : requestCompleteFutures) { + f.get(30, TimeUnit.SECONDS); + } + Assert.assertTrue(numStreamsFailures.get() == 0); + } + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testRequestsUploadStress() throws Exception { + /* Test that upload a 2.5GB data from local server (0.25GB for linux) */ + skipIfLocalhostUnavailable(); +// Log.initLoggingToStderr(Log.LogLevel.Debug); + URI uri = new URI("https://localhost:8443/uploadTest"); + try (Http2StreamManager streamManager = createStreamManager(uri, 100)) { + long bodyLength = 2500000000L; + if (CRT.getOSIdentifier() == "linux") { + /* + * Using Python hyper h2 server frame work, met a weird upload performance issue + * on Linux. Our client against nginx platform has not met the same issue. + * We assume it's because the server framework implementation. + * Use lower number of linux + */ + bodyLength = 250000000L; + } + + Http2Request request = createHttp2Request("PUT", uri, bodyLength); + + final CompletableFuture requestCompleteFuture = new CompletableFuture(); + final long expectedLength = bodyLength; + CompletableFuture acquireCompleteFuture = streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + + Assert.assertTrue(responseStatusCode == 200); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){ + String bodyString = new String(bodyBytesIn); + long receivedLength = Long.parseLong(bodyString); + Assert.assertTrue(receivedLength == expectedLength); + return bodyString.length(); + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS); + stream.close(); + requestCompleteFuture.complete(null); + } + }); + + acquireCompleteFuture.get(30, TimeUnit.SECONDS); + requestCompleteFuture.join(); + + } + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testRequestsDownloadStress() throws Exception { + /* Test that download a 2.5GB data from local server */ + skipIfLocalhostUnavailable(); + URI uri = new URI("https://localhost:8443/downloadTest"); + try (Http2StreamManager streamManager = createStreamManager(uri, 100)) { + long bodyLength = 2500000000L; + + Http2Request request = createHttp2Request("GET", uri, 0); + + final CompletableFuture requestCompleteFuture = new CompletableFuture(); + final AtomicLong receivedLength = new AtomicLong(0); + CompletableFuture acquireCompleteFuture = streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + + Assert.assertTrue(responseStatusCode == 200); + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){ + receivedLength.addAndGet(bodyBytesIn.length); + + return bodyBytesIn.length; + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + + Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS); + stream.close(); + requestCompleteFuture.complete(null); + } + }); + + acquireCompleteFuture.get(30, TimeUnit.SECONDS); + requestCompleteFuture.join(); + + Assert.assertTrue(receivedLength.get() == bodyLength); + } + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } +} diff --git a/src/test/java/software/amazon/awssdk/crt/test/Http2RequestResponseTest.java b/src/test/java/software/amazon/awssdk/crt/test/Http2RequestResponseTest.java index 5e3f81a8f..c729e3c4a 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Http2RequestResponseTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Http2RequestResponseTest.java @@ -183,6 +183,11 @@ public void testHttp2ResetStream() throws Exception { @Override public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) { + } + + @Override + public void onResponseHeadersDone(HttpStreamBase stream, int blockType) { + /* Only invoke once */ Http2Stream h2Stream = (Http2Stream) stream; h2Stream.resetStream(Http2ErrorCode.INTERNAL_ERROR); } diff --git a/src/test/java/software/amazon/awssdk/crt/test/Http2StreamManagerTest.java b/src/test/java/software/amazon/awssdk/crt/test/Http2StreamManagerTest.java new file mode 100644 index 000000000..b354c46e1 --- /dev/null +++ b/src/test/java/software/amazon/awssdk/crt/test/Http2StreamManagerTest.java @@ -0,0 +1,213 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.amazon.awssdk.crt.test; + +import java.net.URI; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.http.Http2StreamManager; +import software.amazon.awssdk.crt.http.Http2Request; +import software.amazon.awssdk.crt.http.Http2Stream; +import software.amazon.awssdk.crt.http.Http2StreamManagerOptions; +import software.amazon.awssdk.crt.http.HttpClientConnection; +import software.amazon.awssdk.crt.http.HttpClientConnectionManager; +import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions; +import software.amazon.awssdk.crt.http.HttpHeader; +import software.amazon.awssdk.crt.http.HttpProxyOptions; +import software.amazon.awssdk.crt.http.HttpRequest; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.io.TlsContext; +import software.amazon.awssdk.crt.io.TlsContextOptions; +import software.amazon.awssdk.crt.Log; + +public class Http2StreamManagerTest extends HttpClientTestFixture { + private final static Charset UTF8 = StandardCharsets.UTF_8; + private final static int NUM_THREADS = 10; + private final static int NUM_CONNECTIONS = 20; + private final static int NUM_REQUESTS = 60; + private final static int NUM_ITERATIONS = 10; + private final static int GROWTH_PER_THREAD = 0; // expected VM footprint growth per thread + private final static int EXPECTED_HTTP_STATUS = 200; + private final static String endpoint = "https://d1cz66xoahf9cl.cloudfront.net/"; // Use cloudfront for HTTP/2 + private final static String path = "/random_32_byte.data"; + private final String EMPTY_BODY = ""; + + private Http2StreamManager createStreamManager(URI uri, int numConnections) { + + try (EventLoopGroup eventLoopGroup = new EventLoopGroup(1); + HostResolver resolver = new HostResolver(eventLoopGroup); + ClientBootstrap bootstrap = new ClientBootstrap(eventLoopGroup, resolver); + SocketOptions sockOpts = new SocketOptions(); + TlsContextOptions tlsOpts = TlsContextOptions.createDefaultClient().withAlpnList("h2"); + TlsContext tlsContext = createHttpClientTlsContext(tlsOpts)) { + Http2StreamManagerOptions options = new Http2StreamManagerOptions(); + HttpClientConnectionManagerOptions connectionManagerOptions = new HttpClientConnectionManagerOptions(); + connectionManagerOptions.withClientBootstrap(bootstrap) + .withSocketOptions(sockOpts) + .withTlsContext(tlsContext) + .withUri(uri) + .withMaxConnections(numConnections); + options.withConnectionManagerOptions(connectionManagerOptions); + + return Http2StreamManager.create(options); + } + } + + private Http2Request createHttp2Request(String method, String endpoint, String path, String requestBody) + throws Exception { + URI uri = new URI(endpoint); + HttpHeader[] requestHeaders = new HttpHeader[] { + new HttpHeader(":method", method), + new HttpHeader(":path", path), + new HttpHeader(":scheme", uri.getScheme()), + new HttpHeader(":authority", uri.getHost()), + new HttpHeader("content-length", Integer.toString(requestBody.getBytes(UTF8).length)) + }; + Http2Request request = new Http2Request(requestHeaders, null); + + return request; + } + + private void testParallelStreams(Http2StreamManager streamManager, Http2Request request, int numThreads, + int numRequests) { + final AtomicInteger numRequestsMade = new AtomicInteger(0); + final AtomicInteger numStreamsFailures = new AtomicInteger(0); + final ConcurrentHashMap reqIdToStatus = new ConcurrentHashMap<>(); + final AtomicInteger numErrorCode = new AtomicInteger(0); + + final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads); + List> requestCompleteFutures = new ArrayList<>(); + + for (int i = 0; i < numRequests; i++) { + + Log.log(Log.LogLevel.Trace, Log.LogSubject.HttpConnectionManager, String.format("Starting request %d", i)); + CompletableFuture requestCompleteFuture = new CompletableFuture(); + requestCompleteFutures.add(requestCompleteFuture); + + threadPool.execute(() -> { + // Request a connection from the connection pool + int requestId = numRequestsMade.incrementAndGet(); + streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + reqIdToStatus.put(requestId, responseStatusCode); + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + if (errorCode != CRT.AWS_CRT_SUCCESS + || stream.getResponseStatusCode() != EXPECTED_HTTP_STATUS) { + Log.log(Log.LogLevel.Error, Log.LogSubject.HttpConnectionManager, + String.format("Response completed with error: error_code=%s, response status=%d", + CRT.awsErrorName(errorCode), stream.getResponseStatusCode())); + numErrorCode.incrementAndGet(); + } + stream.close(); + requestCompleteFuture.complete(null); + } + }).whenComplete((stream, throwable) -> { + if (throwable != null) { + numStreamsFailures.incrementAndGet(); + requestCompleteFuture.completeExceptionally(throwable); + } + }); + }); + } + + // Wait for all Requests to complete + for (CompletableFuture f : requestCompleteFutures) { + f.join(); + } + + final int requiredSuccesses = (int) Math.floor(numRequests * 0.95); + final int allowedFailures = numRequests - requiredSuccesses; + + // Verify we got some Http Status Code for each Request + Assert.assertTrue(reqIdToStatus.size() >= requiredSuccesses); + // Verify that the failure counts aren't too high + Assert.assertTrue(numErrorCode.get() <= allowedFailures); + Assert.assertTrue(numStreamsFailures.get() <= allowedFailures); + } + + public void testParallelRequests(int numThreads, int numRequests) throws Exception { + skipIfNetworkUnavailable(); + + URI uri = new URI(endpoint); + + try (Http2StreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS)) { + Http2Request request = createHttp2Request("GET", endpoint, path, EMPTY_BODY); + testParallelStreams(streamManager, request, 1, numRequests); + } + + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + public void testParallelRequestsWithLeakCheck(int numThreads, int numRequests) throws Exception { + skipIfNetworkUnavailable(); + Callable fn = () -> { + testParallelRequests(numThreads, numRequests); + Thread.sleep(2000); // wait for async shutdowns to complete + return null; + }; + + // Dalvik is SUPER STOCHASTIC about when it frees JVM memory, it has no + // observable correlation + // to when System.gc() is called. Therefore, we cannot reliably sample it, so we + // don't bother. + // If we have a leak, we should have it on all platforms, and we'll catch it + // elsewhere. + if (CRT.getOSIdentifier() != "android") { + int fixedGrowth = CrtMemoryLeakDetector.expectedFixedGrowth(); + fixedGrowth += (numThreads * GROWTH_PER_THREAD); + // On Mac, JVM seems to expand by about 4K no matter how careful we are. With + // the workload + // we're running, 8K worth of growth (an additional 4K for an increased healthy + // margin) + // in the JVM only is acceptable. + fixedGrowth = Math.max(fixedGrowth, 8192); + CrtMemoryLeakDetector.leakCheck(NUM_ITERATIONS, fixedGrowth, fn); + } + } + + @Test + public void testSanitizer() throws Exception { + URI uri = new URI(endpoint); + try (Http2StreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS)) { + } + + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testSerialRequests() throws Exception { + testParallelRequestsWithLeakCheck(1, NUM_REQUESTS / NUM_THREADS); + } + + @Test + public void testMaxParallelRequests() throws Exception { + testParallelRequestsWithLeakCheck(NUM_THREADS, NUM_REQUESTS); + } +} diff --git a/src/test/java/software/amazon/awssdk/crt/test/HttpStreamManagerTest.java b/src/test/java/software/amazon/awssdk/crt/test/HttpStreamManagerTest.java new file mode 100644 index 000000000..46afaae65 --- /dev/null +++ b/src/test/java/software/amazon/awssdk/crt/test/HttpStreamManagerTest.java @@ -0,0 +1,295 @@ +package software.amazon.awssdk.crt.test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; +import java.util.Arrays; + +import javax.management.RuntimeErrorException; + +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.crt.http.HttpStreamManager; +import software.amazon.awssdk.crt.http.Http2StreamManager; +import software.amazon.awssdk.crt.http.Http2Request; +import software.amazon.awssdk.crt.http.Http2Stream; +import software.amazon.awssdk.crt.http.Http2StreamManagerOptions; +import software.amazon.awssdk.crt.http.HttpClientConnection; +import software.amazon.awssdk.crt.http.HttpClientConnectionManager; +import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions; +import software.amazon.awssdk.crt.http.HttpHeader; +import software.amazon.awssdk.crt.http.HttpProxyOptions; +import software.amazon.awssdk.crt.http.HttpRequest; +import software.amazon.awssdk.crt.http.HttpRequestBase; +import software.amazon.awssdk.crt.http.HttpRequestBodyStream; +import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.crt.http.HttpVersion; +import software.amazon.awssdk.crt.http.HttpStream; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; +import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.io.EventLoopGroup; +import software.amazon.awssdk.crt.io.HostResolver; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.io.TlsContext; +import software.amazon.awssdk.crt.io.TlsContextOptions; +import software.amazon.awssdk.crt.utils.ByteBufferUtils; +import software.amazon.awssdk.crt.Log; +import software.amazon.awssdk.crt.Log.LogLevel; + +public class HttpStreamManagerTest extends HttpRequestResponseFixture { + private final static String endpoint = "https://httpbin.org"; + private final static String path = "/anything"; + private final String EMPTY_BODY = ""; + private final static int NUM_CONNECTIONS = 20; + private final static Charset UTF8 = StandardCharsets.UTF_8; + private final static int EXPECTED_HTTP_STATUS = 200; + + private HttpStreamManager createStreamManager(URI uri, int numConnections, HttpVersion expectedVersion) { + + try (EventLoopGroup eventLoopGroup = new EventLoopGroup(1); + HostResolver resolver = new HostResolver(eventLoopGroup); + ClientBootstrap bootstrap = new ClientBootstrap(eventLoopGroup, resolver); + SocketOptions sockOpts = new SocketOptions(); + TlsContextOptions tlsOpts = expectedVersion == HttpVersion.HTTP_2 + ? TlsContextOptions.createDefaultClient().withAlpnList("h2") + : TlsContextOptions.createDefaultClient().withAlpnList("http/1.1"); + TlsContext tlsContext = createHttpClientTlsContext(tlsOpts)) { + Http2StreamManagerOptions options = new Http2StreamManagerOptions(); + HttpClientConnectionManagerOptions connectionManagerOptions = new HttpClientConnectionManagerOptions(); + connectionManagerOptions.withClientBootstrap(bootstrap) + .withSocketOptions(sockOpts) + .withTlsContext(tlsContext) + .withUri(uri) + .withMaxConnections(numConnections); + options.withConnectionManagerOptions(connectionManagerOptions); + + return HttpStreamManager.create(options); + } + } + + private Http2Request createHttp2Request(String method, String endpoint, String path, String requestBody) + throws Exception { + URI uri = new URI(endpoint); + HttpHeader[] requestHeaders = new HttpHeader[] { + new HttpHeader(":method", method), + new HttpHeader(":path", path), + new HttpHeader(":scheme", uri.getScheme()), + new HttpHeader(":authority", uri.getHost()), + new HttpHeader("content-length", Integer.toString(requestBody.getBytes(UTF8).length)) + }; + final ByteBuffer payload = ByteBuffer.wrap(requestBody.getBytes()); + HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { + @Override + public boolean sendRequestBody(ByteBuffer outBuffer) { + ByteBufferUtils.transferData(payload, outBuffer); + return payload.remaining() == 0; + } + + @Override + public boolean resetPosition() { + return true; + } + + @Override + public long getLength() { + return payload.capacity(); + } + }; + Http2Request request = new Http2Request(requestHeaders, payloadStream); + + return request; + } + + private HttpRequest createHttp1Request(String method, String endpoint, String path, String requestBody) + throws Exception { + URI uri = new URI(endpoint); + HttpHeader[] requestHeaders = new HttpHeader[] { + new HttpHeader("host", uri.getHost()), + new HttpHeader("content-length", Integer.toString(requestBody.getBytes(UTF8).length)) + }; + final ByteBuffer payload = ByteBuffer.wrap(requestBody.getBytes()); + HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() { + @Override + public boolean sendRequestBody(ByteBuffer outBuffer) { + ByteBufferUtils.transferData(payload, outBuffer); + return payload.remaining() == 0; + } + + @Override + public boolean resetPosition() { + return true; + } + + @Override + public long getLength() { + return payload.capacity(); + } + }; + return new HttpRequest(method, path, requestHeaders, payloadStream); + } + + private TestHttpResponse getResponseFromManager(HttpStreamManager streamManager, HttpRequestBase request) + throws Exception { + + final CompletableFuture reqCompleted = new CompletableFuture<>(); + + final TestHttpResponse response = new TestHttpResponse(); + + try { + HttpStreamBaseResponseHandler streamHandler = new HttpStreamBaseResponseHandler() { + @Override + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType, + HttpHeader[] nextHeaders) { + response.statusCode = responseStatusCode; + Assert.assertEquals(responseStatusCode, stream.getResponseStatusCode()); + response.headers.addAll(Arrays.asList(nextHeaders)); + } + + @Override + public void onResponseHeadersDone(HttpStreamBase stream, int blockType) { + response.blockType = blockType; + } + + @Override + public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { + try { + response.bodyBuffer.put(bodyBytesIn); + } catch (Exception e) { + Assert.assertNull(e); + } + int amountRead = bodyBytesIn.length; + + // Slide the window open by the number of bytes just read + return amountRead; + } + + @Override + public void onResponseComplete(HttpStreamBase stream, int errorCode) { + response.onCompleteErrorCode = errorCode; + reqCompleted.complete(null); + stream.close(); + } + }; + streamManager.acquireStream(request, streamHandler).get(60, TimeUnit.SECONDS); + // Give the request up to 60 seconds to complete, otherwise throw a + // TimeoutException + reqCompleted.get(60, TimeUnit.SECONDS); + } catch (Exception e) { + throw e; + } + + return response; + + } + + @Test + public void testSanitizerHTTP1() throws Exception { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_1_1)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + Assert.assertEquals(streamManager.getHttpVersion(), HttpVersion.HTTP_1_1); + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testSanitizerHTTP2() throws Exception { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_2)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + Assert.assertEquals(streamManager.getHttpVersion(), HttpVersion.HTTP_2); + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testSingleHTTP2Requests() throws Exception { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_2)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + Http2Request request = createHttp2Request("GET", endpoint, path, EMPTY_BODY); + TestHttpResponse response = this.getResponseFromManager(streamManager, request); + Assert.assertEquals(response.statusCode, EXPECTED_HTTP_STATUS); + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + @Test + public void testSingleHTTP1Request() throws Throwable { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_1_1)) { + shutdownComplete = streamManager.getShutdownCompleteFuture(); + HttpRequest request = createHttp1Request("GET", endpoint, path, EMPTY_BODY); + TestHttpResponse response = this.getResponseFromManager(streamManager, request); + Assert.assertEquals(response.statusCode, EXPECTED_HTTP_STATUS); + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } + + /* + * Create HTTP/1.1 stream manager, with HTTP/2 request, which should fail with + * invalid header name. Make sure the exception pops out and everything clean up + * correctly. + */ + @Test + public void testSingleHTTP1RequestsFailure() throws Throwable { + URI uri = new URI(endpoint); + CompletableFuture shutdownComplete = null; + try (HttpStreamManager streamManager = createStreamManager(uri, NUM_CONNECTIONS, HttpVersion.HTTP_1_1)) { + /* + * http2 request which will have :method headers that is not allowed for + * HTTP/1.1 + */ + Http2Request request = createHttp2Request("GET", endpoint, path, EMPTY_BODY); + shutdownComplete = streamManager.getShutdownCompleteFuture(); + TestHttpResponse response = this.getResponseFromManager(streamManager, request); + Assert.assertEquals(response.statusCode, EXPECTED_HTTP_STATUS); + } catch (ExecutionException e) { + try { + throw e.getCause(); + } catch (CrtRuntimeException causeException) { + /** + * Assert the exceptions are set correctly. + */ + Assert.assertTrue(causeException.errorName.equals("AWS_ERROR_HTTP_INVALID_HEADER_NAME")); + } + } + + shutdownComplete.get(60, TimeUnit.SECONDS); + CrtResource.logNativeResources(); + CrtResource.waitForNoResources(); + } +}