Skip to content

Commit cf2cbeb

Browse files
committed
Move channel connector construction outside of HttpClient
1 parent 6e5fdf1 commit cf2cbeb

File tree

7 files changed

+55
-19
lines changed

7 files changed

+55
-19
lines changed

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xOrH2ChannelConnector.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.vertx.core.spi.metrics.ClientMetrics;
3838
import io.vertx.core.spi.metrics.HttpClientMetrics;
3939

40+
import java.time.Duration;
4041
import java.util.ArrayList;
4142
import java.util.List;
4243
import java.util.Map;
@@ -244,4 +245,14 @@ private void http1xConnected(HttpClientOptions options,
244245
});
245246
ch.pipeline().addLast("handler", clientHandler);
246247
}
248+
249+
@Override
250+
public Future<Void> shutdown(Duration timeout) {
251+
return netClient.shutdown(timeout.toMillis(), TimeUnit.MILLISECONDS);
252+
}
253+
254+
@Override
255+
public Future<Void> close() {
256+
return netClient.close();
257+
}
247258
}

vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io.vertx.core.net.SocketAddress;
1818
import io.vertx.core.spi.metrics.ClientMetrics;
1919

20+
import java.time.Duration;
21+
2022
/**
2123
* Performs the channel configuration and connection according to the client options and the protocol version.
2224
*
@@ -31,4 +33,8 @@ Future<HttpClientConnection> httpConnect(ContextInternal context,
3133
long maxLifetimeMillis,
3234
ClientMetrics<?, ?, ?> metrics);
3335

36+
Future<Void> shutdown(Duration timeout);
37+
38+
Future<Void> close();
39+
3440
}

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientBase.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.vertx.core.spi.metrics.Metrics;
2424
import io.vertx.core.spi.metrics.MetricsProvider;
2525

26+
import java.time.Duration;
2627
import java.util.*;
2728
import java.util.concurrent.TimeUnit;
2829
import java.util.function.Predicate;
@@ -34,15 +35,15 @@ public class HttpClientBase implements MetricsProvider, Closeable {
3435

3536
protected final VertxInternal vertx;
3637
public final HttpClientOptions options;
37-
protected final NetClientInternal netClient;
38-
protected final HttpClientMetrics metrics;
38+
protected final HttpChannelConnector connector;
39+
protected final HttpClientMetrics<?, ?, ?> metrics;
3940
protected final CloseSequence closeSequence;
4041
private volatile ClientSSLOptions defaultSslOptions;
4142
private long closeTimeout = 0L;
4243
private TimeUnit closeTimeoutUnit = TimeUnit.SECONDS;
4344
private Predicate<SocketAddress> proxyFilter;
4445

45-
public HttpClientBase(VertxInternal vertx, HttpClientOptions options) {
46+
public HttpClientBase(VertxInternal vertx, HttpClientOptions options, HttpChannelConnector connector, HttpClientMetrics<?, ?, ?> metrics) {
4647
if (!options.isKeepAlive() && options.isPipelining()) {
4748
throw new IllegalStateException("Cannot have pipelining with no keep alive");
4849
}
@@ -60,7 +61,8 @@ public HttpClientBase(VertxInternal vertx, HttpClientOptions options) {
6061
this.options = options;
6162
this.closeSequence = new CloseSequence(p -> doClose(p), p1 -> doShutdown(p1));
6263
this.proxyFilter = options.getNonProxyHosts() != null ? ProxyFilter.nonProxyHosts(options.getNonProxyHosts()) : ProxyFilter.DEFAULT_PROXY_FILTER;
63-
this.netClient = new NetClientBuilder(vertx, new NetClientOptions(options).setProxyOptions(null)).metrics(metrics).build();
64+
this.connector = connector;
65+
// this.netClient = new NetClientBuilder(vertx, new NetClientOptions(options).setProxyOptions(null)).metrics(metrics).build();
6466
this.defaultSslOptions = options.getSslOptions();
6567

6668
ClientSSLOptions sslOptions = options.getSslOptions();
@@ -76,7 +78,7 @@ private void configureSSLOptions(ClientSSLOptions sslOptions) {
7678
}
7779

7880
public NetClientInternal netClient() {
79-
return netClient;
81+
return null;
8082
}
8183

8284
public Future<Void> closeFuture() {
@@ -144,11 +146,11 @@ public HttpClientMetrics metrics() {
144146
}
145147

146148
protected void doShutdown(Completable<Void> p) {
147-
netClient.shutdown(closeTimeout, closeTimeoutUnit).onComplete(p);
149+
connector.shutdown(Duration.ofMillis(closeTimeoutUnit.toMillis(closeTimeout))).onComplete(p);
148150
}
149151

150152
protected void doClose(Completable<Void> p) {
151-
netClient.close().onComplete(p);
153+
connector.close().onComplete(p);
152154
}
153155

154156
public Future<Void> shutdown(long timeout, TimeUnit unit) {

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientBuilderInternal.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99
import io.vertx.core.internal.ContextInternal;
1010
import io.vertx.core.internal.VertxInternal;
1111
import io.vertx.core.internal.http.HttpClientInternal;
12+
import io.vertx.core.internal.net.NetClientInternal;
13+
import io.vertx.core.net.NetClientOptions;
1214
import io.vertx.core.net.endpoint.LoadBalancer;
1315
import io.vertx.core.net.AddressResolver;
1416
import io.vertx.core.net.endpoint.impl.EndpointResolverImpl;
1517
import io.vertx.core.net.endpoint.EndpointResolver;
18+
import io.vertx.core.net.impl.tcp.NetClientBuilder;
19+
import io.vertx.core.spi.metrics.HttpClientMetrics;
1620

1721
import java.util.function.Function;
1822

@@ -89,6 +93,13 @@ private EndpointResolver endpointResolver(HttpClientOptions co) {
8993
return null;
9094
}
9195

96+
private HttpClientImpl createHttpClientImpl(EndpointResolver resolver, HttpClientOptions co, PoolOptions po) {
97+
HttpClientMetrics<?, ?, ?> metrics = vertx.metrics() != null ? vertx.metrics().createHttpClientMetrics(co) : null;
98+
NetClientInternal tcpClient = new NetClientBuilder(vertx, new NetClientOptions(co).setProxyOptions(null)).metrics(metrics).build();
99+
HttpChannelConnector channelConnector = new Http1xOrH2ChannelConnector(tcpClient, metrics);
100+
return new HttpClientImpl(vertx, channelConnector, metrics, resolver, co, po);
101+
}
102+
92103
@Override
93104
public HttpClientAgent build() {
94105
HttpClientOptions co = clientOptions != null ? clientOptions : new HttpClientOptions();
@@ -100,15 +111,14 @@ public HttpClientAgent build() {
100111
if (co.isShared()) {
101112
CloseFuture closeFuture = new CloseFuture();
102113
client = vertx.createSharedResource("__vertx.shared.httpClients", co.getName(), closeFuture, cf_ -> {
103-
104-
HttpClientImpl impl = new HttpClientImpl(vertx, resolver, co, po);
114+
HttpClientImpl impl = createHttpClientImpl(resolver, co, po);
105115
cf_.add(completion -> impl.close().onComplete(completion));
106116
return impl;
107117
});
108118
client = new CleanableHttpClient((HttpClientInternal) client, vertx.cleaner(), (timeout, timeunit) -> closeFuture.close());
109119
closeable = closeFuture;
110120
} else {
111-
HttpClientImpl impl = new HttpClientImpl(vertx, resolver, co, po);
121+
HttpClientImpl impl = createHttpClientImpl(resolver, co, po);
112122
closeable = impl;
113123
client = new CleanableHttpClient(impl, vertx.cleaner(), impl::shutdown);
114124
}

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.vertx.core.net.endpoint.ServerInteraction;
2828
import io.vertx.core.net.endpoint.impl.EndpointResolverImpl;
2929
import io.vertx.core.spi.metrics.ClientMetrics;
30+
import io.vertx.core.spi.metrics.HttpClientMetrics;
3031
import io.vertx.core.spi.metrics.MetricsProvider;
3132
import io.vertx.core.spi.metrics.PoolMetrics;
3233

@@ -49,22 +50,22 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal
4950
private final PoolOptions poolOptions;
5051
private final ResourceManager<EndpointKey, SharedHttpClientConnectionGroup> httpCM;
5152
private final EndpointResolverInternal endpointResolver;
52-
private final HttpChannelConnector connector;
5353
private volatile Function<HttpClientResponse, Future<RequestOptions>> redirectHandler = DEFAULT_REDIRECT_HANDLER;
5454
private long timerID;
5555
volatile Handler<HttpConnection> connectionHandler;
5656
private final Function<ContextInternal, ContextInternal> contextProvider;
5757
private final long maxLifetime;
5858

5959
public HttpClientImpl(VertxInternal vertx,
60+
HttpChannelConnector connector,
61+
HttpClientMetrics<?, ?, ?> metrics,
6062
EndpointResolver endpointResolver,
6163
HttpClientOptions options,
6264
PoolOptions poolOptions) {
63-
super(vertx, options);
65+
super(vertx, options, connector, metrics);
6466

6567
this.endpointResolver = (EndpointResolverImpl) endpointResolver;
6668
this.poolOptions = poolOptions;
67-
this.connector = new Http1xOrH2ChannelConnector(netClient, metrics);
6869
this.httpCM = new ResourceManager<>();
6970
if (poolCheckerIsNeeded(options, poolOptions)) {
7071
PoolChecker checker = new PoolChecker(this);

vertx-core/src/main/java/io/vertx/core/http/impl/WebSocketClientImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.vertx.core.net.ProxyOptions;
2525
import io.vertx.core.net.SocketAddress;
2626
import io.vertx.core.spi.metrics.ClientMetrics;
27+
import io.vertx.core.spi.metrics.HttpClientMetrics;
2728
import io.vertx.core.spi.metrics.PoolMetrics;
2829

2930
import java.net.URI;
@@ -34,15 +35,13 @@
3435
public class WebSocketClientImpl extends HttpClientBase implements WebSocketClient {
3536

3637
private final WebSocketClientOptions options;
37-
private final HttpChannelConnector connector;
3838
private final ResourceManager<EndpointKey, WebSocketGroup> webSocketCM;
3939

40-
public WebSocketClientImpl(VertxInternal vertx, HttpClientOptions options, WebSocketClientOptions wsOptions) {
41-
super(vertx, options);
40+
public WebSocketClientImpl(VertxInternal vertx, HttpClientOptions options, WebSocketClientOptions wsOptions, HttpChannelConnector connector, HttpClientMetrics<?, ?, ?> metrics) {
41+
super(vertx, options, connector, metrics);
4242

4343
this.options = wsOptions;
4444
this.webSocketCM = new ResourceManager<>();
45-
this.connector = new Http1xOrH2ChannelConnector(netClient, metrics);
4645
}
4746

4847
protected void doShutdown(Completable<Void> p) {

vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,21 +411,28 @@ public WebSocketClient createWebSocketClient(WebSocketClientOptions options) {
411411
if (options.isShared()) {
412412
CloseFuture closeFuture = new CloseFuture();
413413
client = createSharedResource("__vertx.shared.webSocketClients", options.getName(), closeFuture, cf_ -> {
414-
WebSocketClientImpl impl = new WebSocketClientImpl(this, o, options);
414+
WebSocketClientImpl impl = createWebSocketClientImpl(o, options);
415415
cf_.add(completion -> impl.close().onComplete(completion));
416416
return impl;
417417
});
418418
client = new CleanableWebSocketClient(client, cleaner, (timeout, timeunit) -> closeFuture.close());
419419
closeable = closeFuture;
420420
} else {
421-
WebSocketClientImpl impl = new WebSocketClientImpl(this, o, options);
421+
WebSocketClientImpl impl = createWebSocketClientImpl(o, options);
422422
closeable = impl;
423423
client = new CleanableWebSocketClient(impl, cleaner, impl::shutdown);
424424
}
425425
cf.add(closeable);
426426
return client;
427427
}
428428

429+
private WebSocketClientImpl createWebSocketClientImpl(HttpClientOptions o, WebSocketClientOptions options) {
430+
HttpClientMetrics<?, ?, ?> metrics = metrics() != null ? metrics().createHttpClientMetrics(o) : null;
431+
NetClientInternal tcpClient = new NetClientBuilder(this, new NetClientOptions(o).setProxyOptions(null)).metrics(metrics).build();
432+
Http1xOrH2ChannelConnector channelConnector = new Http1xOrH2ChannelConnector(tcpClient, metrics);
433+
return new WebSocketClientImpl(this, o, options, channelConnector, metrics);
434+
}
435+
429436
@Override
430437
public HttpClientBuilder httpClientBuilder() {
431438
return new HttpClientBuilderInternal(this);

0 commit comments

Comments
 (0)