Skip to content

Commit f861f4b

Browse files
authored
Merge pull request #5759 from eclipse-vertx/http-improvements
Decouple HttpClient from HTTP/1.x and H2
2 parents b091d3c + 6e5fdf1 commit f861f4b

21 files changed

+456
-413
lines changed

vertx-core/src/main/java/io/vertx/core/http/HttpConnection.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import javax.net.ssl.SSLPeerUnverifiedException;
2121
import javax.net.ssl.SSLSession;
2222
import java.security.cert.Certificate;
23+
import java.util.Arrays;
2324
import java.util.List;
2425
import java.util.concurrent.TimeUnit;
2526

@@ -268,7 +269,14 @@ default Future<Void> close() {
268269
* @see #sslSession()
269270
*/
270271
@GenIgnore()
271-
List<Certificate> peerCertificates() throws SSLPeerUnverifiedException;
272+
default List<Certificate> peerCertificates() throws SSLPeerUnverifiedException {
273+
SSLSession session = sslSession();
274+
if (session != null) {
275+
return Arrays.asList(session.getPeerCertificates());
276+
} else {
277+
return null;
278+
}
279+
}
272280

273281
/**
274282
* Returns the SNI server name presented during the SSL handshake by the client.

vertx-core/src/main/java/io/vertx/core/http/WebSocketBase.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import javax.net.ssl.SSLPeerUnverifiedException;
2828
import javax.net.ssl.SSLSession;
2929
import java.security.cert.Certificate;
30+
import java.util.Arrays;
3031
import java.util.List;
3132
import java.util.concurrent.TimeUnit;
3233

@@ -399,5 +400,12 @@ default Future<Void> shutdown(long timeout, TimeUnit unit, short statusCode) {
399400
* @see #sslSession()
400401
*/
401402
@GenIgnore()
402-
List<Certificate> peerCertificates() throws SSLPeerUnverifiedException;
403+
default List<Certificate> peerCertificates() throws SSLPeerUnverifiedException {
404+
SSLSession session = sslSession();
405+
if (session != null) {
406+
return Arrays.asList(session.getPeerCertificates());
407+
} else {
408+
return null;
409+
}
410+
}
403411
}

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class Http1xClientConnection extends Http1xConnection implements HttpClie
6969

7070
private static final Handler<Object> INVALID_MSG_HANDLER = ReferenceCountUtil::release;
7171

72-
private final HttpClientBase client;
72+
private final HttpClientMetrics clientMetrics;
7373
private final HttpClientOptions options;
7474
private final boolean ssl;
7575
private final SocketAddress server;
@@ -95,7 +95,8 @@ public class Http1xClientConnection extends Http1xConnection implements HttpClie
9595
private long lastResponseReceivedTimestamp;
9696

9797
Http1xClientConnection(HttpVersion version,
98-
HttpClientBase client,
98+
HttpClientMetrics clientMetrics,
99+
HttpClientOptions options,
99100
ChannelHandlerContext chctx,
100101
boolean ssl,
101102
SocketAddress server,
@@ -104,8 +105,8 @@ public class Http1xClientConnection extends Http1xConnection implements HttpClie
104105
ClientMetrics metrics,
105106
long maxLifetime) {
106107
super(context, chctx);
107-
this.client = client;
108-
this.options = client.options();
108+
this.clientMetrics = clientMetrics;
109+
this.options = options;
109110
this.ssl = ssl;
110111
this.server = server;
111112
this.authority = authority;
@@ -941,7 +942,7 @@ private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
941942
}
942943

943944
public HttpClientMetrics metrics() {
944-
return client.metrics();
945+
return clientMetrics;
945946
}
946947

947948
/**
@@ -1016,7 +1017,7 @@ synchronized void toWebSocket(
10161017
if (future.isSuccess()) {
10171018

10181019
VertxHandler<WebSocketConnectionImpl> handler = VertxHandler.create(ctx -> {
1019-
WebSocketConnectionImpl conn = new WebSocketConnectionImpl(context, ctx, false, TimeUnit.SECONDS.toMillis(options.getClosingTimeout()), client.metrics());
1020+
WebSocketConnectionImpl conn = new WebSocketConnectionImpl(context, ctx, false, TimeUnit.SECONDS.toMillis(options.getClosingTimeout()), clientMetrics);
10201021
WebSocketImpl webSocket = new WebSocketImpl(
10211022
context,
10221023
conn,
@@ -1037,9 +1038,8 @@ synchronized void toWebSocket(
10371038
ws.subProtocol(handshaker.actualSubprotocol());
10381039
ws.registerHandler(vertx.eventBus());
10391040

1040-
HttpClientMetrics metrics = client.metrics();
1041-
if (metrics != null) {
1042-
ws.setMetric(metrics.connected(ws));
1041+
if (clientMetrics != null) {
1042+
ws.setMetric(clientMetrics.connected(ws));
10431043
}
10441044
ws.pause();
10451045
Deque<WebSocketFrame> toResubmit = pendingFrames;
@@ -1176,8 +1176,7 @@ protected void handleClosed() {
11761176
super.handleClosed();
11771177
closed = true;
11781178
if (metrics != null) {
1179-
HttpClientMetrics met = client.metrics();
1180-
met.endpointDisconnected(metrics);
1179+
clientMetrics.endpointDisconnected(metrics);
11811180
}
11821181
if (!evicted) {
11831182
evicted = true;
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
12+
package io.vertx.core.http.impl;
13+
14+
import io.netty.channel.Channel;
15+
import io.netty.channel.ChannelHandler;
16+
import io.netty.channel.ChannelPipeline;
17+
import io.netty.handler.codec.http.HttpClientCodec;
18+
import io.netty.handler.codec.http.HttpContentDecompressor;
19+
import io.netty.handler.logging.LoggingHandler;
20+
import io.netty.handler.ssl.SslHandler;
21+
import io.netty.handler.timeout.IdleStateHandler;
22+
import io.vertx.core.Future;
23+
import io.vertx.core.Promise;
24+
import io.vertx.core.http.HttpClientOptions;
25+
import io.vertx.core.http.HttpHeaders;
26+
import io.vertx.core.http.HttpVersion;
27+
import io.vertx.core.http.impl.http2.Http2ClientChannelInitializer;
28+
import io.vertx.core.http.impl.http2.codec.Http2CodecClientChannelInitializer;
29+
import io.vertx.core.http.impl.http2.multiplex.Http2MultiplexClientChannelInitializer;
30+
import io.vertx.core.internal.ContextInternal;
31+
import io.vertx.core.internal.PromiseInternal;
32+
import io.vertx.core.internal.http.HttpHeadersInternal;
33+
import io.vertx.core.internal.net.NetClientInternal;
34+
import io.vertx.core.net.*;
35+
import io.vertx.core.net.impl.VertxHandler;
36+
import io.vertx.core.net.impl.tcp.NetSocketImpl;
37+
import io.vertx.core.spi.metrics.ClientMetrics;
38+
import io.vertx.core.spi.metrics.HttpClientMetrics;
39+
40+
import java.util.ArrayList;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.concurrent.TimeUnit;
44+
45+
import static io.vertx.core.http.HttpMethod.OPTIONS;
46+
47+
/**
48+
* Performs the channel configuration and connection according to the client options and the protocol version.
49+
*
50+
* @author <a href="mailto:[email protected]">Julien Viet</a>
51+
*/
52+
public class Http1xOrH2ChannelConnector implements HttpChannelConnector {
53+
54+
private final HttpClientMetrics clientMetrics;
55+
private final NetClientInternal netClient;
56+
57+
public Http1xOrH2ChannelConnector(NetClientInternal netClient,
58+
HttpClientMetrics clientMetrics) {
59+
this.clientMetrics = clientMetrics;
60+
this.netClient = netClient;
61+
}
62+
63+
private Http2ClientChannelInitializer http2Initializer(HttpClientOptions options) {
64+
if (options.getHttp2MultiplexImplementation()) {
65+
return new Http2MultiplexClientChannelInitializer(
66+
HttpUtils.fromVertxSettings(options.getInitialSettings()),
67+
clientMetrics,
68+
TimeUnit.SECONDS.toMillis(options.getHttp2KeepAliveTimeout()),
69+
options.getHttp2MultiplexingLimit(),
70+
options.isDecompressionSupported(),
71+
options.getLogActivity());
72+
} else {
73+
return new Http2CodecClientChannelInitializer(options, clientMetrics);
74+
}
75+
}
76+
77+
private void connect(ContextInternal context, HttpConnectParams params, HostAndPort authority, SocketAddress server, Promise<NetSocket> promise) {
78+
ConnectOptions connectOptions = new ConnectOptions();
79+
connectOptions.setRemoteAddress(server);
80+
if (authority != null) {
81+
connectOptions.setHost(authority.host());
82+
connectOptions.setPort(authority.port());
83+
if (params.ssl && params.options.isForceSni()) {
84+
connectOptions.setSniServerName(authority.host());
85+
}
86+
}
87+
connectOptions.setSsl(params.ssl);
88+
if (params.ssl) {
89+
if (params.sslOptions != null) {
90+
connectOptions.setSslOptions(params.sslOptions.copy().setUseAlpn(params.useAlpn));
91+
} else {
92+
connectOptions.setSslOptions(new ClientSSLOptions().setHostnameVerificationAlgorithm("HTTPS"));
93+
}
94+
}
95+
connectOptions.setProxyOptions(params.proxyOptions);
96+
netClient.connectInternal(connectOptions, promise, context);
97+
}
98+
99+
public Future<HttpClientConnection> wrap(ContextInternal context, HttpConnectParams params, HostAndPort authority, long maxLifetimeMillis, ClientMetrics<?, ?, ?> metrics, SocketAddress server, NetSocket so_) {
100+
NetSocketImpl so = (NetSocketImpl) so_;
101+
Object metric = so.metric();
102+
PromiseInternal<HttpClientConnection> promise = context.promise();
103+
104+
// Remove all un-necessary handlers
105+
ChannelPipeline pipeline = so.channelHandlerContext().pipeline();
106+
List<ChannelHandler> removedHandlers = new ArrayList<>();
107+
for (Map.Entry<String, ChannelHandler> stringChannelHandlerEntry : pipeline) {
108+
ChannelHandler handler = stringChannelHandlerEntry.getValue();
109+
if (!(handler instanceof SslHandler)) {
110+
removedHandlers.add(handler);
111+
}
112+
}
113+
removedHandlers.forEach(pipeline::remove);
114+
115+
//
116+
Channel ch = so.channelHandlerContext().channel();
117+
if (params.ssl) {
118+
String protocol = so.applicationLayerProtocol();
119+
if (params.useAlpn) {
120+
if ("h2".equals(protocol)) {
121+
applyHttp2ConnectionOptions(ch.pipeline(), params.options);
122+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(params.options);
123+
http2ChannelInitializer.http2Connected(context, authority, metric, maxLifetimeMillis, ch, metrics, promise);
124+
} else {
125+
applyHttp1xConnectionOptions(ch.pipeline(), params.options);
126+
HttpVersion fallbackProtocol = "http/1.0".equals(protocol) ?
127+
HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
128+
http1xConnected(params.options, fallbackProtocol, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
129+
}
130+
} else {
131+
applyHttp1xConnectionOptions(ch.pipeline(), params.options);
132+
http1xConnected(params.options, params.version, server, authority, true, context, metric, maxLifetimeMillis, ch, metrics, promise);
133+
}
134+
} else {
135+
if (params.version == HttpVersion.HTTP_2) {
136+
if (params.options.isHttp2ClearTextUpgrade()) {
137+
applyHttp1xConnectionOptions(pipeline, params.options);
138+
http1xConnected(params.options, params.version, server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
139+
} else {
140+
applyHttp2ConnectionOptions(pipeline, params.options);
141+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(params.options);
142+
http2ChannelInitializer.http2Connected(context, authority, metric, maxLifetimeMillis, ch, metrics, promise);
143+
}
144+
} else {
145+
applyHttp1xConnectionOptions(pipeline, params.options);
146+
http1xConnected(params.options, params.version, server, authority, false, context, metric, maxLifetimeMillis, ch, metrics, promise);
147+
}
148+
}
149+
return promise.future();
150+
}
151+
152+
public Future<HttpClientConnection> httpConnect(ContextInternal context, SocketAddress server, HostAndPort authority, HttpConnectParams params, long maxLifetimeMillis, ClientMetrics<?, ?, ?> metrics) {
153+
Promise<NetSocket> promise = context.promise();
154+
Future<NetSocket> future = promise.future();
155+
// We perform the compose operation before calling connect to be sure that the composition happens
156+
// before the promise is completed by the connect operation
157+
Future<HttpClientConnection> ret = future.compose(so -> wrap(context, params, authority, maxLifetimeMillis, metrics, server, so));
158+
connect(context, params, authority, server, promise);
159+
return ret;
160+
}
161+
162+
private void applyHttp2ConnectionOptions(ChannelPipeline pipeline, HttpClientOptions options) {
163+
int idleTimeout = options.getIdleTimeout();
164+
int readIdleTimeout = options.getReadIdleTimeout();
165+
int writeIdleTimeout = options.getWriteIdleTimeout();
166+
if (idleTimeout > 0 || readIdleTimeout > 0 || writeIdleTimeout > 0) {
167+
pipeline.addLast("idle", new IdleStateHandler(readIdleTimeout, writeIdleTimeout, idleTimeout, options.getIdleTimeoutUnit()));
168+
}
169+
}
170+
171+
private void applyHttp1xConnectionOptions(ChannelPipeline pipeline, HttpClientOptions options) {
172+
int idleTimeout = options.getIdleTimeout();
173+
int readIdleTimeout = options.getReadIdleTimeout();
174+
int writeIdleTimeout = options.getWriteIdleTimeout();
175+
if (idleTimeout > 0 || readIdleTimeout > 0 || writeIdleTimeout > 0) {
176+
pipeline.addLast("idle", new IdleStateHandler(readIdleTimeout, writeIdleTimeout, idleTimeout, options.getIdleTimeoutUnit()));
177+
}
178+
if (options.getLogActivity()) {
179+
pipeline.addLast("logging", new LoggingHandler(options.getActivityLogDataFormat()));
180+
}
181+
pipeline.addLast("codec", new HttpClientCodec(
182+
options.getMaxInitialLineLength(),
183+
options.getMaxHeaderSize(),
184+
options.getMaxChunkSize(),
185+
false,
186+
!HttpHeadersInternal.DISABLE_HTTP_HEADERS_VALIDATION,
187+
options.getDecoderInitialBufferSize()));
188+
if (options.isDecompressionSupported()) {
189+
pipeline.addLast("inflater", new HttpContentDecompressor(false));
190+
}
191+
}
192+
193+
private void http1xConnected(HttpClientOptions options,
194+
HttpVersion version,
195+
SocketAddress server,
196+
HostAndPort authority,
197+
boolean ssl,
198+
ContextInternal context,
199+
Object socketMetric,
200+
long maxLifetimeMillis, Channel ch,
201+
ClientMetrics<?, ?, ?> metrics,
202+
Promise<HttpClientConnection> future) {
203+
boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade();
204+
VertxHandler<Http1xClientConnection> clientHandler = VertxHandler.create(chctx -> {
205+
Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, clientMetrics, options, chctx, ssl, server, authority, context, metrics, maxLifetimeMillis);
206+
if (clientMetrics != null) {
207+
conn.metric(socketMetric);
208+
clientMetrics.endpointConnected(metrics);
209+
}
210+
return conn;
211+
});
212+
clientHandler.addHandler(conn -> {
213+
if (upgrade) {
214+
Http2ClientChannelInitializer http2ChannelInitializer = http2Initializer(options);
215+
Http2UpgradeClientConnection.Http2ChannelUpgrade channelUpgrade= http2ChannelInitializer.channelUpgrade(conn, maxLifetimeMillis, metrics);
216+
boolean preflightRequest = options.isHttp2ClearTextUpgradeWithPreflightRequest();
217+
if (preflightRequest) {
218+
Http2UpgradeClientConnection conn2 = new Http2UpgradeClientConnection(conn, maxLifetimeMillis, metrics, channelUpgrade);
219+
conn2.concurrencyChangeHandler(concurrency -> {
220+
// Ignore
221+
});
222+
conn2.createStream(conn.context()).onComplete(ar -> {
223+
if (ar.succeeded()) {
224+
HttpClientStream stream = ar.result();
225+
stream.headHandler(resp -> {
226+
Http2UpgradeClientConnection connection = (Http2UpgradeClientConnection) stream.connection();
227+
HttpClientConnection unwrap = connection.unwrap();
228+
future.tryComplete(unwrap);
229+
});
230+
stream.exceptionHandler(future::tryFail);
231+
HttpRequestHead request = new HttpRequestHead(OPTIONS, "/", HttpHeaders.headers(), HostAndPort.authority(server.host(), server.port()),
232+
"http://" + server + "/", null);
233+
stream.writeHead(request, false, null, true, null, false);
234+
} else {
235+
future.fail(ar.cause());
236+
}
237+
});
238+
} else {
239+
future.complete(new Http2UpgradeClientConnection(conn, maxLifetimeMillis, metrics, channelUpgrade));
240+
}
241+
} else {
242+
future.complete(conn);
243+
}
244+
});
245+
ch.pipeline().addLast("handler", clientHandler);
246+
}
247+
}

0 commit comments

Comments
 (0)