Skip to content

Commit 6e5fdf1

Browse files
committed
Let the HttpChannelConnector become an interface and embbed only the resource it needs to.
Motivation: The HttpClient implementations relies on the HttpChannelConnector to obtain a configured HTTP connection. While the HttpChannelConnector does not leak to HttpClient, the creation and usage of HttpChannelConnector makes it hard to swap with another implementations (let's say an HTTP/3 version). HttpChannelConnector references resources (NetClient transport / HttpClientMetrics) as well as configuration (various options), the presence of options implies to create connector instances per option. Ideally we should have a single connector per client and pass options as arguments when a connection is needed (which is fine because the connector does not actually cache state). Changes: Let HttpChannelConnector become an interface defining its needs. The HTTP/1.x / H2 implementations becomes an implementation and references the TCP client it requires. Usage of HttpClientImpl to obtain configuration is removed, so the connector can be created and passed to HttpClientImpl (in other words, we should avoid something like new HttpConnector(this.something)).
1 parent 663c72a commit 6e5fdf1

11 files changed

+375
-323
lines changed
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
12+
package io.vertx.core.http.impl;
13+
14+
import io.netty.channel.Channel;
15+
import io.netty.channel.ChannelHandler;
16+
import io.netty.channel.ChannelPipeline;
17+
import io.netty.handler.codec.http.HttpClientCodec;
18+
import io.netty.handler.codec.http.HttpContentDecompressor;
19+
import io.netty.handler.logging.LoggingHandler;
20+
import io.netty.handler.ssl.SslHandler;
21+
import io.netty.handler.timeout.IdleStateHandler;
22+
import io.vertx.core.Future;
23+
import io.vertx.core.Promise;
24+
import io.vertx.core.http.HttpClientOptions;
25+
import io.vertx.core.http.HttpHeaders;
26+
import io.vertx.core.http.HttpVersion;
27+
import io.vertx.core.http.impl.http2.Http2ClientChannelInitializer;
28+
import io.vertx.core.http.impl.http2.codec.Http2CodecClientChannelInitializer;
29+
import io.vertx.core.http.impl.http2.multiplex.Http2MultiplexClientChannelInitializer;
30+
import io.vertx.core.internal.ContextInternal;
31+
import io.vertx.core.internal.PromiseInternal;
32+
import io.vertx.core.internal.http.HttpHeadersInternal;
33+
import io.vertx.core.internal.net.NetClientInternal;
34+
import io.vertx.core.net.*;
35+
import io.vertx.core.net.impl.VertxHandler;
36+
import io.vertx.core.net.impl.tcp.NetSocketImpl;
37+
import io.vertx.core.spi.metrics.ClientMetrics;
38+
import io.vertx.core.spi.metrics.HttpClientMetrics;
39+
40+
import java.util.ArrayList;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.concurrent.TimeUnit;
44+
45+
import static io.vertx.core.http.HttpMethod.OPTIONS;
46+
47+
/**
48+
* Performs the channel configuration and connection according to the client options and the protocol version.
49+
*
50+
* @author <a href="mailto:[email protected]">Julien Viet</a>
51+
*/
52+
public class Http1xOrH2ChannelConnector implements HttpChannelConnector {
53+
54+
private final HttpClientMetrics clientMetrics;
55+
private final NetClientInternal netClient;
56+
57+
public Http1xOrH2ChannelConnector(NetClientInternal netClient,
58+
HttpClientMetrics clientMetrics) {
59+
this.clientMetrics = clientMetrics;
60+
this.netClient = netClient;
61+
}
62+
63+
private Http2ClientChannelInitializer http2Initializer(HttpClientOptions options) {
64+
if (options.getHttp2MultiplexImplementation()) {
65+
return new Http2MultiplexClientChannelInitializer(
66+
HttpUtils.fromVertxSettings(options.getInitialSettings()),
67+
clientMetrics,
68+
TimeUnit.SECONDS.toMillis(options.getHttp2KeepAliveTimeout()),
69+
options.getHttp2MultiplexingLimit(),
70+
options.isDecompressionSupported(),
71+
options.getLogActivity());
72+
} else {
73+
return new Http2CodecClientChannelInitializer(options, clientMetrics);
74+
}
75+
}
76+
77+
private void connect(ContextInternal context, HttpConnectParams params, HostAndPort authority, SocketAddress server, Promise<NetSocket> promise) {
78+
ConnectOptions connectOptions = new ConnectOptions();
79+
connectOptions.setRemoteAddress(server);
80+
if (authority != null) {
81+
connectOptions.setHost(authority.host());
82+
connectOptions.setPort(authority.port());
83+
if (params.ssl && params.options.isForceSni()) {
84+
connectOptions.setSniServerName(authority.host());
85+
}
86+
}
87+
connectOptions.setSsl(params.ssl);
88+
if (params.ssl) {
89+
if (params.sslOptions != null) {
90+
connectOptions.setSslOptions(params.sslOptions.copy().setUseAlpn(params.useAlpn));
91+
} else {
92+
connectOptions.setSslOptions(new ClientSSLOptions().setHostnameVerificationAlgorithm("HTTPS"));
93+
}
94+
}
95+
connectOptions.setProxyOptions(params.proxyOptions);
96+
netClient.connectInternal(connectOptions, promise, context);
97+
}
98+
99+
public Future<HttpClientConnection> wrap(ContextInternal context, HttpConnectParams params, HostAndPort authority, long maxLifetimeMillis, ClientMetrics<?, ?, ?> metrics, SocketAddress server, NetSocket so_) {
100+
NetSocketImpl so = (NetSocketImpl) so_;
101+
Object metric = so.metric();
102+
PromiseInternal<HttpClientConnection> promise = context.promise();
103+
104+
// Remove all un-necessary handlers
105+
ChannelPipeline pipeline = so.channelHandlerContext().pipeline();
106+
List<ChannelHandler> removedHandlers = new ArrayList<>();
107+
for (Map.Entry<String, ChannelHandler> stringChannelHandlerEntry : pipeline) {
108+
ChannelHandler handler = stringChannelHandlerEntry.getValue();
109+
if (!(handler instanceof SslHandler)) {
110+
removedHandlers.add(handler);
111+
}
112+
}
113+
removedHandlers.forEach(pipeline::remove);
114+
115+
//
116+
Channel ch = so.channelHandlerContext().channel();
117+
if (params.ssl) {
118+
String protocol = so.applicationLayerProtocol();
119+
if (params.useAlpn) {
120+
if ("h2".equals(protocol)) {
121+
applyHttp2ConnectionOptions(ch.pipeline(), params.options);
122+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(params.options);
123+
http2ChannelInitializer.http2Connected(context, authority, metric, maxLifetimeMillis, ch, metrics, promise);
124+
} else {
125+
applyHttp1xConnectionOptions(ch.pipeline(), params.options);
126+
HttpVersion fallbackProtocol = "http/1.0".equals(protocol) ?
127+
HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
128+
http1xConnected(params.options, fallbackProtocol, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
129+
}
130+
} else {
131+
applyHttp1xConnectionOptions(ch.pipeline(), params.options);
132+
http1xConnected(params.options, params.version, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
133+
}
134+
} else {
135+
if (params.version == HttpVersion.HTTP_2) {
136+
if (params.options.isHttp2ClearTextUpgrade()) {
137+
applyHttp1xConnectionOptions(pipeline, params.options);
138+
http1xConnected(params.options, params.version, server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
139+
} else {
140+
applyHttp2ConnectionOptions(pipeline, params.options);
141+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(params.options);
142+
http2ChannelInitializer.http2Connected(context, authority, metric, maxLifetimeMillis, ch, metrics, promise);
143+
}
144+
} else {
145+
applyHttp1xConnectionOptions(pipeline, params.options);
146+
http1xConnected(params.options, params.version, server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
147+
}
148+
}
149+
return promise.future();
150+
}
151+
152+
public Future<HttpClientConnection> httpConnect(ContextInternal context, SocketAddress server, HostAndPort authority, HttpConnectParams params, long maxLifetimeMillis, ClientMetrics<?, ?, ?> metrics) {
153+
Promise<NetSocket> promise = context.promise();
154+
Future<NetSocket> future = promise.future();
155+
// We perform the compose operation before calling connect to be sure that the composition happens
156+
// before the promise is completed by the connect operation
157+
Future<HttpClientConnection> ret = future.compose(so -> wrap(context, params, authority, maxLifetimeMillis, metrics, server, so));
158+
connect(context, params, authority, server, promise);
159+
return ret;
160+
}
161+
162+
private void applyHttp2ConnectionOptions(ChannelPipeline pipeline, HttpClientOptions options) {
163+
int idleTimeout = options.getIdleTimeout();
164+
int readIdleTimeout = options.getReadIdleTimeout();
165+
int writeIdleTimeout = options.getWriteIdleTimeout();
166+
if (idleTimeout > 0 || readIdleTimeout > 0 || writeIdleTimeout > 0) {
167+
pipeline.addLast("idle", new IdleStateHandler(readIdleTimeout, writeIdleTimeout, idleTimeout, options.getIdleTimeoutUnit()));
168+
}
169+
}
170+
171+
private void applyHttp1xConnectionOptions(ChannelPipeline pipeline, HttpClientOptions options) {
172+
int idleTimeout = options.getIdleTimeout();
173+
int readIdleTimeout = options.getReadIdleTimeout();
174+
int writeIdleTimeout = options.getWriteIdleTimeout();
175+
if (idleTimeout > 0 || readIdleTimeout > 0 || writeIdleTimeout > 0) {
176+
pipeline.addLast("idle", new IdleStateHandler(readIdleTimeout, writeIdleTimeout, idleTimeout, options.getIdleTimeoutUnit()));
177+
}
178+
if (options.getLogActivity()) {
179+
pipeline.addLast("logging", new LoggingHandler(options.getActivityLogDataFormat()));
180+
}
181+
pipeline.addLast("codec", new HttpClientCodec(
182+
options.getMaxInitialLineLength(),
183+
options.getMaxHeaderSize(),
184+
options.getMaxChunkSize(),
185+
false,
186+
!HttpHeadersInternal.DISABLE_HTTP_HEADERS_VALIDATION,
187+
options.getDecoderInitialBufferSize()));
188+
if (options.isDecompressionSupported()) {
189+
pipeline.addLast("inflater", new HttpContentDecompressor(false));
190+
}
191+
}
192+
193+
private void http1xConnected(HttpClientOptions options,
194+
HttpVersion version,
195+
SocketAddress server,
196+
HostAndPort authority,
197+
boolean ssl,
198+
ContextInternal context,
199+
Object socketMetric,
200+
long maxLifetimeMillis, Channel ch,
201+
ClientMetrics<?, ?, ?> metrics,
202+
Promise<HttpClientConnection> future) {
203+
boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade();
204+
VertxHandler<Http1xClientConnection> clientHandler = VertxHandler.create(chctx -> {
205+
Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, clientMetrics, options, chctx, ssl, server, authority, context, metrics, maxLifetimeMillis);
206+
if (clientMetrics != null) {
207+
conn.metric(socketMetric);
208+
clientMetrics.endpointConnected(metrics);
209+
}
210+
return conn;
211+
});
212+
clientHandler.addHandler(conn -> {
213+
if (upgrade) {
214+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(options);
215+
Http2UpgradeClientConnection.Http2ChannelUpgrade channelUpgrade= http2ChannelInitializer.channelUpgrade(conn, maxLifetimeMillis, metrics);
216+
boolean preflightRequest = options.isHttp2ClearTextUpgradeWithPreflightRequest();
217+
if (preflightRequest) {
218+
Http2UpgradeClientConnection conn2 = new Http2UpgradeClientConnection(conn, maxLifetimeMillis, metrics, channelUpgrade);
219+
conn2.concurrencyChangeHandler(concurrency -> {
220+
// Ignore
221+
});
222+
conn2.createStream(conn.context()).onComplete(ar -> {
223+
if (ar.succeeded()) {
224+
HttpClientStream stream = ar.result();
225+
stream.headHandler(resp -> {
226+
Http2UpgradeClientConnection connection = (Http2UpgradeClientConnection) stream.connection();
227+
HttpClientConnection unwrap = connection.unwrap();
228+
future.tryComplete(unwrap);
229+
});
230+
stream.exceptionHandler(future::tryFail);
231+
HttpRequestHead request = new HttpRequestHead(OPTIONS, "/", HttpHeaders.headers(), HostAndPort.authority(server.host(), server.port()),
232+
"http://" + server + "/", null);
233+
stream.writeHead(request, false, null, true, null, false);
234+
} else {
235+
future.fail(ar.cause());
236+
}
237+
});
238+
} else {
239+
future.complete(new Http2UpgradeClientConnection(conn, maxLifetimeMillis, metrics, channelUpgrade));
240+
}
241+
} else {
242+
future.complete(conn);
243+
}
244+
});
245+
ch.pipeline().addLast("handler", clientHandler);
246+
}
247+
}

vertx-core/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@
2929
import io.vertx.core.internal.logging.LoggerFactory;
3030
import io.vertx.core.net.HostAndPort;
3131
import io.vertx.core.net.SocketAddress;
32+
import io.vertx.core.spi.metrics.ClientMetrics;
3233

33-
import javax.net.ssl.SSLPeerUnverifiedException;
3434
import javax.net.ssl.SSLSession;
35-
import java.security.cert.Certificate;
36-
import java.util.List;
3735
import java.util.concurrent.TimeUnit;
3836

3937
/**
@@ -47,7 +45,9 @@ public class Http2UpgradeClientConnection implements HttpClientConnection {
4745

4846
private static final Logger log = LoggerFactory.getLogger(Http2UpgradeClientConnection.class);
4947

48+
private final long maxLifetimeMillis;
5049
private final Http2ChannelUpgrade upgrade;
50+
private final ClientMetrics<?, ?, ?> metrics;
5151
private HttpClientConnection current;
5252
private boolean upgradeProcessed;
5353

@@ -61,9 +61,11 @@ public class Http2UpgradeClientConnection implements HttpClientConnection {
6161
private Handler<Long> concurrencyChangeHandler;
6262
private Handler<Http2Settings> remoteSettingsHandler;
6363

64-
Http2UpgradeClientConnection(Http1xClientConnection connection, Http2ChannelUpgrade upgrade) {
64+
Http2UpgradeClientConnection(Http1xClientConnection connection, long maxLifetimeMillis, ClientMetrics<?, ?, ?> metrics, Http2ChannelUpgrade upgrade) {
6565
this.current = connection;
66+
this.maxLifetimeMillis = maxLifetimeMillis;
6667
this.upgrade = upgrade;
68+
this.metrics = metrics;
6769
}
6870

6971
public HttpClientConnection unwrap() {
@@ -341,6 +343,8 @@ void handleUpgrade(HttpClientConnection conn, HttpClientStream stream) {
341343
private final Http2ChannelUpgrade upgrade;
342344
private final HttpClientStream upgradingStream;
343345
private final Http2UpgradeClientConnection upgradedConnection;
346+
private final long maxLifetimeMillis;
347+
private final ClientMetrics<?, ?, ?> metrics;
344348
private HttpClientStream upgradedStream;
345349
private Handler<HttpResponseHead> headHandler;
346350
private Handler<Buffer> chunkHandler;
@@ -355,11 +359,13 @@ void handleUpgrade(HttpClientConnection conn, HttpClientStream stream) {
355359
private Handler<HttpFrame> unknownFrameHandler;
356360
private Handler<Void> closeHandler;
357361

358-
UpgradingStream(HttpClientStream stream, Http2UpgradeClientConnection upgradedConnection, Http2ChannelUpgrade upgrade, Http1xClientConnection upgradingConnection) {
362+
UpgradingStream(HttpClientStream stream, Http2UpgradeClientConnection upgradedConnection, long maxLifetimeMillis, ClientMetrics<?, ?, ?> metrics, Http2ChannelUpgrade upgrade, Http1xClientConnection upgradingConnection) {
363+
this.maxLifetimeMillis = maxLifetimeMillis;
359364
this.upgradedConnection = upgradedConnection;
360365
this.upgradingConnection = upgradingConnection;
361366
this.upgradingStream = stream;
362367
this.upgrade = upgrade;
368+
this.metrics = metrics;
363369
}
364370

365371
@Override
@@ -397,9 +403,7 @@ public void upgradeFailure(Throwable cause) {
397403
}
398404
};
399405
upgrade.upgrade(upgradingStream, request, buf, end,
400-
upgradingConnection.channelHandlerContext().channel(),
401-
blah
402-
);
406+
upgradingConnection.channelHandlerContext().channel(), maxLifetimeMillis, metrics, blah);
403407
PromiseInternal<Void> promise = upgradingStream.context().promise();
404408
writeHead(request, chunked, buf, end, priority, connect, promise);
405409
return promise.future();
@@ -691,7 +695,7 @@ public Future<HttpClientStream> createStream(ContextInternal context) {
691695
if (current instanceof Http1xClientConnection && !upgradeProcessed) {
692696
return current
693697
.createStream(context)
694-
.map(stream -> new UpgradingStream(stream, this, upgrade, (Http1xClientConnection) current));
698+
.map(stream -> new UpgradingStream(stream, this, maxLifetimeMillis, metrics, upgrade, (Http1xClientConnection) current));
695699
} else {
696700
return current
697701
.createStream(context)
@@ -900,6 +904,7 @@ void upgrade(HttpClientStream upgradingStream,
900904
Buffer content,
901905
boolean end,
902906
Channel channel,
903-
UpgradeResult result);
907+
long maxLifetimeMillis,
908+
ClientMetrics<?, ?, ?> clientMetrics, UpgradeResult result);
904909
}
905910
}

0 commit comments

Comments
 (0)