Skip to content

Commit 1a91438

Browse files
committed
Decouple HttpClient implementation from HttpClientOptions
1 parent cf2cbeb commit 1a91438

File tree

12 files changed

+205
-175
lines changed

12 files changed

+205
-175
lines changed

vertx-core/src/main/java/io/vertx/core/http/impl/CleanableHttpClient.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,6 @@ public VertxInternal vertx() {
9191
return delegate.vertx();
9292
}
9393

94-
@Override
95-
public HttpClientOptions options() {
96-
return delegate.options();
97-
}
98-
9994
@Override
10095
public boolean isMetricsEnabled() {
10196
return delegate.isMetricsEnabled();
@@ -112,8 +107,8 @@ public Function<HttpClientResponse, Future<RequestOptions>> redirectHandler() {
112107
}
113108

114109
@Override
115-
public NetClientInternal netClient() {
116-
return delegate.netClient();
110+
public HttpChannelConnector channelConnector() {
111+
return delegate.channelConnector();
117112
}
118113

119114
@Override

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xOrH2ChannelConnector.java

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,36 @@
5252
*/
5353
public class Http1xOrH2ChannelConnector implements HttpChannelConnector {
5454

55+
private final HttpClientOptions options;
5556
private final HttpClientMetrics clientMetrics;
5657
private final NetClientInternal netClient;
5758

5859
public Http1xOrH2ChannelConnector(NetClientInternal netClient,
60+
HttpClientOptions options,
5961
HttpClientMetrics clientMetrics) {
62+
63+
if (!options.isKeepAlive() && options.isPipelining()) {
64+
throw new IllegalStateException("Cannot have pipelining with no keep alive");
65+
}
66+
List<HttpVersion> alpnVersions = options.getAlpnVersions();
67+
if (alpnVersions == null || alpnVersions.isEmpty()) {
68+
if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
69+
options.setAlpnVersions(List.of(HttpVersion.HTTP_2, HttpVersion.HTTP_1_1));
70+
} else {
71+
options.setAlpnVersions(List.of(options.getProtocolVersion()));
72+
}
73+
}
74+
6075
this.clientMetrics = clientMetrics;
76+
this.options = options;
6177
this.netClient = netClient;
6278
}
6379

64-
private Http2ClientChannelInitializer http2Initializer(HttpClientOptions options) {
80+
public NetClientInternal netClient() {
81+
return netClient;
82+
}
83+
84+
private Http2ClientChannelInitializer http2Initializer() {
6585
if (options.getHttp2MultiplexImplementation()) {
6686
return new Http2MultiplexClientChannelInitializer(
6787
HttpUtils.fromVertxSettings(options.getInitialSettings()),
@@ -81,14 +101,14 @@ private void connect(ContextInternal context, HttpConnectParams params, HostAndP
81101
if (authority != null) {
82102
connectOptions.setHost(authority.host());
83103
connectOptions.setPort(authority.port());
84-
if (params.ssl && params.options.isForceSni()) {
104+
if (params.ssl && options.isForceSni()) {
85105
connectOptions.setSniServerName(authority.host());
86106
}
87107
}
88108
connectOptions.setSsl(params.ssl);
89109
if (params.ssl) {
90110
if (params.sslOptions != null) {
91-
connectOptions.setSslOptions(params.sslOptions.copy().setUseAlpn(params.useAlpn));
111+
connectOptions.setSslOptions(params.sslOptions.copy().setUseAlpn(options.isUseAlpn()));
92112
} else {
93113
connectOptions.setSslOptions(new ClientSSLOptions().setHostnameVerificationAlgorithm("HTTPS"));
94114
}
@@ -117,40 +137,45 @@ public Future<HttpClientConnection> wrap(ContextInternal context, HttpConnectPar
117137
Channel ch = so.channelHandlerContext().channel();
118138
if (params.ssl) {
119139
String protocol = so.applicationLayerProtocol();
120-
if (params.useAlpn) {
140+
if (options.isUseAlpn()) {
121141
if ("h2".equals(protocol)) {
122-
applyHttp2ConnectionOptions(ch.pipeline(), params.options);
123-
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(params.options);
142+
applyHttp2ConnectionOptions(ch.pipeline());
143+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer();
124144
http2ChannelInitializer.http2Connected(context, authority, metric, maxLifetimeMillis, ch, metrics, promise);
125145
} else {
126-
applyHttp1xConnectionOptions(ch.pipeline(), params.options);
146+
applyHttp1xConnectionOptions(ch.pipeline());
127147
HttpVersion fallbackProtocol = "http/1.0".equals(protocol) ?
128148
HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
129-
http1xConnected(params.options, fallbackProtocol, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
149+
http1xConnected(fallbackProtocol, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
130150
}
131151
} else {
132-
applyHttp1xConnectionOptions(ch.pipeline(), params.options);
133-
http1xConnected(params.options, params.version, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
152+
applyHttp1xConnectionOptions(ch.pipeline());
153+
http1xConnected(options.getProtocolVersion(), server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
134154
}
135155
} else {
136-
if (params.version == HttpVersion.HTTP_2) {
137-
if (params.options.isHttp2ClearTextUpgrade()) {
138-
applyHttp1xConnectionOptions(pipeline, params.options);
139-
http1xConnected(params.options, params.version, server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
156+
if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
157+
if (options.isHttp2ClearTextUpgrade()) {
158+
applyHttp1xConnectionOptions(pipeline);
159+
http1xConnected(options.getProtocolVersion(), server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
140160
} else {
141-
applyHttp2ConnectionOptions(pipeline, params.options);
142-
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(params.options);
161+
applyHttp2ConnectionOptions(pipeline);
162+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer();
143163
http2ChannelInitializer.http2Connected(context, authority, metric, maxLifetimeMillis, ch, metrics, promise);
144164
}
145165
} else {
146-
applyHttp1xConnectionOptions(pipeline, params.options);
147-
http1xConnected(params.options, params.version, server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
166+
applyHttp1xConnectionOptions(pipeline);
167+
http1xConnected(options.getProtocolVersion(), server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
148168
}
149169
}
150170
return promise.future();
151171
}
152172

153173
public Future<HttpClientConnection> httpConnect(ContextInternal context, SocketAddress server, HostAndPort authority, HttpConnectParams params, long maxLifetimeMillis, ClientMetrics<?, ?, ?> metrics) {
174+
175+
if (!options.isUseAlpn() && params.ssl && this.options.getProtocolVersion() == HttpVersion.HTTP_2) {
176+
return context.failedFuture("Must enable ALPN when using H2");
177+
}
178+
154179
Promise<NetSocket> promise = context.promise();
155180
Future<NetSocket> future = promise.future();
156181
// We perform the compose operation before calling connect to be sure that the composition happens
@@ -160,7 +185,7 @@ public Future<HttpClientConnection> httpConnect(ContextInternal context, SocketA
160185
return ret;
161186
}
162187

163-
private void applyHttp2ConnectionOptions(ChannelPipeline pipeline, HttpClientOptions options) {
188+
private void applyHttp2ConnectionOptions(ChannelPipeline pipeline) {
164189
int idleTimeout = options.getIdleTimeout();
165190
int readIdleTimeout = options.getReadIdleTimeout();
166191
int writeIdleTimeout = options.getWriteIdleTimeout();
@@ -169,7 +194,7 @@ private void applyHttp2ConnectionOptions(ChannelPipeline pipeline, HttpClientOpt
169194
}
170195
}
171196

172-
private void applyHttp1xConnectionOptions(ChannelPipeline pipeline, HttpClientOptions options) {
197+
private void applyHttp1xConnectionOptions(ChannelPipeline pipeline) {
173198
int idleTimeout = options.getIdleTimeout();
174199
int readIdleTimeout = options.getReadIdleTimeout();
175200
int writeIdleTimeout = options.getWriteIdleTimeout();
@@ -191,8 +216,7 @@ private void applyHttp1xConnectionOptions(ChannelPipeline pipeline, HttpClientOp
191216
}
192217
}
193218

194-
private void http1xConnected(HttpClientOptions options,
195-
HttpVersion version,
219+
private void http1xConnected(HttpVersion version,
196220
SocketAddress server,
197221
HostAndPort authority,
198222
boolean ssl,
@@ -212,7 +236,7 @@ private void http1xConnected(HttpClientOptions options,
212236
});
213237
clientHandler.addHandler(conn -> {
214238
if (upgrade) {
215-
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(options);
239+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer();
216240
Http2UpgradeClientConnection.Http2ChannelUpgrade channelUpgrade= http2ChannelInitializer.channelUpgrade(conn, maxLifetimeMillis, metrics);
217241
boolean preflightRequest = options.isHttp2ClearTextUpgradeWithPreflightRequest();
218242
if (preflightRequest) {

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientBase.java

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -34,51 +34,58 @@
3434
public class HttpClientBase implements MetricsProvider, Closeable {
3535

3636
protected final VertxInternal vertx;
37-
public final HttpClientOptions options;
37+
protected final ProxyOptions defaultProxyOptions;
3838
protected final HttpChannelConnector connector;
3939
protected final HttpClientMetrics<?, ?, ?> metrics;
4040
protected final CloseSequence closeSequence;
4141
private volatile ClientSSLOptions defaultSslOptions;
4242
private long closeTimeout = 0L;
4343
private TimeUnit closeTimeoutUnit = TimeUnit.SECONDS;
4444
private Predicate<SocketAddress> proxyFilter;
45-
46-
public HttpClientBase(VertxInternal vertx, HttpClientOptions options, HttpChannelConnector connector, HttpClientMetrics<?, ?, ?> metrics) {
47-
if (!options.isKeepAlive() && options.isPipelining()) {
48-
throw new IllegalStateException("Cannot have pipelining with no keep alive");
49-
}
50-
options = new HttpClientOptions(options);
51-
List<HttpVersion> alpnVersions = options.getAlpnVersions();
52-
if (alpnVersions == null || alpnVersions.isEmpty()) {
53-
if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
54-
options.setAlpnVersions(List.of(HttpVersion.HTTP_2, HttpVersion.HTTP_1_1));
55-
} else {
56-
options.setAlpnVersions(List.of(options.getProtocolVersion()));
57-
}
45+
private final boolean verifyHost;
46+
protected final boolean defaultSsl;
47+
protected final String defaultHost;
48+
protected final int defaultPort;
49+
protected final int maxRedirects;
50+
51+
public HttpClientBase(VertxInternal vertx,
52+
HttpChannelConnector connector,
53+
HttpClientMetrics<?, ?, ?> metrics,
54+
ProxyOptions defaultProxyOptions,
55+
ClientSSLOptions defaultSslOptions,
56+
List<String> nonProxyHosts,
57+
boolean verifyHost,
58+
boolean defaultSsl,
59+
String defaultHost,
60+
int defaultPort,
61+
int maxRedirects) {
62+
63+
if (defaultSslOptions != null) {
64+
configureSSLOptions(verifyHost, defaultSslOptions);
5865
}
66+
5967
this.vertx = vertx;
60-
this.metrics = vertx.metrics() != null ? vertx.metrics().createHttpClientMetrics(options) : null;
61-
this.options = options;
68+
this.metrics = metrics;
69+
this.defaultProxyOptions = defaultProxyOptions;
6270
this.closeSequence = new CloseSequence(p -> doClose(p), p1 -> doShutdown(p1));
63-
this.proxyFilter = options.getNonProxyHosts() != null ? ProxyFilter.nonProxyHosts(options.getNonProxyHosts()) : ProxyFilter.DEFAULT_PROXY_FILTER;
71+
this.proxyFilter = nonProxyHosts != null ? ProxyFilter.nonProxyHosts(nonProxyHosts) : ProxyFilter.DEFAULT_PROXY_FILTER;
6472
this.connector = connector;
65-
// this.netClient = new NetClientBuilder(vertx, new NetClientOptions(options).setProxyOptions(null)).metrics(metrics).build();
66-
this.defaultSslOptions = options.getSslOptions();
67-
68-
ClientSSLOptions sslOptions = options.getSslOptions();
69-
if (sslOptions != null) {
70-
configureSSLOptions(sslOptions);
71-
}
73+
this.defaultSslOptions = defaultSslOptions;
74+
this.verifyHost = verifyHost;
75+
this.defaultSsl = defaultSsl;
76+
this.defaultHost = defaultHost;
77+
this.defaultPort = defaultPort;
78+
this.maxRedirects = maxRedirects;
7279
}
7380

74-
private void configureSSLOptions(ClientSSLOptions sslOptions) {
81+
void configureSSLOptions(boolean verifyHost, ClientSSLOptions sslOptions) {
7582
if (sslOptions.getHostnameVerificationAlgorithm() == null) {
76-
sslOptions.setHostnameVerificationAlgorithm(options.isVerifyHost() ? "HTTPS" : "");
83+
sslOptions.setHostnameVerificationAlgorithm(verifyHost ? "HTTPS" : "");
7784
}
7885
}
7986

80-
public NetClientInternal netClient() {
81-
return null;
87+
public HttpChannelConnector channelConnector() {
88+
return connector;
8289
}
8390

8491
public Future<Void> closeFuture() {
@@ -98,12 +105,12 @@ protected int getPort(RequestOptions request) {
98105
if (server != null && server.isInetSocket()) {
99106
return server.port();
100107
}
101-
return options.getDefaultPort();
108+
return defaultPort;
102109
}
103110

104111
private ProxyOptions getProxyOptions(ProxyOptions proxyOptions) {
105112
if (proxyOptions == null) {
106-
proxyOptions = options.getProxyOptions();
113+
proxyOptions = defaultProxyOptions;
107114
}
108115
return proxyOptions;
109116
}
@@ -117,7 +124,7 @@ protected String getHost(RequestOptions request) {
117124
if (server != null && server.isInetSocket()) {
118125
return server.host();
119126
}
120-
return options.getDefaultHost();
127+
return defaultHost;
121128
}
122129

123130
protected ProxyOptions computeProxyOptions(ProxyOptions proxyOptions, SocketAddress addr) {
@@ -134,7 +141,7 @@ protected ClientSSLOptions sslOptions(HttpConnectOptions connectOptions) {
134141
ClientSSLOptions sslOptions = connectOptions.getSslOptions();
135142
if (sslOptions != null) {
136143
sslOptions = sslOptions.copy();
137-
configureSSLOptions(sslOptions);
144+
configureSSLOptions(verifyHost, sslOptions);
138145
} else {
139146
sslOptions = defaultSslOptions;
140147
}
@@ -171,7 +178,7 @@ public Metrics getMetrics() {
171178

172179
public Future<Boolean> updateSSLOptions(ClientSSLOptions options, boolean force) {
173180
options = options.copy();
174-
configureSSLOptions(options);
181+
configureSSLOptions(verifyHost, options);
175182
defaultSslOptions = options;
176183
return Future.succeededFuture(true);
177184
}
@@ -181,10 +188,6 @@ public HttpClientBase proxyFilter(Predicate<SocketAddress> filter) {
181188
return this;
182189
}
183190

184-
public HttpClientOptions options() {
185-
return options;
186-
}
187-
188191
public VertxInternal vertx() {
189192
return vertx;
190193
}

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientBuilderInternal.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,42 +93,64 @@ private EndpointResolver endpointResolver(HttpClientOptions co) {
9393
return null;
9494
}
9595

96-
private HttpClientImpl createHttpClientImpl(EndpointResolver resolver, HttpClientOptions co, PoolOptions po) {
96+
private HttpClientImpl createHttpClientImpl(EndpointResolver resolver,
97+
Handler<HttpConnection> connectionHandler,
98+
Function<HttpClientResponse, Future<RequestOptions>> redirectHandler,
99+
HttpClientOptions co,
100+
PoolOptions po) {
97101
HttpClientMetrics<?, ?, ?> metrics = vertx.metrics() != null ? vertx.metrics().createHttpClientMetrics(co) : null;
98102
NetClientInternal tcpClient = new NetClientBuilder(vertx, new NetClientOptions(co).setProxyOptions(null)).metrics(metrics).build();
99-
HttpChannelConnector channelConnector = new Http1xOrH2ChannelConnector(tcpClient, metrics);
100-
return new HttpClientImpl(vertx, channelConnector, metrics, resolver, co, po);
103+
HttpChannelConnector channelConnector = new Http1xOrH2ChannelConnector(tcpClient, co, metrics);
104+
HttpClientImpl.Config config = new HttpClientImpl.Config();
105+
config.nonProxyHosts = co.getNonProxyHosts();
106+
config.verifyHost = co.isVerifyHost();
107+
config.defaultSsl = co.isSsl();
108+
config.defaultHost = co.getDefaultHost();
109+
config.defaultPort = co.getDefaultPort();
110+
config.maxRedirects = co.getMaxRedirects();
111+
config.initialPoolKind = co.getProtocolVersion() == HttpVersion.HTTP_2 ? 1 : 0;
112+
return new HttpClientImpl(vertx, connectionHandler, redirectHandler, channelConnector, metrics, resolver, po, co.getProxyOptions(), co.getSslOptions(), config);
113+
}
114+
115+
private Handler<HttpConnection> connectionHandler(HttpClientOptions options) {
116+
int windowSize = options.getHttp2ConnectionWindowSize();
117+
Handler<HttpConnection> handler = connectHandler;
118+
if (windowSize > 0) {
119+
return connection -> {
120+
connection.setWindowSize(windowSize);
121+
if (handler != null) {
122+
handler.handle(connection);
123+
}
124+
};
125+
}
126+
return handler;
101127
}
102128

103129
@Override
104130
public HttpClientAgent build() {
131+
// Copy options here ????
105132
HttpClientOptions co = clientOptions != null ? clientOptions : new HttpClientOptions();
106133
PoolOptions po = poolOptions != null ? poolOptions : new PoolOptions();
107134
CloseFuture cf = resolveCloseFuture();
108135
HttpClientAgent client;
109136
Closeable closeable;
110137
EndpointResolver resolver = endpointResolver(co);
138+
Handler<HttpConnection> connectHandler = connectionHandler(co);
111139
if (co.isShared()) {
112140
CloseFuture closeFuture = new CloseFuture();
113141
client = vertx.createSharedResource("__vertx.shared.httpClients", co.getName(), closeFuture, cf_ -> {
114-
HttpClientImpl impl = createHttpClientImpl(resolver, co, po);
142+
HttpClientImpl impl = createHttpClientImpl(resolver, connectHandler, redirectHandler, co, po);
115143
cf_.add(completion -> impl.close().onComplete(completion));
116144
return impl;
117145
});
118146
client = new CleanableHttpClient((HttpClientInternal) client, vertx.cleaner(), (timeout, timeunit) -> closeFuture.close());
119147
closeable = closeFuture;
120148
} else {
121-
HttpClientImpl impl = createHttpClientImpl(resolver, co, po);
149+
HttpClientImpl impl = createHttpClientImpl(resolver, connectHandler, redirectHandler, co, po);
122150
closeable = impl;
123151
client = new CleanableHttpClient(impl, vertx.cleaner(), impl::shutdown);
124152
}
125153
cf.add(closeable);
126-
if (redirectHandler != null) {
127-
((HttpClientImpl)((CleanableHttpClient)client).delegate).redirectHandler(redirectHandler);
128-
}
129-
if (connectHandler != null) {
130-
((HttpClientImpl)((CleanableHttpClient)client).delegate).connectionHandler = connectHandler;
131-
}
132154
return client;
133155
}
134156
}

0 commit comments

Comments
 (0)