Skip to content

Commit ddbe1b2

Browse files
committed
Close connections asyncroniously
1 parent 74978ae commit ddbe1b2

File tree

3 files changed

+201
-33
lines changed

3 files changed

+201
-33
lines changed

ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import java.io.PrintWriter;
77
import java.sql.*;
88
import java.util.*;
9+
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.Executors;
911
import java.util.concurrent.atomic.AtomicBoolean;
1012
import java.util.concurrent.atomic.AtomicInteger;
1113
import java.util.concurrent.atomic.LongAdder;
@@ -27,6 +29,7 @@ final class ConnectionPool implements DataSourcePool {
2729

2830
private static final String APPLICATION_NAME = "ApplicationName";
2931
private final ReentrantLock heartbeatLock = new ReentrantLock(false);
32+
private final ReentrantLock executorLock = new ReentrantLock(false);
3033
private final ReentrantLock notifyLock = new ReentrantLock(false);
3134
/**
3235
* The name given to this dataSource.
@@ -81,6 +84,8 @@ final class ConnectionPool implements DataSourcePool {
8184
private final PooledConnectionQueue queue;
8285
private Timer heartBeatTimer;
8386
private int heartbeatPoolExhaustedCount;
87+
private ExecutorService executor;
88+
8489
/**
8590
* Used to find and close() leaked connections. Leaked connections are
8691
* thought to be busy but have not been used for some time. Each time a
@@ -189,6 +194,7 @@ private void tryEnsureMinimumConnections() {
189194
private void initialiseConnections() throws SQLException {
190195
long start = System.currentTimeMillis();
191196
dataSourceUp.set(true);
197+
startExecutor();
192198
if (failOnStart) {
193199
queue.ensureMinimumConnections();
194200
} else {
@@ -327,6 +333,7 @@ private void notifyUp() {
327333
// check such that we only notify once
328334
if (!dataSourceUp.get()) {
329335
dataSourceUp.set(true);
336+
startExecutor();
330337
startHeartBeatIfStopped();
331338
dataSourceDownReason = null;
332339
Log.error("RESOLVED FATAL: DataSource [" + name + "] is back up!");
@@ -660,6 +667,7 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) {
660667
stopHeartBeatIfRunning();
661668
PoolStatus status = queue.shutdown(closeBusyConnections);
662669
dataSourceUp.set(false);
670+
stopExecutor();
663671
if (fromHook) {
664672
Log.info("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem);
665673
} else {
@@ -725,6 +733,44 @@ private void stopHeartBeatIfRunning() {
725733
}
726734
}
727735

736+
void execute(Runnable runable) {
737+
executorLock.lock();
738+
try {
739+
if (executor != null) {
740+
executor.submit(runable);
741+
return;
742+
}
743+
} finally {
744+
executorLock.unlock();
745+
}
746+
// it is possible, that we receive runnables after shutdown.
747+
// in this case, we will execute them immediately (outside lock)
748+
runable.run();
749+
}
750+
751+
private void startExecutor() {
752+
executorLock.lock();
753+
try {
754+
if (executor == null) {
755+
executor = Executors.newSingleThreadExecutor();
756+
}
757+
} finally {
758+
executorLock.unlock();
759+
}
760+
}
761+
762+
private void stopExecutor() {
763+
executorLock.lock();
764+
try {
765+
if (executor != null) {
766+
executor.shutdown();
767+
executor = null;
768+
}
769+
} finally {
770+
executorLock.unlock();
771+
}
772+
}
773+
728774
/**
729775
* Return the default autoCommit setting for the pool.
730776
*/

ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java

Lines changed: 57 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.sql.*;
44
import java.util.ArrayList;
55
import java.util.Map;
6+
import java.util.concurrent.TimeUnit;
67
import java.util.concurrent.locks.ReentrantLock;
78

89
/**
@@ -234,52 +235,75 @@ void closeConnectionFully(boolean logErrors) {
234235
if (Log.isLoggable(System.Logger.Level.TRACE)) {
235236
Log.trace("Closing Connection[{0}] reason[{1}], pstmtStats: {2}", name, closeReason, pstmtCache.description());
236237
}
237-
if (pool != null) {
238-
pool.pstmtCacheMetrics(pstmtCache);
238+
if (pool == null) {
239+
return; // this can happen in tests only.
239240
}
241+
pool.pstmtCacheMetrics(pstmtCache);
242+
pool.execute(() -> doCloseConnection(logErrors));
243+
}
244+
245+
/**
246+
* this mehthod performs network IO and may block
247+
*/
248+
private void doCloseConnection(boolean logErrors) {
249+
long start = System.nanoTime();
240250
try {
241-
if (connection.isClosed()) {
242-
// Typically, the JDBC Driver has its own JVM shutdown hook and already
243-
// closed the connections in our DataSource pool so making this DEBUG level
244-
Log.trace("Closing Connection[{0}] that is already closed?", name);
245-
return;
251+
try {
252+
if (connection.isClosed()) {
253+
// Typically, the JDBC Driver has its own JVM shutdown hook and already
254+
// closed the connections in our DataSource pool so making this DEBUG level
255+
Log.trace("Closing Connection[{0}] that is already closed?", name);
256+
return;
257+
}
258+
} catch (SQLException ex) {
259+
if (logErrors) {
260+
Log.error("Error checking if connection [" + name + "] is closed", ex);
261+
}
246262
}
247-
} catch (SQLException ex) {
248-
if (logErrors) {
249-
Log.error("Error checking if connection [" + name + "] is closed", ex);
263+
try {
264+
clearPreparedStatementCache();
265+
} catch (SQLException ex) {
266+
if (logErrors) {
267+
Log.warn("Error when closing connection Statements", ex);
268+
}
269+
}
270+
try {
271+
// DB2 (and some other DBMS) may have uncommitted changes and do not allow close
272+
// so try to do a rollback.
273+
if (!connection.getAutoCommit()) {
274+
connection.rollback();
275+
}
276+
} catch (SQLException ex) {
277+
if (logErrors) {
278+
Log.warn("Could not perform rollback", ex);
279+
}
280+
}
281+
try {
282+
connection.close();
283+
pool.dec();
284+
} catch (SQLException ex) {
285+
if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) {
286+
Log.error("Error when fully closing connection [" + fullDescription() + "]", ex);
287+
}
288+
}
289+
} finally {
290+
long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
291+
if (millis > 500) {
292+
Log.warn("Closing connection [" + fullDescription() + "] took an unexpected long time of " + millis + " ms");
250293
}
251294
}
295+
}
296+
297+
void clearPreparedStatementCache() throws SQLException {
252298
lock.lock();
253299
try {
254300
for (ExtendedPreparedStatement ps : pstmtCache.values()) {
255301
ps.closeDestroy();
256302
}
257-
} catch (SQLException ex) {
258-
if (logErrors) {
259-
Log.warn("Error when closing connection Statements", ex);
260-
}
303+
261304
} finally {
262305
lock.unlock();
263306
}
264-
try {
265-
// DB2 (and some other DBMS) may have uncommitted changes and do not allow close
266-
// so try to do a rollback.
267-
if (!connection.getAutoCommit()) {
268-
connection.rollback();
269-
}
270-
} catch (SQLException ex) {
271-
if (logErrors) {
272-
Log.warn("Could not perform rollback", ex);
273-
}
274-
}
275-
try {
276-
connection.close();
277-
pool.dec();
278-
} catch (SQLException ex) {
279-
if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) {
280-
Log.error("Error when fully closing connection [" + fullDescription() + "]", ex);
281-
}
282-
}
283307
}
284308

285309
/**
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package io.ebean.datasource.pool;
2+
3+
import io.ebean.datasource.DataSourceConfig;
4+
import org.junit.jupiter.api.AfterEach;
5+
import org.junit.jupiter.api.Test;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.sql.Connection;
10+
import java.util.Random;
11+
12+
import static org.assertj.core.api.Assertions.assertThat;
13+
14+
public class ConnectionPoolBlockTest {
15+
16+
private final Logger logger = LoggerFactory.getLogger(ConnectionPoolBlockTest.class);
17+
18+
private final ConnectionPool pool;
19+
20+
private final Random random = new Random();
21+
22+
private int total;
23+
24+
ConnectionPoolBlockTest() {
25+
pool = createPool();
26+
}
27+
28+
private ConnectionPool createPool() {
29+
30+
DataSourceConfig config = new DataSourceConfig();
31+
config.setDriver("org.h2.Driver");
32+
config.setUrl("jdbc:h2:mem:tests");
33+
config.setUsername("sa");
34+
config.setPassword("");
35+
config.setMinConnections(1);
36+
config.setMaxConnections(100);
37+
config.setAutoCommit(false);
38+
config.setTrimPoolFreqSecs(1);
39+
config.setMaxInactiveTimeSecs(1);
40+
config.setHeartbeatFreqSecs(1);
41+
config.enforceCleanClose(true);
42+
return new ConnectionPool("testblock", config);
43+
}
44+
45+
@AfterEach
46+
public void after() throws InterruptedException {
47+
pool.shutdown();
48+
}
49+
50+
/**
51+
* Yes, this code does some strange things to simulate a blocking close.
52+
*
53+
* 1. It does not commit/rollback the pooledConnection
54+
* 2. because of `enforceCleanClose = true`, the pool tries to close the
55+
* underlying H2 console.
56+
* 3. another thread holds a synchronized-lock on that H2 connection,
57+
* so the pool cannot close that connection quickly!
58+
*
59+
* This can happen on network outage, because close may send TCP/IP packet
60+
* which can slow down the pool whenever IO is done during the pool lock.
61+
*/
62+
public void blockPool() {
63+
try (Connection conn = pool.getConnection()) {
64+
// we close the underlying h2 connection and start a thread that holds
65+
// synchronized lock on the h2 connection.
66+
new Thread(() -> {
67+
try {
68+
Connection h2Conn = conn.unwrap(org.h2.jdbc.JdbcConnection.class);
69+
synchronized (h2Conn) {
70+
Thread.sleep(1000);
71+
}
72+
} catch (Exception e) {
73+
e.printStackTrace();
74+
}
75+
76+
}).start();
77+
Thread.sleep(50);
78+
} catch (AssertionError e) {
79+
// expected
80+
} catch (Exception e) {
81+
e.printStackTrace();
82+
}
83+
}
84+
85+
@Test
86+
public void test() throws Exception {
87+
88+
new Thread(this::blockPool).start();
89+
// wait for warmup
90+
Thread.sleep(100);
91+
92+
long start = System.currentTimeMillis();
93+
try (Connection conn = pool.getConnection()) {
94+
conn.rollback();
95+
}
96+
assertThat(System.currentTimeMillis() - start).isLessThan(100);
97+
}
98+
}

0 commit comments

Comments
 (0)