Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ebean-datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>3.5.3</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import java.io.PrintWriter;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -27,6 +31,7 @@ final class ConnectionPool implements DataSourcePool {

private static final String APPLICATION_NAME = "ApplicationName";
private final ReentrantLock heartbeatLock = new ReentrantLock(false);
private final ReentrantLock executorLock = new ReentrantLock(false);
private final ReentrantLock notifyLock = new ReentrantLock(false);
/**
* The name given to this dataSource.
Expand Down Expand Up @@ -81,6 +86,8 @@ final class ConnectionPool implements DataSourcePool {
private final PooledConnectionQueue queue;
private Timer heartBeatTimer;
private int heartbeatPoolExhaustedCount;
private ExecutorService executor;

/**
* Used to find and close() leaked connections. Leaked connections are
* thought to be busy but have not been used for some time. Each time a
Expand Down Expand Up @@ -189,6 +196,7 @@ private void tryEnsureMinimumConnections() {
private void initialiseConnections() throws SQLException {
long start = System.currentTimeMillis();
dataSourceUp.set(true);
startExecutor();
if (failOnStart) {
queue.ensureMinimumConnections();
} else {
Expand Down Expand Up @@ -327,6 +335,7 @@ private void notifyUp() {
// check such that we only notify once
if (!dataSourceUp.get()) {
dataSourceUp.set(true);
startExecutor();
startHeartBeatIfStopped();
dataSourceDownReason = null;
Log.error("RESOLVED FATAL: DataSource [" + name + "] is back up!");
Expand Down Expand Up @@ -660,6 +669,7 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) {
stopHeartBeatIfRunning();
PoolStatus status = queue.shutdown(closeBusyConnections);
dataSourceUp.set(false);
stopExecutor();
if (fromHook) {
Log.info("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem);
} else {
Expand Down Expand Up @@ -725,6 +735,77 @@ private void stopHeartBeatIfRunning() {
}
}

static class AsyncCloser implements Runnable {
final PooledConnection pc;
final boolean logErrors;

public AsyncCloser(PooledConnection pc, boolean logErrors) {
this.pc = pc;
this.logErrors = logErrors;
}

@Override
public void run() {
pc.doCloseConnection(logErrors);
}

@Override
public String toString() {
return pc.toString();
}
}

/**
* Tries to close the pc in an async thread. The method waits up to 5 seconds and returns true,
* if connection was closed in this time.
* <p>
* If the connection could not be closed within 5 seconds,
*/
void closeConnectionFullyAsync(PooledConnection pc, boolean logErrors) {
executorLock.lock();
try {
if (executor != null) {
executor.submit(new AsyncCloser(pc, logErrors));
return;
}
} finally {
executorLock.unlock();
}
// it is possible, that we receive runnables after shutdown.
// in this case, we will execute them immediately (outside lock)
pc.doCloseConnection(logErrors);
}

private void startExecutor() {
executorLock.lock();
try {
if (executor == null) {
executor = Executors.newSingleThreadExecutor();
}
} finally {
executorLock.unlock();
}
}

private void stopExecutor() {
executorLock.lock();
try {
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
Log.warn("DataSource [{0}] could not terminate executor.", name);
}
} catch (InterruptedException ie) {
Log.warn("DataSource [{0}] could not terminate executor.", name, ie);
}
executor = null;
}
} finally {
executorLock.unlock();
}
}

/**
* Return the default autoCommit setting for the pool.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.sql.*;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand Down Expand Up @@ -234,52 +235,75 @@ void closeConnectionFully(boolean logErrors) {
if (Log.isLoggable(System.Logger.Level.TRACE)) {
Log.trace("Closing Connection[{0}] reason[{1}], pstmtStats: {2}", name, closeReason, pstmtCache.description());
}
if (pool != null) {
pool.pstmtCacheMetrics(pstmtCache);
if (pool == null) {
return; // this can happen in tests only.
}
pool.pstmtCacheMetrics(pstmtCache);
pool.closeConnectionFullyAsync(this, logErrors);
}

/**
* this mehthod performs network IO and may block
*/
void doCloseConnection(boolean logErrors) {
long start = System.nanoTime();
try {
if (connection.isClosed()) {
// Typically, the JDBC Driver has its own JVM shutdown hook and already
// closed the connections in our DataSource pool so making this DEBUG level
Log.trace("Closing Connection[{0}] that is already closed?", name);
return;
try {
if (connection.isClosed()) {
// Typically, the JDBC Driver has its own JVM shutdown hook and already
// closed the connections in our DataSource pool so making this DEBUG level
Log.trace("Closing Connection[{0}] that is already closed?", name);
return;
}
} catch (SQLException ex) {
if (logErrors) {
Log.error("Error checking if connection [" + name + "] is closed", ex);
}
}
} catch (SQLException ex) {
if (logErrors) {
Log.error("Error checking if connection [" + name + "] is closed", ex);
try {
clearPreparedStatementCache();
} catch (SQLException ex) {
if (logErrors) {
Log.warn("Error when closing connection Statements", ex);
}
}
try {
// DB2 (and some other DBMS) may have uncommitted changes and do not allow close
// so try to do a rollback.
if (!connection.getAutoCommit()) {
connection.rollback();
}
} catch (SQLException ex) {
if (logErrors) {
Log.warn("Could not perform rollback", ex);
}
}
try {
connection.close();
pool.dec();
} catch (SQLException ex) {
if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) {
Log.error("Error when fully closing connection [" + fullDescription() + "]", ex);
}
}
} finally {
long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (millis > 500) {
Log.warn("Closing connection [" + fullDescription() + "] took an unexpected long time of " + millis + " ms");
}
}
}

void clearPreparedStatementCache() throws SQLException {
lock.lock();
try {
for (ExtendedPreparedStatement ps : pstmtCache.values()) {
ps.closeDestroy();
}
} catch (SQLException ex) {
if (logErrors) {
Log.warn("Error when closing connection Statements", ex);
}

} finally {
lock.unlock();
}
try {
// DB2 (and some other DBMS) may have uncommitted changes and do not allow close
// so try to do a rollback.
if (!connection.getAutoCommit()) {
connection.rollback();
}
} catch (SQLException ex) {
if (logErrors) {
Log.warn("Could not perform rollback", ex);
}
}
try {
connection.close();
pool.dec();
} catch (SQLException ex) {
if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) {
Log.error("Error when fully closing connection [" + fullDescription() + "]", ex);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.ebean.datasource.pool;

import io.ebean.datasource.DataSourceConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;

public class ConnectionPoolBlockTest {

private final Logger logger = LoggerFactory.getLogger(ConnectionPoolBlockTest.class);

private final ConnectionPool pool;

private final Random random = new Random();

private int total;

ConnectionPoolBlockTest() {
pool = createPool();
}

private ConnectionPool createPool() {

DataSourceConfig config = new DataSourceConfig();
config.setDriver("org.h2.Driver");
config.setUrl("jdbc:h2:mem:tests");
config.setUsername("sa");
config.setPassword("");
config.setMinConnections(1);
config.setMaxConnections(100);
config.setAutoCommit(false);
config.setTrimPoolFreqSecs(1);
config.setMaxInactiveTimeSecs(1);
config.setHeartbeatFreqSecs(1);
config.enforceCleanClose(true);
return new ConnectionPool("testblock", config);
}

@AfterEach
public void after() throws InterruptedException {
pool.shutdown();
}

/**
* Yes, this code does some strange things to simulate a blocking close.
*
* 1. It does not commit/rollback the pooledConnection
* 2. because of `enforceCleanClose = true`, the pool tries to close the
* underlying H2 console.
* 3. another thread holds a synchronized-lock on that H2 connection,
* so the pool cannot close that connection quickly!
*
* This can happen on network outage, because close may send TCP/IP packet
* which can slow down the pool whenever IO is done during the pool lock.
*/
public void blockPool() {
try (Connection conn = pool.getConnection()) {
// we close the underlying h2 connection and start a thread that holds
// synchronized lock on the h2 connection.
new Thread(() -> {
try {
Connection h2Conn = conn.unwrap(org.h2.jdbc.JdbcConnection.class);
synchronized (h2Conn) {
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}

}).start();
Thread.sleep(50);
} catch (AssertionError e) {
// expected
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void test() throws Exception {

new Thread(this::blockPool).start();
// wait for warmup
Thread.sleep(100);

long start = System.currentTimeMillis();
try (Connection conn = pool.getConnection()) {
conn.rollback();
}
assertThat(System.currentTimeMillis() - start).isLessThan(100);
}
}
Loading
Loading