diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java index 2fe702d..2f67baa 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java @@ -1,6 +1,8 @@ package io.ebean.datasource.pool; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; /** * A buffer especially designed for Busy PooledConnections. @@ -82,9 +84,10 @@ boolean remove(PooledConnection pc) { } /** - * Close connections that should be considered leaked. + * Remove connections that should be considered leaked. */ - void closeBusyConnections(long leakTimeMinutes) { + List removeBusyConnections(long leakTimeMinutes) { + List busyConnections = null; long olderThanTime = System.currentTimeMillis() - (leakTimeMinutes * 60000); Log.debug("Closing busy connections using leakTimeMinutes {0}", leakTimeMinutes); for (int i = 0; i < slots.length; i++) { @@ -98,20 +101,14 @@ void closeBusyConnections(long leakTimeMinutes) { } else { slots[i] = null; --size; - closeBusyConnection(pc); + if (busyConnections == null) { + busyConnections = new ArrayList<>(); + } + busyConnections.add(pc); } } } - } - - private void closeBusyConnection(PooledConnection pc) { - try { - Log.warn("DataSource closing busy connection? {0}", pc.fullDescription()); - System.out.println("CLOSING busy connection: " + pc.fullDescription()); - pc.closeConnectionFully(false); - } catch (Exception ex) { - Log.error("Error when closing potentially leaked connection " + pc.description(), ex); - } + return busyConnections; } /** diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java index 3fdc440..9012def 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java @@ -292,7 +292,7 @@ public SQLException dataSourceDownReason() { private void notifyDataSourceIsDown(SQLException reason) { if (dataSourceUp.get()) { - reset(); + reset(false); notifyDown(reason); } } @@ -316,7 +316,7 @@ private void notifyDown(SQLException reason) { private void notifyDataSourceIsUp() { if (!dataSourceUp.get()) { - reset(); + reset(true); notifyUp(); } } @@ -553,7 +553,7 @@ void returnConnectionForceClose(PooledConnection pooledConnection, boolean testP } void removeClosedConnection(PooledConnection pooledConnection) { - queue.returnPooledConnection(pooledConnection, true); + queue.returnPooledConnection(pooledConnection, true, false); } /** @@ -564,13 +564,13 @@ private void returnTheConnection(PooledConnection pooledConnection, boolean forc if (poolListener != null && !forceClose) { poolListener.onBeforeReturnConnection(pooledConnection); } - queue.returnPooledConnection(pooledConnection, forceClose); + queue.returnPooledConnection(pooledConnection, forceClose, true); } void returnConnectionReset(PooledConnection pooledConnection) { - queue.returnPooledConnection(pooledConnection, true); + queue.returnPooledConnection(pooledConnection, true, false); Log.warn("Resetting DataSource on read-only failure [{0}]", name); - reset(); + reset(false); } /** @@ -599,9 +599,9 @@ PooledConnection createConnectionForQueue(int connId) throws SQLException { *
  • Busy connections are closed when they are returned to the pool.
  • * */ - private void reset() { + private void reset(boolean logErrors) { heartbeatPoolExhaustedCount = 0; - queue.reset(leakTimeMinutes); + queue.reset(leakTimeMinutes, logErrors); } /** diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java index 73ba836..181d502 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java @@ -59,18 +59,22 @@ void closeAll(boolean logErrors) { /** * Trim any inactive connections that have not been used since usedSince. + *

    + * The connections are returned to close. */ - int trim(int minSize, long usedSince, long createdSince) { - int trimCount = 0; + List trim(int minSize, long usedSince, long createdSince) { + List trimmedConnections = null; ListIterator iterator = freeBuffer.listIterator(minSize); while (iterator.hasNext()) { PooledConnection pooledConnection = iterator.next(); if (pooledConnection.shouldTrim(usedSince, createdSince)) { iterator.remove(); - pooledConnection.closeConnectionFully(true); - trimCount++; + if (trimmedConnections == null) { + trimmedConnections = new ArrayList<>(); + } + trimmedConnections.add(pooledConnection); } } - return trimCount; + return trimmedConnections; } } diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 7a66c98..a7ee96c 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -5,6 +5,7 @@ import io.ebean.datasource.pool.ConnectionPool.Status; import java.sql.SQLException; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -87,12 +88,7 @@ private PoolStatus createStatus() { @Override public String toString() { - lock.lock(); - try { - return createStatus().toString(); - } finally { - lock.unlock(); - } + return status(false).toString(); } PoolStatus status(boolean reset) { @@ -147,31 +143,40 @@ void ensureMinimumConnections() throws SQLException { /** * Return a PooledConnection. */ - void returnPooledConnection(PooledConnection c, boolean forceClose) { + void returnPooledConnection(PooledConnection c, boolean forceClose, boolean logErrors) { + boolean closeConnection = false; lock.lock(); try { if (!busyList.remove(c)) { Log.error("Connection [{0}] not found in BusyList?", c); } if (forceClose || c.shouldTrimOnReturn(lastResetTime, maxAgeMillis)) { - c.closeConnectionFully(false); + closeConnection = true; // close outside lock } else { freeList.add(c); - notEmpty.signal(); + notEmpty.signal(); // notify _obtainConnectionWaitLoop, that there are new connections in the freelist } } finally { lock.unlock(); } + if (closeConnection) { + c.closeConnectionFully(logErrors); + } } - private PooledConnection extractFromFreeList() { + /** + * Returns one connection from the free list and put it into the busy list or null if no connections are free. + * + * @throws StaleConnectionException there was a connection in the freeList, but it is stale. + * The connection has to be closed outside the lock. + */ + private PooledConnection extractFromFreeList() throws StaleConnectionException { if (freeList.isEmpty()) { return null; } final PooledConnection c = freeList.remove(); if (validateStaleMillis > 0 && staleEviction(c)) { - c.closeConnectionFully(false); - return null; + throw new StaleConnectionException(c); } registerBusyConnection(c); return c; @@ -192,15 +197,24 @@ private boolean stale(PooledConnection c) { } PooledConnection obtainConnection() throws SQLException { - try { - PooledConnection pc = _obtainConnection(); - pc.resetForUse(); - return pc; - - } catch (InterruptedException e) { - // restore the interrupted status as we throw SQLException - Thread.currentThread().interrupt(); - throw new SQLException("Interrupted getting connection from pool", e); + var start = System.nanoTime(); + while (true) { + try { + PooledConnection pc = _obtainConnection(); + pc.resetForUse(); + final var elapsed = System.nanoTime() - start; + totalAcquireNanos += elapsed; + maxAcquireNanos = Math.max(maxAcquireNanos, elapsed); + return pc; + + } catch (InterruptedException e) { + // restore the interrupted status as we throw SQLException + Thread.currentThread().interrupt(); + throw new SQLException("Interrupted getting connection from pool", e); + } catch (StaleConnectionException e) { + e.getConnection().closeConnectionFully(true); + // try again... + } } } @@ -215,8 +229,8 @@ private int registerBusyConnection(PooledConnection connection) { return busySize; } - private PooledConnection _obtainConnection() throws InterruptedException, SQLException { - var start = System.nanoTime(); + private PooledConnection _obtainConnection() throws InterruptedException, SQLException, StaleConnectionException { + lock.lockInterruptibly(); try { if (doingShutdown) { @@ -246,9 +260,6 @@ private PooledConnection _obtainConnection() throws InterruptedException, SQLExc waitingThreads--; } } finally { - final var elapsed = System.nanoTime() - start; - totalAcquireNanos += elapsed; - maxAcquireNanos = Math.max(maxAcquireNanos, elapsed); lock.unlock(); } } @@ -270,7 +281,7 @@ private PooledConnection createConnection() throws SQLException { /** * Got into a loop waiting for connections to be returned to the pool. */ - private PooledConnection _obtainConnectionWaitLoop() throws SQLException, InterruptedException { + private PooledConnection _obtainConnectionWaitLoop() throws SQLException, InterruptedException, StaleConnectionException { long nanos = MILLIS_TIME_UNIT.toNanos(waitTimeoutMillis); for (; ; ) { if (nanos <= 0) { @@ -307,7 +318,7 @@ PoolStatus shutdown(boolean closeBusyConnections) { try { doingShutdown = true; PoolStatus status = createStatus(); - closeFreeConnections(true); + freeList.closeAll(true); if (!closeBusyConnections) { // connections close on return to pool @@ -332,14 +343,14 @@ PoolStatus shutdown(boolean closeBusyConnections) { *

    * This is typically done when a database down event occurs. */ - void reset(long leakTimeMinutes) { + void reset(long leakTimeMinutes, boolean logErrors) { lock.lock(); try { PoolStatus status = createStatus(); Log.info("Resetting DataSource [{0}] {1}", name, status); lastResetTime = System.currentTimeMillis(); - closeFreeConnections(false); + freeList.closeAll(logErrors); closeBusyConnections(leakTimeMinutes); String busyInfo = getBusyConnectionInformation(); @@ -353,17 +364,12 @@ void reset(long leakTimeMinutes) { } void trim(long maxInactiveMillis, long maxAgeMillis) { - lock.lock(); - try { - if (trimInactiveConnections(maxInactiveMillis, maxAgeMillis)) { - try { - ensureMinimumConnections(); - } catch (SQLException e) { - Log.error("Error trying to ensure minimum connections", e); - } + if (trimInactiveConnections(maxInactiveMillis, maxAgeMillis)) { + try { + ensureMinimumConnections(); + } catch (SQLException e) { + Log.error("Error trying to ensure minimum connections", e); } - } finally { - lock.unlock(); } } @@ -372,33 +378,32 @@ void trim(long maxInactiveMillis, long maxAgeMillis) { */ private boolean trimInactiveConnections(long maxInactiveMillis, long maxAgeMillis) { final long createdSince = (maxAgeMillis == 0) ? 0 : System.currentTimeMillis() - maxAgeMillis; - final int trimmedCount; - if (freeList.size() > minSize) { - // trim on maxInactive and maxAge - long usedSince = System.currentTimeMillis() - maxInactiveMillis; - trimmedCount = freeList.trim(minSize, usedSince, createdSince); - } else if (createdSince > 0) { - // trim only on maxAge - trimmedCount = freeList.trim(0, createdSince, createdSince); - } else { - trimmedCount = 0; - } - if (trimmedCount > 0 && Log.isLoggable(DEBUG)) { - Log.debug("DataSource [{0}] trimmed [{1}] inactive connections. New size[{2}]", name, trimmedCount, totalConnections()); - } - return trimmedCount > 0 && freeList.size() < minSize; - } - - /** - * Close all the connections that are in the free list. - */ - private void closeFreeConnections(boolean logErrors) { + final List trimmed; lock.lock(); try { - freeList.closeAll(logErrors); + if (freeList.size() > minSize) { + // trim on maxInactive and maxAge + long usedSince = System.currentTimeMillis() - maxInactiveMillis; + trimmed = freeList.trim(minSize, usedSince, createdSince); + } else if (createdSince > 0) { + // trim only on maxAge + trimmed = freeList.trim(0, createdSince, createdSince); + } else { + trimmed = null; + } } finally { lock.unlock(); } + if (trimmed != null) { + if (Log.isLoggable(DEBUG)) { + Log.debug("DataSource [{0}] trimmed [{1}] inactive connections. New size[{2}]", name, trimmed.size(), totalConnections()); + } + for (PooledConnection pc : trimmed) { + pc.closeConnectionFully(true); + } + return freeList.size() < minSize; + } + return false; } /** @@ -412,12 +417,23 @@ private void closeFreeConnections(boolean logErrors) { * closed and put back into the pool. */ void closeBusyConnections(long leakTimeMinutes) { + List busyConnections; lock.lock(); try { - busyList.closeBusyConnections(leakTimeMinutes); + busyConnections = busyList.removeBusyConnections(leakTimeMinutes); } finally { lock.unlock(); } + if (busyConnections != null) { + for (PooledConnection pc : busyConnections) { + try { + Log.warn("DataSource closing busy connection? {0}", pc.fullDescription()); + pc.closeConnectionFully(true); + } catch (Exception ex) { + Log.error("Error when closing potentially leaked connection " + pc.description(), ex); + } + } + } } String getBusyConnectionInformation() { diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/StaleConnectionException.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/StaleConnectionException.java new file mode 100644 index 0000000..4502179 --- /dev/null +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/StaleConnectionException.java @@ -0,0 +1,18 @@ +package io.ebean.datasource.pool; + +/** + * Used to return a stale connection, so that it can be closed outside a lock. + * + * @author Roland Praml, Foconis Analytics GmbH + */ +class StaleConnectionException extends Exception { + private final PooledConnection connection; + + StaleConnectionException(PooledConnection connection) { + this.connection = connection; + } + + public PooledConnection getConnection() { + return connection; + } +}