Skip to content

Commit b3b1942

Browse files
committed
Support max lifetime on HTTP Client pool
Closes #5360 It would help in some cases: - make sure a connection does not live longer than authorized by some firewall - give an opportunity to the client to find new backend replicas started after the pool reached its maximum size Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
1 parent 4c15593 commit b3b1942

File tree

8 files changed

+167
-41
lines changed

8 files changed

+167
-41
lines changed

vertx-core/src/main/java/io/vertx/core/http/PoolOptions.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import io.vertx.core.impl.Arguments;
1616
import io.vertx.core.json.JsonObject;
1717

18+
import java.util.concurrent.TimeUnit;
19+
1820
/**
1921
* Options configuring a {@link HttpClient} pool.
2022
*
@@ -39,6 +41,16 @@ public class PoolOptions {
3941
*/
4042
public static final int DEFAULT_MAX_WAIT_QUEUE_SIZE = -1;
4143

44+
/**
45+
* Default maximum pooled connection lifetime = 0 (no maximum)
46+
*/
47+
public static final int DEFAULT_MAXIMUM_LIFETIME = 0;
48+
49+
/**
50+
* Default maximum pooled connection lifetime unit = seconds
51+
*/
52+
public static final TimeUnit DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT = TimeUnit.SECONDS;
53+
4254
/**
4355
* Default pool cleaner period = 1000 ms (1 second)
4456
*/
@@ -51,6 +63,8 @@ public class PoolOptions {
5163

5264
private int http1MaxSize;
5365
private int http2MaxSize;
66+
private int maxLifetime;
67+
private TimeUnit maxLifetimeUnit;
5468
private int cleanerPeriod;
5569
private int eventLoopSize;
5670
private int maxWaitQueueSize;
@@ -61,6 +75,8 @@ public class PoolOptions {
6175
public PoolOptions() {
6276
http1MaxSize = DEFAULT_MAX_POOL_SIZE;
6377
http2MaxSize = DEFAULT_HTTP2_MAX_POOL_SIZE;
78+
maxLifetime = DEFAULT_MAXIMUM_LIFETIME;
79+
maxLifetimeUnit = DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT;
6480
cleanerPeriod = DEFAULT_POOL_CLEANER_PERIOD;
6581
eventLoopSize = DEFAULT_POOL_EVENT_LOOP_SIZE;
6682
maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
@@ -74,6 +90,8 @@ public PoolOptions() {
7490
public PoolOptions(PoolOptions other) {
7591
this.http1MaxSize = other.http1MaxSize;
7692
this.http2MaxSize = other.http2MaxSize;
93+
maxLifetime = other.maxLifetime;
94+
maxLifetimeUnit = other.maxLifetimeUnit;
7795
this.cleanerPeriod = other.cleanerPeriod;
7896
this.eventLoopSize = other.eventLoopSize;
7997
this.maxWaitQueueSize = other.maxWaitQueueSize;
@@ -134,6 +152,45 @@ public PoolOptions setHttp2MaxSize(int max) {
134152
return this;
135153
}
136154

155+
/**
156+
* @return the pooled connection max lifetime unit
157+
*/
158+
public TimeUnit getMaxLifetimeUnit() {
159+
return maxLifetimeUnit;
160+
}
161+
162+
/**
163+
* Establish a max lifetime unit for pooled connections.
164+
*
165+
* @param maxLifetimeUnit pooled connection max lifetime unit
166+
* @return a reference to this, so the API can be used fluently
167+
*/
168+
public PoolOptions setMaxLifetimeUnit(TimeUnit maxLifetimeUnit) {
169+
this.maxLifetimeUnit = maxLifetimeUnit;
170+
return this;
171+
}
172+
173+
/**
174+
* @return pooled connection max lifetime
175+
*/
176+
public int getMaxLifetime() {
177+
return maxLifetime;
178+
}
179+
180+
/**
181+
* Establish a max lifetime for pooled connections, a value of zero disables the maximum lifetime.
182+
*
183+
* @param maxLifetime the pool connection max lifetime
184+
* @return a reference to this, so the API can be used fluently
185+
*/
186+
public PoolOptions setMaxLifetime(int maxLifetime) {
187+
if (maxLifetime < 0) {
188+
throw new IllegalArgumentException("maxLifetime must be >= 0");
189+
}
190+
this.maxLifetime = maxLifetime;
191+
return this;
192+
}
193+
137194
/**
138195
* @return the connection pool cleaner period in ms.
139196
*/

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
import io.netty.handler.codec.compression.Brotli;
1919
import io.netty.handler.codec.compression.ZlibCodecFactory;
2020
import io.netty.handler.codec.compression.Zstd;
21-
import io.netty.handler.codec.http.HttpHeaders;
2221
import io.netty.handler.codec.http.*;
23-
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
22+
import io.netty.handler.codec.http.HttpHeaders;
2423
import io.netty.handler.codec.http.websocketx.*;
24+
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
2525
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
2626
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandshaker;
2727
import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFrameClientExtensionHandshaker;
@@ -32,19 +32,21 @@
3232
import io.netty.util.concurrent.GenericFutureListener;
3333
import io.vertx.core.*;
3434
import io.vertx.core.buffer.Buffer;
35-
import io.vertx.core.http.WebSocketVersion;
36-
import io.vertx.core.internal.buffer.BufferInternal;
35+
import io.vertx.core.http.*;
3736
import io.vertx.core.http.HttpMethod;
3837
import io.vertx.core.http.HttpVersion;
39-
import io.vertx.core.http.*;
38+
import io.vertx.core.http.WebSocketVersion;
4039
import io.vertx.core.http.impl.headers.HeadersAdaptor;
4140
import io.vertx.core.internal.ContextInternal;
4241
import io.vertx.core.internal.PromiseInternal;
42+
import io.vertx.core.internal.buffer.BufferInternal;
4343
import io.vertx.core.internal.concurrent.InboundMessageQueue;
44+
import io.vertx.core.internal.net.NetSocketInternal;
4445
import io.vertx.core.net.HostAndPort;
4546
import io.vertx.core.net.SocketAddress;
46-
import io.vertx.core.net.impl.*;
47-
import io.vertx.core.internal.net.NetSocketInternal;
47+
import io.vertx.core.net.impl.MessageWrite;
48+
import io.vertx.core.net.impl.NetSocketImpl;
49+
import io.vertx.core.net.impl.VertxHandler;
4850
import io.vertx.core.spi.metrics.ClientMetrics;
4951
import io.vertx.core.spi.metrics.HttpClientMetrics;
5052
import io.vertx.core.spi.tracing.SpanKind;
@@ -75,6 +77,7 @@ public class Http1xClientConnection extends Http1xConnection implements HttpClie
7577
public final ClientMetrics metrics;
7678
private final HttpVersion version;
7779
private final boolean pooled;
80+
private final long lifetimeEvictionTimestamp;
7881

7982
private final Deque<Stream> requests = new ArrayDeque<>();
8083
private final Deque<Stream> responses = new ArrayDeque<>();
@@ -100,7 +103,8 @@ public class Http1xClientConnection extends Http1xConnection implements HttpClie
100103
HostAndPort authority,
101104
ContextInternal context,
102105
ClientMetrics metrics,
103-
boolean pooled) {
106+
boolean pooled,
107+
long maxLifetime) {
104108
super(context, chctx);
105109
this.client = client;
106110
this.options = client.options();
@@ -109,6 +113,7 @@ public class Http1xClientConnection extends Http1xConnection implements HttpClie
109113
this.authority = authority;
110114
this.metrics = metrics;
111115
this.version = version;
116+
this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE;
112117
this.keepAliveTimeout = options.getKeepAliveTimeout();
113118
this.expirationTimestamp = expirationTimestampOf(keepAliveTimeout);
114119
this.pooled = pooled;
@@ -1261,7 +1266,8 @@ public long lastResponseReceivedTimestamp() {
12611266

12621267
@Override
12631268
public boolean isValid() {
1264-
return expirationTimestamp == 0 || System.currentTimeMillis() <= expirationTimestamp;
1269+
long now = System.currentTimeMillis();
1270+
return now <= expirationTimestamp && now <= lifetimeEvictionTimestamp;
12651271
}
12661272

12671273
/**
@@ -1271,6 +1277,6 @@ public boolean isValid() {
12711277
* @return the expiration timestamp
12721278
*/
12731279
private static long expirationTimestampOf(long timeout) {
1274-
return timeout == 0 ? 0L : System.currentTimeMillis() + timeout * 1000;
1280+
return timeout > 0 ? System.currentTimeMillis() + timeout * 1000 : Long.MAX_VALUE;
12751281
}
12761282
}

vertx-core/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
4444
private final ClientMetrics metrics;
4545
private final HostAndPort authority;
4646
private final boolean pooled;
47+
private final long lifetimeEvictionTimestamp;
4748
private Handler<Void> evictionHandler = DEFAULT_EVICTION_HANDLER;
4849
private Handler<Long> concurrencyChangeHandler = DEFAULT_CONCURRENCY_CHANGE_HANDLER;
4950
private long expirationTimestamp;
@@ -53,12 +54,15 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
5354
ContextInternal context,
5455
HostAndPort authority,
5556
VertxHttp2ConnectionHandler connHandler,
56-
ClientMetrics metrics, boolean pooled) {
57+
ClientMetrics metrics,
58+
boolean pooled,
59+
long maxLifetime) {
5760
super(context, connHandler);
5861
this.metrics = metrics;
5962
this.client = client;
6063
this.authority = authority;
6164
this.pooled = pooled;
65+
this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE;
6266
}
6367

6468
@Override
@@ -175,12 +179,13 @@ private StreamImpl createStream2(ContextInternal context) {
175179

176180
private void recycle() {
177181
int timeout = client.options().getHttp2KeepAliveTimeout();
178-
expirationTimestamp = timeout > 0 ? System.currentTimeMillis() + timeout * 1000L : 0L;
182+
expirationTimestamp = timeout > 0 ? System.currentTimeMillis() + timeout * 1000L : Long.MAX_VALUE;
179183
}
180184

181185
@Override
182186
public boolean isValid() {
183-
return expirationTimestamp == 0 || System.currentTimeMillis() <= expirationTimestamp;
187+
long now = System.currentTimeMillis();
188+
return now <= expirationTimestamp && now <= lifetimeEvictionTimestamp;
184189
}
185190

186191
@Override
@@ -688,7 +693,8 @@ public static VertxHttp2ConnectionHandler<Http2ClientConnection> createHttp2Conn
688693
boolean upgrade,
689694
Object socketMetric,
690695
HostAndPort authority,
691-
boolean pooled) {
696+
boolean pooled,
697+
long maxLifetime) {
692698
HttpClientOptions options = client.options();
693699
HttpClientMetrics met = client.metrics();
694700
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnection>()
@@ -697,7 +703,7 @@ public static VertxHttp2ConnectionHandler<Http2ClientConnection> createHttp2Conn
697703
.gracefulShutdownTimeoutMillis(0) // So client close tests don't hang 30 seconds - make this configurable later but requires HTTP/1 impl
698704
.initialSettings(client.options().getInitialSettings())
699705
.connectionFactory(connHandler -> {
700-
Http2ClientConnection conn = new Http2ClientConnection(client, context, authority, connHandler, metrics, pooled);
706+
Http2ClientConnection conn = new Http2ClientConnection(client, context, authority, connHandler, metrics, pooled, maxLifetime);
701707
if (metrics != null) {
702708
Object m = socketMetric;
703709
conn.metric(m);

vertx-core/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
import io.netty.buffer.ByteBuf;
1414
import io.netty.buffer.ByteBufHolder;
15-
import io.netty.channel.Channel;
1615
import io.netty.channel.ChannelHandlerContext;
1716
import io.netty.channel.ChannelInboundHandlerAdapter;
1817
import io.netty.channel.ChannelPipeline;
@@ -21,7 +20,10 @@
2120
import io.netty.util.ReferenceCountUtil;
2221
import io.netty.util.concurrent.EventExecutor;
2322
import io.vertx.codegen.annotations.Nullable;
24-
import io.vertx.core.*;
23+
import io.vertx.core.Future;
24+
import io.vertx.core.Handler;
25+
import io.vertx.core.MultiMap;
26+
import io.vertx.core.Promise;
2527
import io.vertx.core.buffer.Buffer;
2628
import io.vertx.core.http.*;
2729
import io.vertx.core.http.HttpVersion;
@@ -53,6 +55,7 @@ public class Http2UpgradeClientConnection implements HttpClientConnectionInterna
5355

5456
private HttpClientBase client;
5557
private HttpClientConnectionInternal current;
58+
private final long maxLifetime;
5659
private boolean upgradeProcessed;
5760

5861
private Handler<Void> closeHandler;
@@ -65,9 +68,10 @@ public class Http2UpgradeClientConnection implements HttpClientConnectionInterna
6568
private Handler<Long> concurrencyChangeHandler;
6669
private Handler<Http2Settings> remoteSettingsHandler;
6770

68-
Http2UpgradeClientConnection(HttpClientBase client, Http1xClientConnection connection) {
71+
Http2UpgradeClientConnection(HttpClientBase client, Http1xClientConnection connection, long maxLifetime) {
6972
this.client = client;
7073
this.current = connection;
74+
this.maxLifetime = maxLifetime;
7175
}
7276

7377
public HttpClientConnectionInternal unwrap() {
@@ -277,6 +281,7 @@ private static class UpgradingStream implements HttpClientStream {
277281
private final Http1xClientConnection upgradingConnection;
278282
private final HttpClientStream upgradingStream;
279283
private final Http2UpgradeClientConnection upgradedConnection;
284+
private final long maxLifetime;
280285
private HttpClientStream upgradedStream;
281286
private Handler<HttpResponseHead> headHandler;
282287
private Handler<Buffer> chunkHandler;
@@ -290,10 +295,11 @@ private static class UpgradingStream implements HttpClientStream {
290295
private Handler<HttpFrame> unknownFrameHandler;
291296
private Handler<Void> closeHandler;
292297

293-
UpgradingStream(HttpClientStream stream, Http2UpgradeClientConnection upgradedConnection, Http1xClientConnection upgradingConnection) {
298+
UpgradingStream(HttpClientStream stream, Http2UpgradeClientConnection upgradedConnection, Http1xClientConnection upgradingConnection, long maxLifetime) {
294299
this.upgradedConnection = upgradedConnection;
295300
this.upgradingConnection = upgradingConnection;
296301
this.upgradingStream = stream;
302+
this.maxLifetime = maxLifetime;
297303
}
298304

299305
@Override
@@ -354,7 +360,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
354360
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception {
355361

356362
// Now we need to upgrade this to an HTTP2
357-
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = Http2ClientConnection.createHttp2ConnectionHandler(upgradedConnection.client, upgradingConnection.metrics, upgradingConnection.context(), true, upgradedConnection.current.metric(), upgradedConnection.current.authority(), upgradingConnection.pooled());
363+
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = Http2ClientConnection.createHttp2ConnectionHandler(upgradedConnection.client, upgradingConnection.metrics, upgradingConnection.context(), true, upgradedConnection.current.metric(), upgradedConnection.current.authority(), upgradingConnection.pooled(), maxLifetime);
358364
upgradingConnection.channel().pipeline().addLast(handler);
359365
handler.connectFuture().addListener(future -> {
360366
if (!future.isSuccess()) {
@@ -782,7 +788,7 @@ public Future<HttpClientStream> createStream(ContextInternal context) {
782788
if (current instanceof Http1xClientConnection && !upgradeProcessed) {
783789
return current
784790
.createStream(context)
785-
.map(stream -> new UpgradingStream(stream, this, (Http1xClientConnection) current));
791+
.map(stream -> new UpgradingStream(stream, this, (Http1xClientConnection) current, maxLifetime));
786792
} else {
787793
return current
788794
.createStream(context)

vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import io.vertx.core.internal.ContextInternal;
2828
import io.vertx.core.internal.PromiseInternal;
2929
import io.vertx.core.internal.http.HttpHeadersInternal;
30-
import io.vertx.core.net.*;
3130
import io.vertx.core.internal.net.NetClientInternal;
31+
import io.vertx.core.net.*;
3232
import io.vertx.core.net.impl.NetSocketImpl;
3333
import io.vertx.core.net.impl.VertxHandler;
3434
import io.vertx.core.spi.metrics.ClientMetrics;
@@ -59,6 +59,7 @@ public class HttpChannelConnector {
5959
private final HostAndPort authority;
6060
private final SocketAddress server;
6161
private final boolean pooled;
62+
private final long maxLifetime;
6263

6364
public HttpChannelConnector(HttpClientBase client,
6465
NetClientInternal netClient,
@@ -70,7 +71,8 @@ public HttpChannelConnector(HttpClientBase client,
7071
boolean useAlpn,
7172
HostAndPort authority,
7273
SocketAddress server,
73-
boolean pooled) {
74+
boolean pooled,
75+
long maxLifetime) {
7476
this.client = client;
7577
this.netClient = netClient;
7678
this.metrics = metrics;
@@ -83,6 +85,7 @@ public HttpChannelConnector(HttpClientBase client,
8385
this.authority = authority;
8486
this.server = server;
8587
this.pooled = pooled;
88+
this.maxLifetime = maxLifetime;
8689
}
8790

8891
public SocketAddress server() {
@@ -213,7 +216,7 @@ private void http1xConnected(HttpVersion version,
213216
boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade();
214217
VertxHandler<Http1xClientConnection> clientHandler = VertxHandler.create(chctx -> {
215218
HttpClientMetrics met = client.metrics();
216-
Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, client, chctx, ssl, server, authority, context, metrics, pooled);
219+
Http1xClientConnection conn = new Http1xClientConnection(upgrade ? HttpVersion.HTTP_1_1 : version, client, chctx, ssl, server, authority, context, metrics, pooled, maxLifetime);
217220
if (met != null) {
218221
conn.metric(socketMetric);
219222
met.endpointConnected(metrics);
@@ -224,7 +227,7 @@ private void http1xConnected(HttpVersion version,
224227
if (upgrade) {
225228
boolean preflightRequest = options.isHttp2ClearTextUpgradeWithPreflightRequest();
226229
if (preflightRequest) {
227-
Http2UpgradeClientConnection conn2 = new Http2UpgradeClientConnection(client, conn);
230+
Http2UpgradeClientConnection conn2 = new Http2UpgradeClientConnection(client, conn, maxLifetime);
228231
conn2.concurrencyChangeHandler(concurrency -> {
229232
// Ignore
230233
});
@@ -245,7 +248,7 @@ private void http1xConnected(HttpVersion version,
245248
}
246249
});
247250
} else {
248-
future.complete(new Http2UpgradeClientConnection(client, conn));
251+
future.complete(new Http2UpgradeClientConnection(client, conn, maxLifetime));
249252
}
250253
} else {
251254
future.complete(conn);
@@ -260,7 +263,7 @@ private void http2Connected(ContextInternal context,
260263
PromiseInternal<HttpClientConnectionInternal> promise) {
261264
VertxHttp2ConnectionHandler<Http2ClientConnection> clientHandler;
262265
try {
263-
clientHandler = Http2ClientConnection.createHttp2ConnectionHandler(client, metrics, context, false, metric, authority, pooled);
266+
clientHandler = Http2ClientConnection.createHttp2ConnectionHandler(client, metrics, context, false, metric, authority, pooled, maxLifetime);
264267
ch.pipeline().addLast("handler", clientHandler);
265268
ch.flush();
266269
} catch (Exception e) {

0 commit comments

Comments
 (0)