Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ void closeBusyConnections(long leakTimeMinutes) {
private void closeBusyConnection(PooledConnection pc) {
try {
Log.warn("DataSource closing busy connection? {0}", pc.fullDescription());
System.out.println("CLOSING busy connection: " + pc.fullDescription());
pc.closeConnectionFully(false);
} catch (Exception ex) {
Log.error("Error when closing potentially leaked connection " + pc.description(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
import java.io.PrintWriter;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -26,7 +32,7 @@
final class ConnectionPool implements DataSourcePool {

private static final String APPLICATION_NAME = "ApplicationName";
private final ReentrantLock heartbeatLock = new ReentrantLock(false);
private final ReentrantLock backgroundtasksLock = new ReentrantLock(false);
private final ReentrantLock notifyLock = new ReentrantLock(false);
/**
* The name given to this dataSource.
Expand Down Expand Up @@ -94,6 +100,8 @@ final class ConnectionPool implements DataSourcePool {
private final boolean shutdownOnJvmExit;
private Thread shutdownHook;

private ExecutorService executor;

ConnectionPool(String name, DataSourceConfig params) {
this.config = params;
this.name = name;
Expand Down Expand Up @@ -192,7 +200,7 @@ private void initialiseConnections() throws SQLException {
} else {
tryEnsureMinimumConnections();
}
startHeartBeatIfStopped();
startBackgroundActionsIfStopped();

if (shutdownOnJvmExit && shutdownHook == null) {
shutdownHook = new Thread(() -> shutdownPool(true, true));
Expand Down Expand Up @@ -325,7 +333,7 @@ private void notifyUp() {
// check such that we only notify once
if (!dataSourceUp.get()) {
dataSourceUp.set(true);
startHeartBeatIfStopped();
startBackgroundActionsIfStopped();
dataSourceDownReason = null;
Log.error("RESOLVED FATAL: DataSource [" + name + "] is back up!");
if (notify != null) {
Expand Down Expand Up @@ -648,6 +656,10 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) {
stopHeartBeatIfRunning();
PoolStatus status = queue.shutdown(closeBusyConnections);
dataSourceUp.set(false);

// we must stop the executor after queue.shutdown
stopAsyncExecutorIfRunning();

if (fromHook) {
Log.info("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem);
} else {
Expand All @@ -656,6 +668,7 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) {
}
}


private void removeShutdownHook() {
if (shutdownHook != null) {
try {
Expand Down Expand Up @@ -684,8 +697,8 @@ public boolean isDataSourceUp() {
return dataSourceUp.get();
}

private void startHeartBeatIfStopped() {
heartbeatLock.lock();
private void startBackgroundActionsIfStopped() {
backgroundtasksLock.lock();
try {
// only start if it is not already running
if (heartBeatTimer == null) {
Expand All @@ -695,21 +708,44 @@ private void startHeartBeatIfStopped() {
heartBeatTimer.scheduleAtFixedRate(new HeartBeatRunnable(), freqMillis, freqMillis);
}
}
if (executor == null) {
this.executor = Executors.newCachedThreadPool();
}
} finally {
heartbeatLock.unlock();
backgroundtasksLock.unlock();
}
}

private void stopHeartBeatIfRunning() {
heartbeatLock.lock();
backgroundtasksLock.lock();
try {
// only stop if it was running
if (heartBeatTimer != null) {
heartBeatTimer.cancel();
heartBeatTimer = null;
}
} finally {
heartbeatLock.unlock();
backgroundtasksLock.unlock();
}
}

private void stopAsyncExecutorIfRunning() {
backgroundtasksLock.lock();
try {
// only stop if it was running
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
Log.warn("DataSource [{0}] could not terminate executor {1}", name, executor);
}
} catch (InterruptedException ie) {
Log.warn("DataSource [{0}] could not terminate executor {1}", name, executor, ie);
}
executor = null;
}
} finally {
backgroundtasksLock.unlock();
}
}

Expand Down Expand Up @@ -785,6 +821,55 @@ public PoolStatus status(boolean reset) {
return queue.status(reset);
}

/**
* Builds a closer, that closes the connection fully async, if the executor is present.
*/
private Future<?> buildAsyncCloser(PooledConnection pc, boolean logErrors) {
backgroundtasksLock.lock();
try {
if (executor != null) {
Runnable task = new Runnable() {
@Override
public void run() {
pc.doCloseConnectionFully(logErrors);
}

@Override
public String toString() {
return pc.toString();
}
};
return executor.submit(task);
}
} finally {
backgroundtasksLock.unlock();
}
return null;
}

/**
* Tries to close the pc in an async thread. The method waits up to 5 seconds and returns true,
* if connection was closed in this time.
* <p>
* If the connection could not be closed within 5 seconds,
*/
void closeConnectionFullyAsync(PooledConnection pc, boolean logErrors) {
Future<?> asyncCloser = buildAsyncCloser(pc, logErrors);
if (asyncCloser == null) {
Log.info("Closing {0} in current thread", pc);
pc.doCloseConnectionFully(logErrors);
} else {
Log.trace("Closing {0} async", pc);
try {
asyncCloser.get(5, TimeUnit.SECONDS);
} catch (TimeoutException te) {
Log.warn("Timeout while async closing {0}", pc);
} catch (Exception e) {
Log.error("Unexpected error while async closing {0}", pc, e);
}
}
}

static final class Status implements PoolStatus {

private final int minSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ String fullDescription() {
* @param logErrors if false then don't log errors when closing
*/
void closeConnectionFully(boolean logErrors) {
pool.closeConnectionFullyAsync(this, logErrors);
}

/**
* This method should be executed only by pool.closeConnectionFullyAsync
*/
void doCloseConnectionFully(boolean logErrors) {
if (Log.isLoggable(System.Logger.Level.TRACE)) {
Log.trace("Closing Connection[{0}] reason[{1}], pstmtStats: {2}", name, closeReason, pstmtCache.description());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.ebean.datasource.pool;

import io.ebean.datasource.DataSourceBuilder;
import io.ebean.datasource.DataSourcePool;
import org.h2.jdbc.JdbcConnection;
import org.junit.jupiter.api.Test;

import java.sql.Connection;

public class ConnectionPoolHangUpTest {

@Test
void testHoldLockOnObject() throws Exception {
DataSourcePool pool = DataSourceBuilder.create()
.url("jdbc:h2:mem:testConnectionPoolHangUp")
.username("sa")
.password("sa")
.heartbeatFreqSecs(1)
.minConnections(1)
.maxConnections(1)
.trimPoolFreqSecs(1)
.heartbeatMaxPoolExhaustedCount(0)
.failOnStart(false)
.build();
try {
Connection conn = pool.getConnection();
Thread t = new Thread(() -> {
try {
JdbcConnection h2Conn = conn.unwrap(JdbcConnection.class);
synchronized (h2Conn) {
Thread.sleep(300000);
}
} catch (Exception e) {
// nop
}
});
t.setDaemon(true);
t.start();
} finally {
pool.shutdown();
}
}

}
Loading