Skip to content

Commit 950e7e8

Browse files
authored
Merge pull request #5760 from eclipse-vertx/http-client-decoupling
Decouple HttpClient from HTTP/1.x and H2
2 parents f861f4b + 0bfb3ce commit 950e7e8

File tree

58 files changed

+419
-341
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+419
-341
lines changed

vertx-core/src/main/java/io/vertx/core/http/WebSocketFrame.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313

1414
import io.vertx.codegen.annotations.CacheReturn;
1515
import io.vertx.codegen.annotations.DataObject;
16-
import io.vertx.codegen.annotations.VertxGen;
1716
import io.vertx.core.buffer.Buffer;
18-
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
17+
import io.vertx.core.http.impl.websocket.WebSocketFrameImpl;
1918

2019
/**
2120
* A WebSocket frame that represents either text or binary data.

vertx-core/src/main/java/io/vertx/core/http/impl/CleanableHttpClient.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,6 @@ public VertxInternal vertx() {
9191
return delegate.vertx();
9292
}
9393

94-
@Override
95-
public HttpClientOptions options() {
96-
return delegate.options();
97-
}
98-
9994
@Override
10095
public boolean isMetricsEnabled() {
10196
return delegate.isMetricsEnabled();
@@ -112,8 +107,8 @@ public Function<HttpClientResponse, Future<RequestOptions>> redirectHandler() {
112107
}
113108

114109
@Override
115-
public NetClientInternal netClient() {
116-
return delegate.netClient();
110+
public HttpChannelConnector channelConnector() {
111+
return delegate.channelConnector();
117112
}
118113

119114
@Override

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xOrH2ChannelConnector.java

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.vertx.core.http.HttpClientOptions;
2525
import io.vertx.core.http.HttpHeaders;
2626
import io.vertx.core.http.HttpVersion;
27+
import io.vertx.core.http.impl.http1x.Http1xClientConnection;
28+
import io.vertx.core.http.impl.http1x.Http2UpgradeClientConnection;
2729
import io.vertx.core.http.impl.http2.Http2ClientChannelInitializer;
2830
import io.vertx.core.http.impl.http2.codec.Http2CodecClientChannelInitializer;
2931
import io.vertx.core.http.impl.http2.multiplex.Http2MultiplexClientChannelInitializer;
@@ -37,6 +39,7 @@
3739
import io.vertx.core.spi.metrics.ClientMetrics;
3840
import io.vertx.core.spi.metrics.HttpClientMetrics;
3941

42+
import java.time.Duration;
4043
import java.util.ArrayList;
4144
import java.util.List;
4245
import java.util.Map;
@@ -51,16 +54,36 @@
5154
*/
5255
public class Http1xOrH2ChannelConnector implements HttpChannelConnector {
5356

57+
private final HttpClientOptions options;
5458
private final HttpClientMetrics clientMetrics;
5559
private final NetClientInternal netClient;
5660

5761
public Http1xOrH2ChannelConnector(NetClientInternal netClient,
62+
HttpClientOptions options,
5863
HttpClientMetrics clientMetrics) {
64+
65+
if (!options.isKeepAlive() && options.isPipelining()) {
66+
throw new IllegalStateException("Cannot have pipelining with no keep alive");
67+
}
68+
List<HttpVersion> alpnVersions = options.getAlpnVersions();
69+
if (alpnVersions == null || alpnVersions.isEmpty()) {
70+
if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
71+
options.setAlpnVersions(List.of(HttpVersion.HTTP_2, HttpVersion.HTTP_1_1));
72+
} else {
73+
options.setAlpnVersions(List.of(options.getProtocolVersion()));
74+
}
75+
}
76+
5977
this.clientMetrics = clientMetrics;
78+
this.options = options;
6079
this.netClient = netClient;
6180
}
6281

63-
private Http2ClientChannelInitializer http2Initializer(HttpClientOptions options) {
82+
public NetClientInternal netClient() {
83+
return netClient;
84+
}
85+
86+
private Http2ClientChannelInitializer http2Initializer() {
6487
if (options.getHttp2MultiplexImplementation()) {
6588
return new Http2MultiplexClientChannelInitializer(
6689
HttpUtils.fromVertxSettings(options.getInitialSettings()),
@@ -80,14 +103,14 @@ private void connect(ContextInternal context, HttpConnectParams params, HostAndP
80103
if (authority != null) {
81104
connectOptions.setHost(authority.host());
82105
connectOptions.setPort(authority.port());
83-
if (params.ssl && params.options.isForceSni()) {
106+
if (params.ssl && options.isForceSni()) {
84107
connectOptions.setSniServerName(authority.host());
85108
}
86109
}
87110
connectOptions.setSsl(params.ssl);
88111
if (params.ssl) {
89112
if (params.sslOptions != null) {
90-
connectOptions.setSslOptions(params.sslOptions.copy().setUseAlpn(params.useAlpn));
113+
connectOptions.setSslOptions(params.sslOptions.copy().setUseAlpn(options.isUseAlpn()));
91114
} else {
92115
connectOptions.setSslOptions(new ClientSSLOptions().setHostnameVerificationAlgorithm("HTTPS"));
93116
}
@@ -116,40 +139,45 @@ public Future<HttpClientConnection> wrap(ContextInternal context, HttpConnectPar
116139
Channel ch = so.channelHandlerContext().channel();
117140
if (params.ssl) {
118141
String protocol = so.applicationLayerProtocol();
119-
if (params.useAlpn) {
142+
if (options.isUseAlpn()) {
120143
if ("h2".equals(protocol)) {
121-
applyHttp2ConnectionOptions(ch.pipeline(), params.options);
122-
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(params.options);
144+
applyHttp2ConnectionOptions(ch.pipeline());
145+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer();
123146
http2ChannelInitializer.http2Connected(context, authority, metric, maxLifetimeMillis, ch, metrics, promise);
124147
} else {
125-
applyHttp1xConnectionOptions(ch.pipeline(), params.options);
148+
applyHttp1xConnectionOptions(ch.pipeline());
126149
HttpVersion fallbackProtocol = "http/1.0".equals(protocol) ?
127150
HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
128-
http1xConnected(params.options, fallbackProtocol, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
151+
http1xConnected(fallbackProtocol, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
129152
}
130153
} else {
131-
applyHttp1xConnectionOptions(ch.pipeline(), params.options);
132-
http1xConnected(params.options, params.version, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
154+
applyHttp1xConnectionOptions(ch.pipeline());
155+
http1xConnected(options.getProtocolVersion(), server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
133156
}
134157
} else {
135-
if (params.version == HttpVersion.HTTP_2) {
136-
if (params.options.isHttp2ClearTextUpgrade()) {
137-
applyHttp1xConnectionOptions(pipeline, params.options);
138-
http1xConnected(params.options, params.version, server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
158+
if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
159+
if (options.isHttp2ClearTextUpgrade()) {
160+
applyHttp1xConnectionOptions(pipeline);
161+
http1xConnected(options.getProtocolVersion(), server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
139162
} else {
140-
applyHttp2ConnectionOptions(pipeline, params.options);
141-
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(params.options);
163+
applyHttp2ConnectionOptions(pipeline);
164+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer();
142165
http2ChannelInitializer.http2Connected(context, authority, metric, maxLifetimeMillis, ch, metrics, promise);
143166
}
144167
} else {
145-
applyHttp1xConnectionOptions(pipeline, params.options);
146-
http1xConnected(params.options, params.version, server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
168+
applyHttp1xConnectionOptions(pipeline);
169+
http1xConnected(options.getProtocolVersion(), server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
147170
}
148171
}
149172
return promise.future();
150173
}
151174

152175
public Future<HttpClientConnection> httpConnect(ContextInternal context, SocketAddress server, HostAndPort authority, HttpConnectParams params, long maxLifetimeMillis, ClientMetrics<?, ?, ?> metrics) {
176+
177+
if (!options.isUseAlpn() && params.ssl && this.options.getProtocolVersion() == HttpVersion.HTTP_2) {
178+
return context.failedFuture("Must enable ALPN when using H2");
179+
}
180+
153181
Promise<NetSocket> promise = context.promise();
154182
Future<NetSocket> future = promise.future();
155183
// We perform the compose operation before calling connect to be sure that the composition happens
@@ -159,7 +187,7 @@ public Future<HttpClientConnection> httpConnect(ContextInternal context, SocketA
159187
return ret;
160188
}
161189

162-
private void applyHttp2ConnectionOptions(ChannelPipeline pipeline, HttpClientOptions options) {
190+
private void applyHttp2ConnectionOptions(ChannelPipeline pipeline) {
163191
int idleTimeout = options.getIdleTimeout();
164192
int readIdleTimeout = options.getReadIdleTimeout();
165193
int writeIdleTimeout = options.getWriteIdleTimeout();
@@ -168,7 +196,7 @@ private void applyHttp2ConnectionOptions(ChannelPipeline pipeline, HttpClientOpt
168196
}
169197
}
170198

171-
private void applyHttp1xConnectionOptions(ChannelPipeline pipeline, HttpClientOptions options) {
199+
private void applyHttp1xConnectionOptions(ChannelPipeline pipeline) {
172200
int idleTimeout = options.getIdleTimeout();
173201
int readIdleTimeout = options.getReadIdleTimeout();
174202
int writeIdleTimeout = options.getWriteIdleTimeout();
@@ -190,8 +218,7 @@ private void applyHttp1xConnectionOptions(ChannelPipeline pipeline, HttpClientOp
190218
}
191219
}
192220

193-
private void http1xConnected(HttpClientOptions options,
194-
HttpVersion version,
221+
private void http1xConnected(HttpVersion version,
195222
SocketAddress server,
196223
HostAndPort authority,
197224
boolean ssl,
@@ -211,7 +238,7 @@ private void http1xConnected(HttpClientOptions options,
211238
});
212239
clientHandler.addHandler(conn -> {
213240
if (upgrade) {
214-
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(options);
241+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer();
215242
Http2UpgradeClientConnection.Http2ChannelUpgrade channelUpgrade= http2ChannelInitializer.channelUpgrade(conn, maxLifetimeMillis, metrics);
216243
boolean preflightRequest = options.isHttp2ClearTextUpgradeWithPreflightRequest();
217244
if (preflightRequest) {
@@ -244,4 +271,14 @@ private void http1xConnected(HttpClientOptions options,
244271
});
245272
ch.pipeline().addLast("handler", clientHandler);
246273
}
274+
275+
@Override
276+
public Future<Void> shutdown(Duration timeout) {
277+
return netClient.shutdown(timeout.toMillis(), TimeUnit.MILLISECONDS);
278+
}
279+
280+
@Override
281+
public Future<Void> close() {
282+
return netClient.close();
283+
}
247284
}

vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io.vertx.core.net.SocketAddress;
1818
import io.vertx.core.spi.metrics.ClientMetrics;
1919

20+
import java.time.Duration;
21+
2022
/**
2123
* Performs the channel configuration and connection according to the client options and the protocol version.
2224
*
@@ -31,4 +33,8 @@ Future<HttpClientConnection> httpConnect(ContextInternal context,
3133
long maxLifetimeMillis,
3234
ClientMetrics<?, ?, ?> metrics);
3335

36+
Future<Void> shutdown(Duration timeout);
37+
38+
Future<Void> close();
39+
3440
}

0 commit comments

Comments
 (0)