diff --git a/ebean-datasource/pom.xml b/ebean-datasource/pom.xml index 350b6f1..12371b5 100644 --- a/ebean-datasource/pom.xml +++ b/ebean-datasource/pom.xml @@ -83,6 +83,13 @@ test + + org.mariadb.jdbc + mariadb-java-client + 3.5.3 + test + + diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java index 3fdc440..a16371f 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java @@ -6,6 +6,10 @@ import java.io.PrintWriter; import java.sql.*; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; @@ -27,6 +31,7 @@ final class ConnectionPool implements DataSourcePool { private static final String APPLICATION_NAME = "ApplicationName"; private final ReentrantLock heartbeatLock = new ReentrantLock(false); + private final ReentrantLock executorLock = new ReentrantLock(false); private final ReentrantLock notifyLock = new ReentrantLock(false); /** * The name given to this dataSource. @@ -81,6 +86,8 @@ final class ConnectionPool implements DataSourcePool { private final PooledConnectionQueue queue; private Timer heartBeatTimer; private int heartbeatPoolExhaustedCount; + private ExecutorService executor; + /** * Used to find and close() leaked connections. Leaked connections are * thought to be busy but have not been used for some time. Each time a @@ -189,6 +196,7 @@ private void tryEnsureMinimumConnections() { private void initialiseConnections() throws SQLException { long start = System.currentTimeMillis(); dataSourceUp.set(true); + startExecutor(); if (failOnStart) { queue.ensureMinimumConnections(); } else { @@ -327,6 +335,7 @@ private void notifyUp() { // check such that we only notify once if (!dataSourceUp.get()) { dataSourceUp.set(true); + startExecutor(); startHeartBeatIfStopped(); dataSourceDownReason = null; Log.error("RESOLVED FATAL: DataSource [" + name + "] is back up!"); @@ -660,6 +669,7 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) { stopHeartBeatIfRunning(); PoolStatus status = queue.shutdown(closeBusyConnections); dataSourceUp.set(false); + stopExecutor(); if (fromHook) { Log.info("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem); } else { @@ -725,6 +735,77 @@ private void stopHeartBeatIfRunning() { } } + static class AsyncCloser implements Runnable { + final PooledConnection pc; + final boolean logErrors; + + public AsyncCloser(PooledConnection pc, boolean logErrors) { + this.pc = pc; + this.logErrors = logErrors; + } + + @Override + public void run() { + pc.doCloseConnection(logErrors); + } + + @Override + public String toString() { + return pc.toString(); + } + } + + /** + * Tries to close the pc in an async thread. The method waits up to 5 seconds and returns true, + * if connection was closed in this time. + *

+ * If the connection could not be closed within 5 seconds, + */ + void closeConnectionFullyAsync(PooledConnection pc, boolean logErrors) { + executorLock.lock(); + try { + if (executor != null) { + executor.submit(new AsyncCloser(pc, logErrors)); + return; + } + } finally { + executorLock.unlock(); + } + // it is possible, that we receive runnables after shutdown. + // in this case, we will execute them immediately (outside lock) + pc.doCloseConnection(logErrors); + } + + private void startExecutor() { + executorLock.lock(); + try { + if (executor == null) { + executor = Executors.newSingleThreadExecutor(); + } + } finally { + executorLock.unlock(); + } + } + + private void stopExecutor() { + executorLock.lock(); + try { + if (executor != null) { + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + Log.warn("DataSource [{0}] could not terminate executor.", name); + } + } catch (InterruptedException ie) { + Log.warn("DataSource [{0}] could not terminate executor.", name, ie); + } + executor = null; + } + } finally { + executorLock.unlock(); + } + } + /** * Return the default autoCommit setting for the pool. */ diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java index 4d2b2bd..0a4f067 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java @@ -3,6 +3,7 @@ import java.sql.*; import java.util.ArrayList; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** @@ -234,52 +235,75 @@ void closeConnectionFully(boolean logErrors) { if (Log.isLoggable(System.Logger.Level.TRACE)) { Log.trace("Closing Connection[{0}] reason[{1}], pstmtStats: {2}", name, closeReason, pstmtCache.description()); } - if (pool != null) { - pool.pstmtCacheMetrics(pstmtCache); + if (pool == null) { + return; // this can happen in tests only. } + pool.pstmtCacheMetrics(pstmtCache); + pool.closeConnectionFullyAsync(this, logErrors); + } + + /** + * this mehthod performs network IO and may block + */ + void doCloseConnection(boolean logErrors) { + long start = System.nanoTime(); try { - if (connection.isClosed()) { - // Typically, the JDBC Driver has its own JVM shutdown hook and already - // closed the connections in our DataSource pool so making this DEBUG level - Log.trace("Closing Connection[{0}] that is already closed?", name); - return; + try { + if (connection.isClosed()) { + // Typically, the JDBC Driver has its own JVM shutdown hook and already + // closed the connections in our DataSource pool so making this DEBUG level + Log.trace("Closing Connection[{0}] that is already closed?", name); + return; + } + } catch (SQLException ex) { + if (logErrors) { + Log.error("Error checking if connection [" + name + "] is closed", ex); + } } - } catch (SQLException ex) { - if (logErrors) { - Log.error("Error checking if connection [" + name + "] is closed", ex); + try { + clearPreparedStatementCache(); + } catch (SQLException ex) { + if (logErrors) { + Log.warn("Error when closing connection Statements", ex); + } + } + try { + // DB2 (and some other DBMS) may have uncommitted changes and do not allow close + // so try to do a rollback. + if (!connection.getAutoCommit()) { + connection.rollback(); + } + } catch (SQLException ex) { + if (logErrors) { + Log.warn("Could not perform rollback", ex); + } + } + try { + connection.close(); + pool.dec(); + } catch (SQLException ex) { + if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) { + Log.error("Error when fully closing connection [" + fullDescription() + "]", ex); + } + } + } finally { + long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + if (millis > 500) { + Log.warn("Closing connection [" + fullDescription() + "] took an unexpected long time of " + millis + " ms"); } } + } + + void clearPreparedStatementCache() throws SQLException { lock.lock(); try { for (ExtendedPreparedStatement ps : pstmtCache.values()) { ps.closeDestroy(); } - } catch (SQLException ex) { - if (logErrors) { - Log.warn("Error when closing connection Statements", ex); - } + } finally { lock.unlock(); } - try { - // DB2 (and some other DBMS) may have uncommitted changes and do not allow close - // so try to do a rollback. - if (!connection.getAutoCommit()) { - connection.rollback(); - } - } catch (SQLException ex) { - if (logErrors) { - Log.warn("Could not perform rollback", ex); - } - } - try { - connection.close(); - pool.dec(); - } catch (SQLException ex) { - if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) { - Log.error("Error when fully closing connection [" + fullDescription() + "]", ex); - } - } } /** diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolBlockTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolBlockTest.java new file mode 100644 index 0000000..2379d17 --- /dev/null +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolBlockTest.java @@ -0,0 +1,98 @@ +package io.ebean.datasource.pool; + +import io.ebean.datasource.DataSourceConfig; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ConnectionPoolBlockTest { + + private final Logger logger = LoggerFactory.getLogger(ConnectionPoolBlockTest.class); + + private final ConnectionPool pool; + + private final Random random = new Random(); + + private int total; + + ConnectionPoolBlockTest() { + pool = createPool(); + } + + private ConnectionPool createPool() { + + DataSourceConfig config = new DataSourceConfig(); + config.setDriver("org.h2.Driver"); + config.setUrl("jdbc:h2:mem:tests"); + config.setUsername("sa"); + config.setPassword(""); + config.setMinConnections(1); + config.setMaxConnections(100); + config.setAutoCommit(false); + config.setTrimPoolFreqSecs(1); + config.setMaxInactiveTimeSecs(1); + config.setHeartbeatFreqSecs(1); + config.enforceCleanClose(true); + return new ConnectionPool("testblock", config); + } + + @AfterEach + public void after() throws InterruptedException { + pool.shutdown(); + } + + /** + * Yes, this code does some strange things to simulate a blocking close. + * + * 1. It does not commit/rollback the pooledConnection + * 2. because of `enforceCleanClose = true`, the pool tries to close the + * underlying H2 console. + * 3. another thread holds a synchronized-lock on that H2 connection, + * so the pool cannot close that connection quickly! + * + * This can happen on network outage, because close may send TCP/IP packet + * which can slow down the pool whenever IO is done during the pool lock. + */ + public void blockPool() { + try (Connection conn = pool.getConnection()) { + // we close the underlying h2 connection and start a thread that holds + // synchronized lock on the h2 connection. + new Thread(() -> { + try { + Connection h2Conn = conn.unwrap(org.h2.jdbc.JdbcConnection.class); + synchronized (h2Conn) { + Thread.sleep(1000); + } + } catch (Exception e) { + e.printStackTrace(); + } + + }).start(); + Thread.sleep(50); + } catch (AssertionError e) { + // expected + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void test() throws Exception { + + new Thread(this::blockPool).start(); + // wait for warmup + Thread.sleep(100); + + long start = System.currentTimeMillis(); + try (Connection conn = pool.getConnection()) { + conn.rollback(); + } + assertThat(System.currentTimeMillis() - start).isLessThan(100); + } +} diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolHangUpTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolHangUpTest.java new file mode 100644 index 0000000..a3a733b --- /dev/null +++ b/ebean-datasource/src/test/java/io/ebean/datasource/pool/ConnectionPoolHangUpTest.java @@ -0,0 +1,46 @@ +package io.ebean.datasource.pool; + +import io.ebean.datasource.DataSourceBuilder; +import io.ebean.datasource.DataSourcePool; +import org.h2.jdbc.JdbcConnection; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; + +public class ConnectionPoolHangUpTest { + + @Test + @Disabled("run manually") + void testHoldLockOnObject() throws Exception { + DataSourcePool pool = DataSourceBuilder.create() + .url("jdbc:h2:mem:testConnectionPoolHangUp") + .username("sa") + .password("sa") + .heartbeatFreqSecs(1) + .minConnections(1) + .maxConnections(1) + .trimPoolFreqSecs(1) + .heartbeatMaxPoolExhaustedCount(0) + .failOnStart(false) + .build(); + try { + Connection conn = pool.getConnection(); + Thread t = new Thread(() -> { + try { + JdbcConnection h2Conn = conn.unwrap(JdbcConnection.class); + synchronized (h2Conn) { + Thread.sleep(300000); + } + } catch (Exception e) { + // nop + } + }); + t.setDaemon(true); + t.start(); + } finally { + pool.shutdown(); + } + } + +} diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/test/NetworkOutageTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/test/NetworkOutageTest.java new file mode 100644 index 0000000..45a0dd7 --- /dev/null +++ b/ebean-datasource/src/test/java/io/ebean/datasource/test/NetworkOutageTest.java @@ -0,0 +1,108 @@ +package io.ebean.datasource.test; + +import io.ebean.test.containers.Db2Container; +import io.ebean.test.containers.MariaDBContainer; +import io.ebean.test.containers.PostgresContainer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; + +/** + * This testsuite demonstrates, how a driver behaves, when a network outage occurs. + * + * @author Roland Praml, Foconis Analytics GmbH + */ +public class NetworkOutageTest { + + static void openPort(int port) throws Exception { + Runtime.getRuntime().exec("sudo iptables -D OUTPUT -p tcp --dport " + port + " -j DROP").waitFor(); // remove rule + } + + static void closePort(int port) throws Exception { + System.out.println("Closing port " + port); + Runtime.getRuntime().exec("sudo iptables -A OUTPUT -p tcp --dport " + port + " -j DROP").waitFor(); // remove rule + } + + @BeforeAll + @AfterAll + static void removeRules() throws Exception { + openPort(3306); + openPort(50000); + openPort(5432); + } + + @Test + @Disabled + void testNetworkOutageDb2() throws Exception { + Db2Container container = Db2Container.builder("latest").build(); + container.start(); + + Connection conn = container.createConnection(); + PreparedStatement stmt = conn.prepareStatement("SELECT * from SYSCAT.TABLES"); + stmt.setFetchSize(10); + ResultSet rs = stmt.executeQuery(); + rs.next(); + rs.next(); // read two rows. Now simulate network outage + + closePort(50000); + + long startTime = System.currentTimeMillis(); + stmt.close(); // this will block 1 minute or even longer + conn.close(); + System.out.println("Done in " + (System.currentTimeMillis() - startTime) + "ms"); + + } + + @Test + @Disabled + void testNetworkOutageMariaDb() throws Exception { + MariaDBContainer container = MariaDBContainer.builder("latest").build(); + container.start(); + + Connection conn = container.createConnection(); + PreparedStatement stmt = conn.prepareStatement("SELECT * from INFORMATION_SCHEMA.TABLES"); + stmt.setFetchSize(10); + ResultSet rs = stmt.executeQuery(); + rs.next(); + rs.next(); // read two rows. Now simulate network outage + + closePort(3306); + + long startTime = System.currentTimeMillis(); + stmt.close(); + conn.close(); + // mariadb is graceful here. It slows down 3-4 ms + System.out.println("Done in " + (System.currentTimeMillis() - startTime) + "ms"); + + } + + @Test + @Disabled + void testNetworkOutagePostgres() throws Exception { + PostgresContainer container = PostgresContainer.builder("latest").build(); + container.start(); + + Connection conn = container.createConnection(); + PreparedStatement stmt = conn.prepareStatement("SELECT * from INFORMATION_SCHEMA.TABLES"); + stmt.setFetchSize(10); + ResultSet rs = stmt.executeQuery(); + rs.next(); + rs.next(); // read two rows. Now simulate network outage + + closePort(5432); + + long startTime = System.currentTimeMillis(); + stmt.close(); + conn.close(); + // there is nearly no delay in PG + System.out.println("Done in " + (System.currentTimeMillis() - startTime) + "ms"); + + } + +}