Skip to content

Commit 663c72a

Browse files
committed
Decouple HttpChannelConnector from HttpClientBase object.
1 parent d718a3d commit 663c72a

File tree

6 files changed

+75
-76
lines changed

6 files changed

+75
-76
lines changed

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class Http1xClientConnection extends Http1xConnection implements HttpClie
6969

7070
private static final Handler<Object> INVALID_MSG_HANDLER = ReferenceCountUtil::release;
7171

72-
private final HttpClientBase client;
72+
private final HttpClientMetrics clientMetrics;
7373
private final HttpClientOptions options;
7474
private final boolean ssl;
7575
private final SocketAddress server;
@@ -95,7 +95,8 @@ public class Http1xClientConnection extends Http1xConnection implements HttpClie
9595
private long lastResponseReceivedTimestamp;
9696

9797
Http1xClientConnection(HttpVersion version,
98-
HttpClientBase client,
98+
HttpClientMetrics clientMetrics,
99+
HttpClientOptions options,
99100
ChannelHandlerContext chctx,
100101
boolean ssl,
101102
SocketAddress server,
@@ -104,8 +105,8 @@ public class Http1xClientConnection extends Http1xConnection implements HttpClie
104105
ClientMetrics metrics,
105106
long maxLifetime) {
106107
super(context, chctx);
107-
this.client = client;
108-
this.options = client.options();
108+
this.clientMetrics = clientMetrics;
109+
this.options = options;
109110
this.ssl = ssl;
110111
this.server = server;
111112
this.authority = authority;
@@ -941,7 +942,7 @@ private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
941942
}
942943

943944
public HttpClientMetrics metrics() {
944-
return client.metrics();
945+
return clientMetrics;
945946
}
946947

947948
/**
@@ -1016,7 +1017,7 @@ synchronized void toWebSocket(
10161017
if (future.isSuccess()) {
10171018

10181019
VertxHandler<WebSocketConnectionImpl> handler = VertxHandler.create(ctx -> {
1019-
WebSocketConnectionImpl conn = new WebSocketConnectionImpl(context, ctx, false, TimeUnit.SECONDS.toMillis(options.getClosingTimeout()), client.metrics());
1020+
WebSocketConnectionImpl conn = new WebSocketConnectionImpl(context, ctx, false, TimeUnit.SECONDS.toMillis(options.getClosingTimeout()), clientMetrics);
10201021
WebSocketImpl webSocket = new WebSocketImpl(
10211022
context,
10221023
conn,
@@ -1037,9 +1038,8 @@ synchronized void toWebSocket(
10371038
ws.subProtocol(handshaker.actualSubprotocol());
10381039
ws.registerHandler(vertx.eventBus());
10391040

1040-
HttpClientMetrics metrics = client.metrics();
1041-
if (metrics != null) {
1042-
ws.setMetric(metrics.connected(ws));
1041+
if (clientMetrics != null) {
1042+
ws.setMetric(clientMetrics.connected(ws));
10431043
}
10441044
ws.pause();
10451045
Deque<WebSocketFrame> toResubmit = pendingFrames;
@@ -1176,8 +1176,7 @@ protected void handleClosed() {
11761176
super.handleClosed();
11771177
closed = true;
11781178
if (metrics != null) {
1179-
HttpClientMetrics met = client.metrics();
1180-
met.endpointDisconnected(metrics);
1179+
clientMetrics.endpointDisconnected(metrics);
11811180
}
11821181
if (!evicted) {
11831182
evicted = true;

vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
*/
5252
public class HttpChannelConnector {
5353

54-
private final HttpClientBase client;
54+
private final HttpClientMetrics clientMetrics;
5555
private final NetClientInternal netClient;
5656
private final HttpClientOptions options;
5757
private final ClientSSLOptions sslOptions;
@@ -65,10 +65,11 @@ public class HttpChannelConnector {
6565
private final long maxLifetime;
6666
private final Http2ClientChannelInitializer http2ChannelInitializer;
6767

68-
public HttpChannelConnector(HttpClientBase client,
69-
NetClientInternal netClient,
68+
public HttpChannelConnector(NetClientInternal netClient,
69+
HttpClientOptions options,
7070
ClientSSLOptions sslOptions,
7171
ProxyOptions proxyOptions,
72+
HttpClientMetrics clientMetrics,
7273
ClientMetrics metrics,
7374
HttpVersion version,
7475
boolean ssl,
@@ -78,25 +79,25 @@ public HttpChannelConnector(HttpClientBase client,
7879
long maxLifetimeMillis) {
7980

8081
Http2ClientChannelInitializer http2ChannelInitializer;
81-
if (client.options.getHttp2MultiplexImplementation()) {
82+
if (options.getHttp2MultiplexImplementation()) {
8283
http2ChannelInitializer = new Http2MultiplexClientChannelInitializer(
83-
HttpUtils.fromVertxSettings(client.options.getInitialSettings()),
84-
client.metrics,
84+
HttpUtils.fromVertxSettings(options.getInitialSettings()),
85+
clientMetrics,
8586
metrics,
86-
TimeUnit.SECONDS.toMillis(client.options.getHttp2KeepAliveTimeout()),
87+
TimeUnit.SECONDS.toMillis(options.getHttp2KeepAliveTimeout()),
8788
maxLifetimeMillis,
8889
authority,
89-
client.options.getHttp2MultiplexingLimit(),
90-
client.options.isDecompressionSupported(),
91-
client.options.getLogActivity());
90+
options.getHttp2MultiplexingLimit(),
91+
options.isDecompressionSupported(),
92+
options.getLogActivity());
9293
} else {
93-
http2ChannelInitializer = new Http2CodecClientChannelInitializer(client, metrics, maxLifetimeMillis, authority);
94+
http2ChannelInitializer = new Http2CodecClientChannelInitializer(options, clientMetrics, metrics, maxLifetimeMillis, authority);
9495
}
9596

96-
this.client = client;
97+
this.clientMetrics = clientMetrics;
9798
this.netClient = netClient;
9899
this.metrics = metrics;
99-
this.options = client.options();
100+
this.options = options;
100101
this.proxyOptions = proxyOptions;
101102
this.sslOptions = sslOptions;
102103
this.ssl = ssl;
@@ -235,11 +236,10 @@ private void http1xConnected(HttpVersion version,
235236
Promise<HttpClientConnection> future) {
236237
boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade();
237238
VertxHandler<Http1xClientConnection> clientHandler = VertxHandler.create(chctx -> {
238-
HttpClientMetrics met = client.metrics();
239-
Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, client, chctx, ssl, server, authority, context, metrics, maxLifetime);
240-
if (met != null) {
239+
Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, clientMetrics, options, chctx, ssl, server, authority, context, metrics, maxLifetime);
240+
if (clientMetrics != null) {
241241
conn.metric(socketMetric);
242-
met.endpointConnected(metrics);
242+
clientMetrics.endpointConnected(metrics);
243243
}
244244
return conn;
245245
});

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private Function<EndpointKey, SharedHttpClientConnectionGroup> httpEndpointProvi
137137
key = new EndpointKey(key.ssl, key.sslOptions, proxyOptions, server, key.authority);
138138
proxyOptions = null;
139139
}
140-
HttpChannelConnector connector = new HttpChannelConnector(HttpClientImpl.this, netClient, key.sslOptions, proxyOptions, clientMetrics, options.getProtocolVersion(), key.ssl, options.isUseAlpn(), key.authority, key.server, maxLifetime);
140+
HttpChannelConnector connector = new HttpChannelConnector(netClient, options, key.sslOptions, proxyOptions, metrics, clientMetrics, options.getProtocolVersion(), key.ssl, options.isUseAlpn(), key.authority, key.server, maxLifetime);
141141
return new SharedHttpClientConnectionGroup(
142142
vertx,
143143
HttpClientImpl.this,
@@ -234,10 +234,11 @@ public Future<io.vertx.core.http.HttpClientConnection> connect(HttpConnectOption
234234
}
235235
checkClosed();
236236
HttpChannelConnector connector = new HttpChannelConnector(
237-
this,
238237
netClient,
238+
options,
239239
sslOptions,
240240
proxyOptions,
241+
metrics,
241242
clientMetrics,
242243
options.getProtocolVersion(),
243244
useSSL,

vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ void webSocket(ContextInternal ctx, WebSocketConnectOptions connectOptions, Prom
7171
int maxPoolSize = options.getMaxConnections();
7272
ClientMetrics clientMetrics = WebSocketClientImpl.this.metrics != null ? WebSocketClientImpl.this.metrics.createEndpointMetrics(key_.server, maxPoolSize) : null;
7373
PoolMetrics queueMetrics = WebSocketClientImpl.this.metrics != null ? vertx.metrics().createPoolMetrics("ws", key_.server.toString(), maxPoolSize) : null;
74-
HttpChannelConnector connector = new HttpChannelConnector(WebSocketClientImpl.this, netClient, sslOptions, key_.proxyOptions, clientMetrics, HttpVersion.HTTP_1_1, key_.ssl, false, key_.authority, key_.server, 0);
74+
HttpChannelConnector connector = new HttpChannelConnector(netClient, super.options, sslOptions, key_.proxyOptions, metrics,
75+
clientMetrics, HttpVersion.HTTP_1_1, key_.ssl, false, key_.authority, key_.server, 0);
7576
return new WebSocketGroup(null, queueMetrics, options, maxPoolSize, connector);
7677
};
7778
webSocketCM

vertx-core/src/main/java/io/vertx/core/http/impl/http2/codec/Http2ClientConnectionImpl.java

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
*/
3434
public class Http2ClientConnectionImpl extends Http2ConnectionImpl implements HttpClientConnection, Http2ClientConnection {
3535

36-
private final HttpClientBase client;
36+
private final HttpClientMetrics clientMetrics;
37+
private final HttpClientOptions options;
3738
private final ClientMetrics metrics;
3839
private final HostAndPort authority;
3940
private final long lifetimeEvictionTimestamp;
@@ -43,24 +44,22 @@ public class Http2ClientConnectionImpl extends Http2ConnectionImpl implements Ht
4344
private boolean evicted;
4445
private final VertxHttp2ConnectionHandler handler;
4546

46-
Http2ClientConnectionImpl(HttpClientBase client,
47-
ContextInternal context,
47+
Http2ClientConnectionImpl(ContextInternal context,
4848
HostAndPort authority,
4949
VertxHttp2ConnectionHandler connHandler,
50+
HttpClientMetrics clientMetrics,
5051
ClientMetrics metrics,
52+
HttpClientOptions options,
5153
long maxLifetime) {
5254
super(context, connHandler);
5355
this.metrics = metrics;
54-
this.client = client;
56+
this.clientMetrics = clientMetrics;
57+
this.options = options;
5558
this.authority = authority;
5659
this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE;
5760
this.handler = connHandler;
5861
}
5962

60-
HttpClientBase client() {
61-
return client;
62-
}
63-
6463
@Override
6564
public MultiMap newHttpRequestHeaders() {
6665
return new HttpResponseHeaders(new DefaultHttp2Headers());
@@ -90,7 +89,7 @@ public Http2ClientConnectionImpl concurrencyChangeHandler(Handler<Long> handler)
9089

9190
public long concurrency() {
9291
long concurrency = remoteSettings().getMaxConcurrentStreams();
93-
long http2MaxConcurrency = client.options().getHttp2MultiplexingLimit() <= 0 ? Long.MAX_VALUE : client.options().getHttp2MultiplexingLimit();
92+
long http2MaxConcurrency = options.getHttp2MultiplexingLimit() <= 0 ? Long.MAX_VALUE : options.getHttp2MultiplexingLimit();
9493
if (http2MaxConcurrency > 0) {
9594
concurrency = Math.min(concurrency, http2MaxConcurrency);
9695
}
@@ -136,7 +135,7 @@ private void tryEvict() {
136135

137136
@Override
138137
protected void concurrencyChanged(long concurrency) {
139-
int limit = client.options().getHttp2MultiplexingLimit();
138+
int limit = options.getHttp2MultiplexingLimit();
140139
if (limit > 0) {
141140
concurrency = Math.min(concurrency, limit);
142141
}
@@ -145,7 +144,7 @@ protected void concurrencyChanged(long concurrency) {
145144

146145
@Override
147146
public HttpClientMetrics metrics() {
148-
return client.metrics();
147+
return clientMetrics;
149148
}
150149

151150
ClientMetrics clientMetrics() {
@@ -154,7 +153,7 @@ ClientMetrics clientMetrics() {
154153

155154
public HttpClientStream upgradeStream(Object metric, Object trace, ContextInternal context) {
156155
Http2Stream nettyStream = handler.connection().stream(1);
157-
Http2ClientStream s = Http2ClientStream.create(nettyStream.id(), this, context, client.options.getTracingPolicy(), client.options.isDecompressionSupported(), clientMetrics(), isWritable(1));
156+
Http2ClientStream s = Http2ClientStream.create(nettyStream.id(), this, context, options.getTracingPolicy(), options.isDecompressionSupported(), clientMetrics(), isWritable(1));
158157
s.upgrade(metric, trace);
159158
nettyStream.setProperty(streamKey, s);
160159
return s.unwrap();
@@ -176,13 +175,13 @@ private Http2ClientStream createStream0(ContextInternal context) {
176175
return Http2ClientStream.create(
177176
this,
178177
context,
179-
client.options.getTracingPolicy(),
180-
client.options.isDecompressionSupported(),
178+
options.getTracingPolicy(),
179+
options.isDecompressionSupported(),
181180
clientMetrics());
182181
}
183182

184183
private void recycle() {
185-
int timeout = client.options().getHttp2KeepAliveTimeout();
184+
int timeout = options.getHttp2KeepAliveTimeout();
186185
expirationTimestamp = timeout > 0 ? System.currentTimeMillis() + timeout * 1000L : Long.MAX_VALUE;
187186
}
188187

@@ -231,7 +230,7 @@ public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int stream
231230
if (stream != null) {
232231
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
233232
// Http2ClientStreamImpl pushStream = new Http2ClientStreamImpl(this, context, client.options.getTracingPolicy(), client.options.isDecompressionSupported(), clientMetrics());
234-
Http2ClientStream s = Http2ClientStream.create(this, context, client.options.getTracingPolicy(), client.options.isDecompressionSupported(), clientMetrics());
233+
Http2ClientStream s = Http2ClientStream.create(this, context, options.getTracingPolicy(), options.isDecompressionSupported(), clientMetrics());
235234
// pushStream.init(s);
236235
// pushStream.stream = s;
237236
promisedStream.setProperty(streamKey, s);
@@ -252,22 +251,21 @@ protected void handleIdle(IdleStateEvent event) {
252251
}
253252

254253
public static VertxHttp2ConnectionHandler<Http2ClientConnectionImpl> createHttp2ConnectionHandler(
255-
HttpClientBase client,
254+
HttpClientOptions options,
255+
HttpClientMetrics clientMetrics,
256256
ClientMetrics metrics,
257257
ContextInternal context,
258258
boolean upgrade,
259259
Object socketMetric,
260260
HostAndPort authority,
261261
long maxLifetime) {
262-
HttpClientOptions options = client.options();
263-
HttpClientMetrics met = client.metrics();
264262
VertxHttp2ConnectionHandler<Http2ClientConnectionImpl> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnectionImpl>()
265263
.server(false)
266-
.useDecompression(client.options().isDecompressionSupported())
264+
.useDecompression(options.isDecompressionSupported())
267265
.gracefulShutdownTimeoutMillis(0) // So client close tests don't hang 30 seconds - make this configurable later but requires HTTP/1 impl
268-
.initialSettings(client.options().getInitialSettings())
266+
.initialSettings(options.getInitialSettings())
269267
.connectionFactory(connHandler -> {
270-
Http2ClientConnectionImpl conn = new Http2ClientConnectionImpl(client, context, authority, connHandler, metrics, maxLifetime);
268+
Http2ClientConnectionImpl conn = new Http2ClientConnectionImpl(context, authority, connHandler, clientMetrics, metrics, options, maxLifetime);
271269
if (metrics != null) {
272270
Object m = socketMetric;
273271
conn.metric(m);
@@ -279,13 +277,13 @@ public static VertxHttp2ConnectionHandler<Http2ClientConnectionImpl> createHttp2
279277
handler.addHandler(conn -> {
280278
if (metrics != null) {
281279
if (!upgrade) {
282-
met.endpointConnected(metrics);
280+
clientMetrics.endpointConnected(metrics);
283281
}
284282
}
285283
});
286284
handler.removeHandler(conn -> {
287285
if (metrics != null) {
288-
met.endpointDisconnected(metrics);
286+
clientMetrics.endpointDisconnected(metrics);
289287
}
290288
conn.tryEvict();
291289
});

0 commit comments

Comments
 (0)