diff --git a/README.md b/README.md index 2c855277d..1694f917d 100644 --- a/README.md +++ b/README.md @@ -248,6 +248,16 @@ available, calls to getConnection() will block for up to ``connectionTimeout`` m before timing out. Please read [about pool sizing](https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing). *Default: 10* +🔢``maximumPendingConnections``
+This property controls the maximum number of threads that are allowed to simultaneously wait +for a connection from the pool. When this limit is exceeded, subsequent calls to +``getConnection()`` will fail immediately with a ``SQLTransientConnectionException`` instead of +blocking for up to ``connectionTimeout`` milliseconds. This acts as a load-shedding mechanism +that prevents unbounded thread accumulation during traffic spikes or database outages, avoiding +thundering-herd timeouts and reducing the blast radius of pool exhaustion. +A value of 0 means unlimited — all threads will wait up to ``connectionTimeout``. +*Default: 0 (unlimited)* + 📈``metricRegistry``
This property is only available via programmatic configuration or IoC container. This property allows you to specify an instance of a *Codahale/Dropwizard* ``MetricRegistry`` to be used by the diff --git a/src/main/java/com/zaxxer/hikari/HikariConfig.java b/src/main/java/com/zaxxer/hikari/HikariConfig.java index cc3de63fb..921221407 100644 --- a/src/main/java/com/zaxxer/hikari/HikariConfig.java +++ b/src/main/java/com/zaxxer/hikari/HikariConfig.java @@ -100,6 +100,7 @@ public class HikariConfig implements HikariConfigMXBean private Properties healthCheckProperties; private long keepaliveTime; + private int maximumPendingConnections; private volatile boolean sealed; @@ -738,6 +739,36 @@ public void addHealthCheckProperty(String key, String value) healthCheckProperties.setProperty(key, value); } + /** + * Get the maximum number of threads that are allowed to simultaneously wait for a connection from the pool. + * When this limit is exceeded, subsequent calls to {@code getConnection()} will fail immediately with a + * {@code SQLTransientConnectionException} instead of blocking for {@code connectionTimeout}. + * + * @return the maximum number of pending connection requests, or 0 (default) for unlimited + */ + public int getMaximumPendingConnections() + { + return maximumPendingConnections; + } + + /** + * Set the maximum number of threads that are allowed to simultaneously wait for a connection from the pool. + * When this limit is exceeded, subsequent calls to {@code getConnection()} will fail immediately with a + * {@code SQLTransientConnectionException} instead of blocking for {@code connectionTimeout}. + *

+ * A value of 0 (default) means unlimited — all threads will wait up to {@code connectionTimeout}. + * + * @param maximumPendingConnections the maximum number of pending connection requests, or 0 for unlimited + */ + public void setMaximumPendingConnections(int maximumPendingConnections) + { + checkIfSealed(); + if (maximumPendingConnections < 0) { + throw new IllegalArgumentException("maximumPendingConnections cannot be negative"); + } + this.maximumPendingConnections = maximumPendingConnections; + } + /** * This property controls the keepalive interval for a connection in the pool. An in-use connection will never be * tested by the keepalive thread, only when it is idle will it be tested. diff --git a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java index a7b21ec47..b04517a77 100644 --- a/src/main/java/com/zaxxer/hikari/pool/HikariPool.java +++ b/src/main/java/com/zaxxer/hikari/pool/HikariPool.java @@ -89,7 +89,7 @@ public HikariPool(final HikariConfig config) { super(config); - this.connectionBag = new ConcurrentBag<>(this); + this.connectionBag = new ConcurrentBag<>(this, config.getMaximumPendingConnections()); this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK; this.houseKeepingExecutorService = initializeHouseKeepingExecutorService(); diff --git a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java index 0cc5a1f17..eb750a31c 100644 --- a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java +++ b/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java @@ -68,6 +68,7 @@ public class ConcurrentBag implements AutoCloseab private final ThreadLocal> threadLocalList; private final IBagStateListener listener; private final AtomicInteger waiters; + private final int maxWaiters; private volatile boolean closed; private final SynchronousQueue handoffQueue; @@ -106,10 +107,12 @@ public interface IBagStateListener * Construct a ConcurrentBag with the specified listener. * * @param listener the IBagStateListener to attach to this bag + * @param maxWaiters the maximum number of threads allowed to wait for a bag item, or 0 for unlimited */ - public ConcurrentBag(final IBagStateListener listener) + public ConcurrentBag(final IBagStateListener listener, final int maxWaiters) { this.listener = listener; + this.maxWaiters = maxWaiters; this.useWeakThreadLocals = useWeakThreadLocals(); this.handoffQueue = new SynchronousQueue<>(true); @@ -155,6 +158,13 @@ public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedExcepti } } + // TODO: should the condition include getTotalConnections() < config.getMaximumPoolSize()? + // It would allow threads to wait for the initial connection establishment, and it would + // protect against the pile of the connection requests if the pool is already full. + if (maxWaiters > 0 && waiting > maxWaiters) { + return null; + } + listener.addBagItem(waiting); timeout = timeUnit.toNanos(timeout); diff --git a/src/test/java/com/zaxxer/hikari/pool/TestConcurrentBag.java b/src/test/java/com/zaxxer/hikari/pool/TestConcurrentBag.java index b70a87c37..0fd9302c2 100644 --- a/src/test/java/com/zaxxer/hikari/pool/TestConcurrentBag.java +++ b/src/test/java/com/zaxxer/hikari/pool/TestConcurrentBag.java @@ -69,7 +69,7 @@ public static void teardown() @Test public void testConcurrentBag() throws Exception { - try (ConcurrentBag bag = new ConcurrentBag<>(x -> CompletableFuture.completedFuture(Boolean.TRUE))) { + try (ConcurrentBag bag = new ConcurrentBag<>(x -> CompletableFuture.completedFuture(Boolean.TRUE), 0)) { assertEquals(0, bag.values(8).size()); PoolEntry reserved = pool.newPoolEntry(false); diff --git a/src/test/java/com/zaxxer/hikari/util/TomcatConcurrentBagLeakTest.java b/src/test/java/com/zaxxer/hikari/util/TomcatConcurrentBagLeakTest.java index 545597309..aa1622f2e 100644 --- a/src/test/java/com/zaxxer/hikari/util/TomcatConcurrentBagLeakTest.java +++ b/src/test/java/com/zaxxer/hikari/util/TomcatConcurrentBagLeakTest.java @@ -112,7 +112,7 @@ public static class FauxWebContext @SuppressWarnings({"ResultOfMethodCallIgnored"}) public void createConcurrentBag() throws InterruptedException { - try (ConcurrentBag bag = new ConcurrentBag<>(x -> CompletableFuture.completedFuture(Boolean.TRUE))) { + try (ConcurrentBag bag = new ConcurrentBag<>(x -> CompletableFuture.completedFuture(Boolean.TRUE), 0)) { PoolEntry entry = new PoolEntry(); bag.add(entry);