Skip to content

Commit d946c18

Browse files
committed
unified stream manager
1 parent dd0f98a commit d946c18

File tree

7 files changed

+699
-4
lines changed

7 files changed

+699
-4
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package software.amazon.awssdk.crt.http;
2+
3+
import software.amazon.awssdk.crt.CrtRuntimeException;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
7+
/**
8+
* Manages a Pool of HTTP/1.1 Streams. Creates and manages HTTP/1.1 connections
9+
* under the hood. Will grab a connection from HttpClientConnectionManager to
10+
* make request on it, and will return it back until the request finishes.
11+
*/
12+
public class Http1StreamManager implements AutoCloseable {
13+
14+
private HttpClientConnectionManager connectionManager = null;
15+
16+
/**
17+
* Factory function for Http1StreamManager instances
18+
*
19+
* @param options the connection manager options configure to connection manager under the hood
20+
* @return a new instance of an Http1StreamManager
21+
*/
22+
public static Http1StreamManager create(HttpClientConnectionManagerOptions options) {
23+
return new Http1StreamManager(options);
24+
}
25+
26+
private Http1StreamManager(HttpClientConnectionManagerOptions options) {
27+
this.connectionManager = HttpClientConnectionManager.create(options);
28+
}
29+
30+
public CompletableFuture<Void> getShutdownCompleteFuture() {
31+
return this.connectionManager.getShutdownCompleteFuture();
32+
}
33+
34+
/**
35+
* Request an HTTP/1.1 HttpStream from StreamManager.
36+
*
37+
* @param request HttpRequest. The Request to make to the Server.
38+
* @param streamHandler HttpStreamResponseHandler. The Stream Handler to be called from the Native EventLoop
39+
* @return A future for a HttpStream that will be completed when the stream is
40+
* acquired.
41+
* @throws CrtRuntimeException Exception happens from acquiring stream.
42+
*/
43+
public CompletableFuture<HttpStream> acquireStream(HttpRequest request,
44+
HttpStreamResponseHandler streamHandler) {
45+
CompletableFuture<HttpStream> completionFuture = new CompletableFuture<>();
46+
HttpClientConnectionManager connManager = this.connectionManager;
47+
this.connectionManager.acquireConnection().whenComplete((conn, throwable) -> {
48+
if (throwable != null) {
49+
completionFuture.completeExceptionally(throwable);
50+
} else {
51+
try {
52+
HttpStream stream = conn.makeRequest(request, new HttpStreamResponseHandler() {
53+
@Override
54+
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType,
55+
HttpHeader[] nextHeaders) {
56+
streamHandler.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders);
57+
}
58+
59+
@Override
60+
public void onResponseHeadersDone(HttpStream stream, int blockType) {
61+
streamHandler.onResponseHeadersDone(stream, blockType);
62+
}
63+
64+
@Override
65+
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
66+
return streamHandler.onResponseBody(stream, bodyBytesIn);
67+
}
68+
69+
@Override
70+
public void onResponseComplete(HttpStream stream, int errorCode) {
71+
streamHandler.onResponseComplete(stream, errorCode);
72+
/* Release the connection back */
73+
connManager.releaseConnection(conn);
74+
}
75+
});
76+
completionFuture.complete(stream);
77+
/* Active the stream for user */
78+
try {
79+
stream.activate();
80+
} catch (CrtRuntimeException e) {
81+
/* If activate failed, complete callback will not be invoked */
82+
streamHandler.onResponseComplete(stream, e.errorCode);
83+
/* Release the connection back */
84+
connManager.releaseConnection(conn);
85+
}
86+
} catch (Exception ex) {
87+
connManager.releaseConnection(conn);
88+
completionFuture.completeExceptionally(ex);
89+
}
90+
}
91+
});
92+
return completionFuture;
93+
}
94+
95+
96+
/**
97+
* Request an HTTP/1.1 HttpStream from StreamManager.
98+
*
99+
* @param request HttpRequestBase. The Request to make to the Server.
100+
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
101+
* @return A future for a HttpStreamBase that will be completed when the stream is
102+
* acquired.
103+
* @throws CrtRuntimeException Exception happens from acquiring stream.
104+
*/
105+
public CompletableFuture<HttpStreamBase> acquireStream(HttpRequestBase request,
106+
HttpStreamBaseResponseHandler streamHandler) {
107+
CompletableFuture<HttpStreamBase> completionFuture = new CompletableFuture<>();
108+
HttpClientConnectionManager connManager = this.connectionManager;
109+
this.connectionManager.acquireConnection().whenComplete((conn, throwable) -> {
110+
if (throwable != null) {
111+
completionFuture.completeExceptionally(throwable);
112+
} else {
113+
try {
114+
HttpStreamBase stream = conn.makeRequest(request, new HttpStreamBaseResponseHandler() {
115+
@Override
116+
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
117+
HttpHeader[] nextHeaders) {
118+
streamHandler.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders);
119+
}
120+
121+
@Override
122+
public void onResponseHeadersDone(HttpStreamBase stream, int blockType) {
123+
streamHandler.onResponseHeadersDone(stream, blockType);
124+
}
125+
126+
@Override
127+
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
128+
return streamHandler.onResponseBody(stream, bodyBytesIn);
129+
}
130+
131+
@Override
132+
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
133+
streamHandler.onResponseComplete(stream, errorCode);
134+
/* Release the connection back */
135+
connManager.releaseConnection(conn);
136+
}
137+
});
138+
completionFuture.complete(stream);
139+
/* Active the stream for user */
140+
try {
141+
stream.activate();
142+
} catch (CrtRuntimeException e) {
143+
/* If activate failed, complete callback will not be invoked */
144+
streamHandler.onResponseComplete(stream, e.errorCode);
145+
/* Release the connection back */
146+
connManager.releaseConnection(conn);
147+
}
148+
} catch (Exception ex) {
149+
connManager.releaseConnection(conn);
150+
completionFuture.completeExceptionally(ex);
151+
}
152+
}
153+
});
154+
return completionFuture;
155+
}
156+
157+
@Override
158+
public void close() {
159+
this.connectionManager.close();
160+
}
161+
}

src/main/java/software/amazon/awssdk/crt/http/Http2StreamManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,8 @@ public CompletableFuture<Http2Stream> acquireStream(HttpRequest request,
148148
return this.acquireStream((HttpRequestBase) request, streamHandler);
149149
}
150150

151-
private CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
152-
HttpStreamBaseResponseHandler streamHandler) {
153-
151+
public CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
152+
HttpStreamBaseResponseHandler streamHandler) {
154153
CompletableFuture<Http2Stream> completionFuture = new CompletableFuture<>();
155154
AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null);
156155
if (isNull()) {
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
package software.amazon.awssdk.crt.http;
7+
8+
import software.amazon.awssdk.crt.CrtRuntimeException;
9+
10+
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.atomic.AtomicLong;
12+
13+
/**
14+
* Manages a pool for either HTTP/1.1 or HTTP/2 connection.
15+
*
16+
* Contains two stream manager for two protocols under the hood.
17+
*/
18+
public class HttpStreamManager implements AutoCloseable {
19+
20+
private Http1StreamManager h1StreamManager = null;
21+
private Http2StreamManager h2StreamManager = null;
22+
private CompletableFuture<Void> shutdownComplete = null;
23+
private AtomicLong shutdownNum = new AtomicLong(0);
24+
private Throwable shutdownCompleteException = null;
25+
26+
/**
27+
* Factory function for HttpStreamManager instances
28+
*
29+
* @param options configuration options
30+
* @return a new instance of an HttpStreamManager
31+
*/
32+
public static HttpStreamManager create(HttpStreamManagerOptions options) {
33+
return new HttpStreamManager(options);
34+
}
35+
36+
private HttpStreamManager(HttpStreamManagerOptions options) {
37+
this.shutdownComplete = new CompletableFuture<Void>();
38+
if (options.getExpectedProtocol() == HttpVersion.UNKNOWN) {
39+
this.h1StreamManager = Http1StreamManager.create(options.getHTTP1ConnectionManagerOptions());
40+
this.h2StreamManager = Http2StreamManager.create(options.getHTTP2StreamManagerOptions());
41+
} else {
42+
if (options.getExpectedProtocol() == HttpVersion.HTTP_2) {
43+
this.h2StreamManager = Http2StreamManager.create(options.getHTTP2StreamManagerOptions());
44+
} else {
45+
this.h1StreamManager = Http1StreamManager.create(options.getHTTP1ConnectionManagerOptions());
46+
}
47+
/* Only one manager created. */
48+
this.shutdownNum.addAndGet(1);
49+
}
50+
if (this.h1StreamManager != null) {
51+
this.h1StreamManager.getShutdownCompleteFuture().whenComplete((v, throwable) -> {
52+
if (throwable != null) {
53+
this.shutdownCompleteException = throwable;
54+
}
55+
long shutdownNum = this.shutdownNum.addAndGet(1);
56+
if (shutdownNum == 2) {
57+
/* both connectionManager and the h2StreamManager has been shutdown. */
58+
if (this.shutdownCompleteException != null) {
59+
this.shutdownComplete.completeExceptionally(this.shutdownCompleteException);
60+
} else {
61+
this.shutdownComplete.complete(null);
62+
}
63+
}
64+
});
65+
}
66+
if (this.h2StreamManager != null) {
67+
this.h2StreamManager.getShutdownCompleteFuture().whenComplete((v, throwable) -> {
68+
if (throwable != null) {
69+
this.shutdownCompleteException = throwable;
70+
}
71+
long shutdownNum = this.shutdownNum.addAndGet(1);
72+
if (shutdownNum == 2) {
73+
/* both connectionManager and the h2StreamManager has been shutdown. */
74+
if (this.shutdownCompleteException != null) {
75+
this.shutdownComplete.completeExceptionally(this.shutdownCompleteException);
76+
} else {
77+
this.shutdownComplete.complete(null);
78+
}
79+
}
80+
});
81+
}
82+
}
83+
84+
private void h1AcquireStream(HttpRequestBase request,
85+
HttpStreamBaseResponseHandler streamHandler, CompletableFuture<HttpStreamBase> completionFuture) {
86+
87+
this.h1StreamManager.acquireStream(request, streamHandler).whenComplete((stream, throwable) -> {
88+
if (throwable != null) {
89+
completionFuture.completeExceptionally(throwable);
90+
} else {
91+
completionFuture.complete(stream);
92+
}
93+
});
94+
}
95+
96+
/**
97+
* Request a HttpStream from StreamManager. If the streamManager is made with
98+
* HTTP/2 connection under the hood, it will be Http2Stream.
99+
*
100+
* @param request HttpRequestBase. The Request to make to the Server.
101+
* @param streamHandler HttpStreamBaseResponseHandler. The Stream Handler to be called from the Native EventLoop
102+
* @return A future for a Http2Stream that will be completed when the stream is
103+
* acquired.
104+
*/
105+
public CompletableFuture<HttpStreamBase> acquireStream(HttpRequestBase request,
106+
HttpStreamBaseResponseHandler streamHandler) {
107+
CompletableFuture<HttpStreamBase> completionFuture = new CompletableFuture<>();
108+
if (this.h2StreamManager != null) {
109+
this.h2StreamManager.acquireStream(request, streamHandler).whenComplete((stream, throwable) -> {
110+
if (throwable != null) {
111+
if (throwable instanceof CrtRuntimeException) {
112+
CrtRuntimeException exception = (CrtRuntimeException) throwable;
113+
if (exception.errorName.equals("AWS_ERROR_HTTP_STREAM_MANAGER_UNEXPECTED_HTTP_VERSION") && this.h1StreamManager != null) {
114+
this.h1AcquireStream(request, streamHandler, completionFuture);
115+
} else {
116+
completionFuture.completeExceptionally(throwable);
117+
}
118+
} else {
119+
completionFuture.completeExceptionally(throwable);
120+
}
121+
} else {
122+
completionFuture.complete((Http2Stream) stream);
123+
}
124+
});
125+
return completionFuture;
126+
}
127+
this.h1AcquireStream(request, streamHandler, completionFuture);
128+
return completionFuture;
129+
}
130+
131+
public CompletableFuture<Void> getShutdownCompleteFuture() {
132+
return shutdownComplete;
133+
}
134+
135+
@Override
136+
public void close() {
137+
if (this.h1StreamManager != null) {
138+
this.h1StreamManager.close();
139+
}
140+
if (this.h2StreamManager != null) {
141+
this.h2StreamManager.close();
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)