Skip to content

Commit dd0f98a

Browse files
authored
Stream manager (#436)
1 parent 004b4ed commit dd0f98a

18 files changed

+1615
-92
lines changed

.builder/actions/localhost_test.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import Builder
2+
import sys
3+
import os
4+
5+
6+
class LocalhostTest(Builder.Action):
7+
8+
def run(self, env):
9+
env.shell.setenv('AWS_CRT_MEMORY_TRACING', '2')
10+
actions = []
11+
if os.system("mvn -Dtest=Http2ClientLocalHostTest test -DredirectTestOutputToFile=true -DforkCount=0 \
12+
-DrerunFailingTestsCount=5 \
13+
-Daws.crt.memory.tracing=2 \
14+
-Daws.crt.debugnative=true \
15+
-Daws.crt.log.level=Error \
16+
-Daws.crt.localhost=true"):
17+
# Failed
18+
actions.append("exit 1")
19+
20+
return Builder.Script(actions, name='aws-crt-java-test')

.github/workflows/ci.yml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,59 @@ jobs:
182182
# note: using "@main" because "@${{env.BUILDER_VERSION}}" doesn't work
183183
# https://github.com/actions/runner/issues/480
184184
uses: awslabs/aws-crt-builder/.github/actions/check-submodules@main
185+
186+
187+
localhost-test-linux:
188+
runs-on: ubuntu-20.04 # latest
189+
steps:
190+
- name: Checkout
191+
uses: actions/checkout@v3
192+
with:
193+
submodules: true
194+
- name: Configure local host
195+
run: |
196+
python3 -m pip install h2
197+
cd crt/aws-c-http/tests/py_localhost/
198+
python3 server.py &
199+
python3 non_tls_server.py &
200+
- name: Build and test
201+
run: |
202+
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
203+
python builder.pyz localhost-test -p ${{ env.PACKAGE_NAME }} --spec=downstream
204+
205+
localhost-test-mac:
206+
runs-on: macos-11 # latest
207+
steps:
208+
- name: Checkout
209+
uses: actions/checkout@v3
210+
with:
211+
submodules: true
212+
- name: Configure local host
213+
run: |
214+
python3 -m pip install h2
215+
cd crt/aws-c-http/tests/py_localhost/
216+
python3 server.py &
217+
python3 non_tls_server.py &
218+
- name: Build and test
219+
run: |
220+
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')"
221+
chmod a+x builder
222+
./builder localhost-test -p ${{ env.PACKAGE_NAME }} --spec=downstream
223+
224+
localhost-test-win:
225+
runs-on: windows-2022 # latest
226+
steps:
227+
- name: Checkout
228+
uses: actions/checkout@v3
229+
with:
230+
submodules: true
231+
- name: Configure local host
232+
run: |
233+
python -m pip install h2
234+
- name: Build and test
235+
run: |
236+
cd crt/aws-c-http/tests/py_localhost/
237+
Start-Process -NoNewWindow python .\server.py
238+
Start-Process -NoNewWindow python .\non_tls_server.py
239+
python -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
240+
python builder.pyz localhost-test -p ${{ env.PACKAGE_NAME }} downstream
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
package software.amazon.awssdk.crt.http;
2+
3+
import software.amazon.awssdk.crt.CrtResource;
4+
import software.amazon.awssdk.crt.CrtRuntimeException;
5+
import software.amazon.awssdk.crt.io.ClientBootstrap;
6+
import software.amazon.awssdk.crt.io.SocketOptions;
7+
import software.amazon.awssdk.crt.AsyncCallback;
8+
import software.amazon.awssdk.crt.io.TlsContext;
9+
10+
import java.util.concurrent.CompletableFuture;
11+
import java.net.URI;
12+
import java.nio.charset.Charset;
13+
14+
/**
15+
* Manages a Pool of HTTP/2 Streams. Creates and manages HTTP/2 connections
16+
* under the hood.
17+
*/
18+
public class Http2StreamManager extends CrtResource {
19+
20+
private static final String HTTP = "http";
21+
private static final String HTTPS = "https";
22+
private static final int DEFAULT_HTTP_PORT = 80;
23+
private static final int DEFAULT_HTTPS_PORT = 443;
24+
private final static Charset UTF8 = java.nio.charset.StandardCharsets.UTF_8;
25+
26+
private final URI uri;
27+
private final int port;
28+
private final int maxConnections;
29+
private final int idealConcurrentStreamsPerConnection;
30+
private final int maxConcurrentStreamsPerConnection;
31+
private final CompletableFuture<Void> shutdownComplete = new CompletableFuture<>();
32+
33+
/**
34+
* Factory function for Http2StreamManager instances
35+
*
36+
* @param options configuration options
37+
* @return a new instance of an Http2StreamManager
38+
*/
39+
public static Http2StreamManager create(Http2StreamManagerOptions options) {
40+
return new Http2StreamManager(options);
41+
}
42+
43+
private Http2StreamManager(Http2StreamManagerOptions options) {
44+
options.validateOptions();
45+
46+
HttpClientConnectionManagerOptions connectionManagerOptions = options.getConnectionManagerOptions();
47+
URI uri = connectionManagerOptions.getUri();
48+
ClientBootstrap clientBootstrap = connectionManagerOptions.getClientBootstrap();
49+
SocketOptions socketOptions = connectionManagerOptions.getSocketOptions();
50+
boolean useTls = HTTPS.equals(uri.getScheme());
51+
TlsContext tlsContext = connectionManagerOptions.getTlsContext();
52+
int maxConnections = connectionManagerOptions.getMaxConnections();
53+
int port = connectionManagerOptions.getPort();
54+
if (port == -1) {
55+
port = uri.getPort();
56+
/* Pick a default port based on the scheme if one wasn't set */
57+
if (port == -1) {
58+
if (HTTP.equals(uri.getScheme())) { port = DEFAULT_HTTP_PORT; }
59+
if (HTTPS.equals(uri.getScheme())) { port = DEFAULT_HTTPS_PORT; }
60+
}
61+
}
62+
63+
int maxConcurrentStreamsPerConnection = options.getMaxConcurrentStreamsPerConnection();
64+
int idealConcurrentStreamsPerConnection = options.getIdealConcurrentStreamsPerConnection();
65+
66+
this.uri = uri;
67+
this.port = port;
68+
this.maxConnections = maxConnections;
69+
this.idealConcurrentStreamsPerConnection = idealConcurrentStreamsPerConnection;
70+
this.maxConcurrentStreamsPerConnection = maxConcurrentStreamsPerConnection;
71+
72+
int proxyConnectionType = 0;
73+
String proxyHost = null;
74+
int proxyPort = 0;
75+
TlsContext proxyTlsContext = null;
76+
int proxyAuthorizationType = 0;
77+
String proxyAuthorizationUsername = null;
78+
String proxyAuthorizationPassword = null;
79+
HttpProxyOptions proxyOptions = connectionManagerOptions.getProxyOptions();
80+
81+
if (proxyOptions != null) {
82+
proxyConnectionType = proxyOptions.getConnectionType().getValue();
83+
proxyHost = proxyOptions.getHost();
84+
proxyPort = proxyOptions.getPort();
85+
proxyTlsContext = proxyOptions.getTlsContext();
86+
proxyAuthorizationType = proxyOptions.getAuthorizationType().getValue();
87+
proxyAuthorizationUsername = proxyOptions.getAuthorizationUsername();
88+
proxyAuthorizationPassword = proxyOptions.getAuthorizationPassword();
89+
}
90+
91+
HttpMonitoringOptions monitoringOptions = connectionManagerOptions.getMonitoringOptions();
92+
long monitoringThroughputThresholdInBytesPerSecond = 0;
93+
int monitoringFailureIntervalInSeconds = 0;
94+
if (monitoringOptions != null) {
95+
monitoringThroughputThresholdInBytesPerSecond = monitoringOptions.getMinThroughputBytesPerSecond();
96+
monitoringFailureIntervalInSeconds = monitoringOptions.getAllowableThroughputFailureIntervalSeconds();
97+
}
98+
99+
acquireNativeHandle(http2StreamManagerNew(this,
100+
clientBootstrap.getNativeHandle(),
101+
socketOptions.getNativeHandle(),
102+
useTls ? tlsContext.getNativeHandle() : 0,
103+
Http2ConnectionSetting.marshallSettingsForJNI(options.getInitialSettingsList()),
104+
uri.getHost().getBytes(UTF8),
105+
port,
106+
proxyConnectionType,
107+
proxyHost != null ? proxyHost.getBytes(UTF8) : null,
108+
proxyPort,
109+
proxyTlsContext != null ? proxyTlsContext.getNativeHandle() : 0,
110+
proxyAuthorizationType,
111+
proxyAuthorizationUsername != null ? proxyAuthorizationUsername.getBytes(UTF8) : null,
112+
proxyAuthorizationPassword != null ? proxyAuthorizationPassword.getBytes(UTF8) : null,
113+
connectionManagerOptions.isManualWindowManagement(),
114+
monitoringThroughputThresholdInBytesPerSecond,
115+
monitoringFailureIntervalInSeconds,
116+
maxConnections,
117+
idealConcurrentStreamsPerConnection,
118+
maxConcurrentStreamsPerConnection));
119+
120+
/*
121+
* we don't need to add a reference to socketOptions since it's copied during
122+
* connection manager construction
123+
*/
124+
addReferenceTo(clientBootstrap);
125+
if (useTls) {
126+
addReferenceTo(tlsContext);
127+
}
128+
}
129+
130+
/**
131+
* Request a Http2Stream from StreamManager.
132+
*
133+
* @param request The Request to make to the Server.
134+
* @param streamHandler The Stream Handler to be called from the Native
135+
* EventLoop
136+
* @return A future for a Http2Stream that will be completed when the stream is
137+
* acquired.
138+
*/
139+
public CompletableFuture<Http2Stream> acquireStream(Http2Request request,
140+
HttpStreamBaseResponseHandler streamHandler) {
141+
142+
return this.acquireStream((HttpRequestBase) request, streamHandler);
143+
}
144+
145+
public CompletableFuture<Http2Stream> acquireStream(HttpRequest request,
146+
HttpStreamBaseResponseHandler streamHandler) {
147+
148+
return this.acquireStream((HttpRequestBase) request, streamHandler);
149+
}
150+
151+
private CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
152+
HttpStreamBaseResponseHandler streamHandler) {
153+
154+
CompletableFuture<Http2Stream> completionFuture = new CompletableFuture<>();
155+
AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null);
156+
if (isNull()) {
157+
completionFuture.completeExceptionally(new IllegalStateException(
158+
"Http2StreamManager has been closed, can't acquire new streams"));
159+
return completionFuture;
160+
}
161+
try {
162+
http2StreamManagerAcquireStream(this.getNativeHandle(),
163+
request.marshalForJni(),
164+
request.getBodyStream(),
165+
new HttpStreamResponseHandlerNativeAdapter(streamHandler),
166+
acquireStreamCompleted);
167+
} catch (CrtRuntimeException ex) {
168+
completionFuture.completeExceptionally(ex);
169+
}
170+
return completionFuture;
171+
}
172+
173+
/**
174+
* Called from Native when all Streams from this Stream manager have finished
175+
* and underlying resources like connections opened under the hood has been
176+
* cleaned up
177+
* begin releasing Native Resources that Http2StreamManager depends on.
178+
*/
179+
private void onShutdownComplete() {
180+
releaseReferences();
181+
182+
this.shutdownComplete.complete(null);
183+
}
184+
185+
/**
186+
* Determines whether a resource releases its dependencies at the same time the
187+
* native handle is released or if it waits.
188+
* Resources that wait are responsible for calling releaseReferences() manually.
189+
*/
190+
@Override
191+
protected boolean canReleaseReferencesImmediately() {
192+
return false;
193+
}
194+
195+
/**
196+
* Closes this Connection Pool and any pending Connection Acquisitions
197+
*/
198+
@Override
199+
protected void releaseNativeHandle() {
200+
if (!isNull()) {
201+
/*
202+
* Release our Native pointer and schedule tasks on the Native Event Loop to
203+
* start sending HTTP/TLS/TCP
204+
* connection shutdown messages to peers for any open Connections.
205+
*/
206+
http2StreamManagerRelease(getNativeHandle());
207+
}
208+
}
209+
210+
public CompletableFuture<Void> getShutdownCompleteFuture() {
211+
return shutdownComplete;
212+
}
213+
214+
/*******************************************************************************
215+
* Native methods
216+
******************************************************************************/
217+
218+
private static native long http2StreamManagerNew(Http2StreamManager thisObj,
219+
long client_bootstrap,
220+
long socketOptions,
221+
long tlsContext,
222+
long[] marshalledSettings,
223+
byte[] endpoint,
224+
int port,
225+
int proxyConnectionType,
226+
byte[] proxyHost,
227+
int proxyPort,
228+
long proxyTlsContext,
229+
int proxyAuthorizationType,
230+
byte[] proxyAuthorizationUsername,
231+
byte[] proxyAuthorizationPassword,
232+
boolean isManualWindowManagement,
233+
long monitoringThroughputThresholdInBytesPerSecond,
234+
int monitoringFailureIntervalInSeconds,
235+
int maxConns,
236+
int ideal_concurrent_streams_per_connection,
237+
int max_concurrent_streams_per_connection) throws CrtRuntimeException;
238+
239+
private static native void http2StreamManagerRelease(long stream_manager) throws CrtRuntimeException;
240+
241+
private static native void http2StreamManagerAcquireStream(long stream_manager,
242+
byte[] marshalledRequest,
243+
HttpRequestBodyStream bodyStream,
244+
HttpStreamResponseHandlerNativeAdapter responseHandler,
245+
AsyncCallback completedCallback) throws CrtRuntimeException;
246+
}

0 commit comments

Comments
 (0)