diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java index 2fe702d..ead4e79 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/BusyConnectionBuffer.java @@ -107,7 +107,6 @@ void closeBusyConnections(long leakTimeMinutes) { private void closeBusyConnection(PooledConnection pc) { try { Log.warn("DataSource closing busy connection? {0}", pc.fullDescription()); - System.out.println("CLOSING busy connection: " + pc.fullDescription()); pc.closeConnectionFully(false); } catch (Exception ex) { Log.error("Error when closing potentially leaked connection " + pc.description(), ex); diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java index c58651f..9d1e863 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java @@ -6,6 +6,12 @@ import java.io.PrintWriter; import java.sql.*; import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; @@ -26,7 +32,7 @@ final class ConnectionPool implements DataSourcePool { private static final String APPLICATION_NAME = "ApplicationName"; - private final ReentrantLock heartbeatLock = new ReentrantLock(false); + private final ReentrantLock backgroundtasksLock = new ReentrantLock(false); private final ReentrantLock notifyLock = new ReentrantLock(false); /** * The name given to this dataSource. @@ -94,6 +100,8 @@ final class ConnectionPool implements DataSourcePool { private final boolean shutdownOnJvmExit; private Thread shutdownHook; + private ExecutorService executor; + ConnectionPool(String name, DataSourceConfig params) { this.config = params; this.name = name; @@ -192,7 +200,7 @@ private void initialiseConnections() throws SQLException { } else { tryEnsureMinimumConnections(); } - startHeartBeatIfStopped(); + startBackgroundActionsIfStopped(); if (shutdownOnJvmExit && shutdownHook == null) { shutdownHook = new Thread(() -> shutdownPool(true, true)); @@ -325,7 +333,7 @@ private void notifyUp() { // check such that we only notify once if (!dataSourceUp.get()) { dataSourceUp.set(true); - startHeartBeatIfStopped(); + startBackgroundActionsIfStopped(); dataSourceDownReason = null; Log.error("RESOLVED FATAL: DataSource [" + name + "] is back up!"); if (notify != null) { @@ -648,6 +656,10 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) { stopHeartBeatIfRunning(); PoolStatus status = queue.shutdown(closeBusyConnections); dataSourceUp.set(false); + + // we must stop the executor after queue.shutdown + stopAsyncExecutorIfRunning(); + if (fromHook) { Log.info("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem); } else { @@ -656,6 +668,7 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) { } } + private void removeShutdownHook() { if (shutdownHook != null) { try { @@ -684,8 +697,8 @@ public boolean isDataSourceUp() { return dataSourceUp.get(); } - private void startHeartBeatIfStopped() { - heartbeatLock.lock(); + private void startBackgroundActionsIfStopped() { + backgroundtasksLock.lock(); try { // only start if it is not already running if (heartBeatTimer == null) { @@ -695,13 +708,16 @@ private void startHeartBeatIfStopped() { heartBeatTimer.scheduleAtFixedRate(new HeartBeatRunnable(), freqMillis, freqMillis); } } + if (executor == null) { + this.executor = Executors.newCachedThreadPool(); + } } finally { - heartbeatLock.unlock(); + backgroundtasksLock.unlock(); } } private void stopHeartBeatIfRunning() { - heartbeatLock.lock(); + backgroundtasksLock.lock(); try { // only stop if it was running if (heartBeatTimer != null) { @@ -709,7 +725,27 @@ private void stopHeartBeatIfRunning() { heartBeatTimer = null; } } finally { - heartbeatLock.unlock(); + backgroundtasksLock.unlock(); + } + } + + private void stopAsyncExecutorIfRunning() { + backgroundtasksLock.lock(); + try { + // only stop if it was running + if (executor != null) { + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + Log.warn("DataSource [{0}] could not terminate executor {1}", name, executor); + } + } catch (InterruptedException ie) { + Log.warn("DataSource [{0}] could not terminate executor {1}", name, executor, ie); + } + executor = null; + } + } finally { + backgroundtasksLock.unlock(); } } @@ -785,6 +821,55 @@ public PoolStatus status(boolean reset) { return queue.status(reset); } + /** + * Builds a closer, that closes the connection fully async, if the executor is present. + */ + private Future buildAsyncCloser(PooledConnection pc, boolean logErrors) { + backgroundtasksLock.lock(); + try { + if (executor != null) { + Runnable task = new Runnable() { + @Override + public void run() { + pc.doCloseConnectionFully(logErrors); + } + + @Override + public String toString() { + return pc.toString(); + } + }; + return executor.submit(task); + } + } finally { + backgroundtasksLock.unlock(); + } + return null; + } + + /** + * Tries to close the pc in an async thread. The method waits up to 5 seconds and returns true, + * if connection was closed in this time. + *

+ * If the connection could not be closed within 5 seconds, + */ + void closeConnectionFullyAsync(PooledConnection pc, boolean logErrors) { + Future asyncCloser = buildAsyncCloser(pc, logErrors); + if (asyncCloser == null) { + Log.info("Closing {0} in current thread", pc); + pc.doCloseConnectionFully(logErrors); + } else { + Log.trace("Closing {0} async", pc); + try { + asyncCloser.get(5, TimeUnit.SECONDS); + } catch (TimeoutException te) { + Log.warn("Timeout while async closing {0}", pc); + } catch (Exception e) { + Log.error("Unexpected error while async closing {0}", pc, e); + } + } + } + static final class Status implements PoolStatus { private final int minSize; diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java index 2e8185a..0a7f186 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java @@ -232,6 +232,13 @@ String fullDescription() { * @param logErrors if false then don't log errors when closing */ void closeConnectionFully(boolean logErrors) { + pool.closeConnectionFullyAsync(this, logErrors); + } + + /** + * This method should be executed only by pool.closeConnectionFullyAsync + */ + void doCloseConnectionFully(boolean logErrors) { if (Log.isLoggable(System.Logger.Level.TRACE)) { Log.trace("Closing Connection[{0}] reason[{1}], pstmtStats: {2}", name, closeReason, pstmtCache.description()); } diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolHangUpTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolHangUpTest.java new file mode 100644 index 0000000..9c3afa6 --- /dev/null +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolHangUpTest.java @@ -0,0 +1,44 @@ +package io.ebean.datasource.pool; + +import io.ebean.datasource.DataSourceBuilder; +import io.ebean.datasource.DataSourcePool; +import org.h2.jdbc.JdbcConnection; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; + +public class ConnectionPoolHangUpTest { + + @Test + void testHoldLockOnObject() throws Exception { + DataSourcePool pool = DataSourceBuilder.create() + .url("jdbc:h2:mem:testConnectionPoolHangUp") + .username("sa") + .password("sa") + .heartbeatFreqSecs(1) + .minConnections(1) + .maxConnections(1) + .trimPoolFreqSecs(1) + .heartbeatMaxPoolExhaustedCount(0) + .failOnStart(false) + .build(); + try { + Connection conn = pool.getConnection(); + Thread t = new Thread(() -> { + try { + JdbcConnection h2Conn = conn.unwrap(JdbcConnection.class); + synchronized (h2Conn) { + Thread.sleep(300000); + } + } catch (Exception e) { + // nop + } + }); + t.setDaemon(true); + t.start(); + } finally { + pool.shutdown(); + } + } + +}