66import java .io .PrintWriter ;
77import java .sql .*;
88import java .util .*;
9+ import java .util .concurrent .ExecutionException ;
10+ import java .util .concurrent .ExecutorService ;
11+ import java .util .concurrent .Executors ;
12+ import java .util .concurrent .Future ;
13+ import java .util .concurrent .TimeUnit ;
14+ import java .util .concurrent .TimeoutException ;
915import java .util .concurrent .atomic .AtomicBoolean ;
1016import java .util .concurrent .atomic .AtomicInteger ;
1117import java .util .concurrent .atomic .LongAdder ;
2632final class ConnectionPool implements DataSourcePool {
2733
2834 private static final String APPLICATION_NAME = "ApplicationName" ;
29- private final ReentrantLock heartbeatLock = new ReentrantLock (false );
35+ private final ReentrantLock backgroundtasksLock = new ReentrantLock (false );
3036 private final ReentrantLock notifyLock = new ReentrantLock (false );
3137 /**
3238 * The name given to this dataSource.
@@ -94,6 +100,8 @@ final class ConnectionPool implements DataSourcePool {
94100 private final boolean shutdownOnJvmExit ;
95101 private Thread shutdownHook ;
96102
103+ private ExecutorService executor ;
104+
97105 ConnectionPool (String name , DataSourceConfig params ) {
98106 this .config = params ;
99107 this .name = name ;
@@ -192,7 +200,7 @@ private void initialiseConnections() throws SQLException {
192200 } else {
193201 tryEnsureMinimumConnections ();
194202 }
195- startHeartBeatIfStopped ();
203+ startBackgroundActionsIfStopped ();
196204
197205 if (shutdownOnJvmExit && shutdownHook == null ) {
198206 shutdownHook = new Thread (() -> shutdownPool (true , true ));
@@ -325,7 +333,7 @@ private void notifyUp() {
325333 // check such that we only notify once
326334 if (!dataSourceUp .get ()) {
327335 dataSourceUp .set (true );
328- startHeartBeatIfStopped ();
336+ startBackgroundActionsIfStopped ();
329337 dataSourceDownReason = null ;
330338 Log .error ("RESOLVED FATAL: DataSource [" + name + "] is back up!" );
331339 if (notify != null ) {
@@ -648,6 +656,10 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) {
648656 stopHeartBeatIfRunning ();
649657 PoolStatus status = queue .shutdown (closeBusyConnections );
650658 dataSourceUp .set (false );
659+
660+ // we must stop the executor after queue.shutdown
661+ stopAsyncExecutorIfRunning ();
662+
651663 if (fromHook ) {
652664 Log .info ("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]" , name , status , pscHit , pscMiss , pscPut , pscRem );
653665 } else {
@@ -656,6 +668,7 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) {
656668 }
657669 }
658670
671+
659672 private void removeShutdownHook () {
660673 if (shutdownHook != null ) {
661674 try {
@@ -684,8 +697,8 @@ public boolean isDataSourceUp() {
684697 return dataSourceUp .get ();
685698 }
686699
687- private void startHeartBeatIfStopped () {
688- heartbeatLock .lock ();
700+ private void startBackgroundActionsIfStopped () {
701+ backgroundtasksLock .lock ();
689702 try {
690703 // only start if it is not already running
691704 if (heartBeatTimer == null ) {
@@ -695,21 +708,44 @@ private void startHeartBeatIfStopped() {
695708 heartBeatTimer .scheduleAtFixedRate (new HeartBeatRunnable (), freqMillis , freqMillis );
696709 }
697710 }
711+ if (executor == null ) {
712+ this .executor = Executors .newCachedThreadPool ();
713+ }
698714 } finally {
699- heartbeatLock .unlock ();
715+ backgroundtasksLock .unlock ();
700716 }
701717 }
702718
703719 private void stopHeartBeatIfRunning () {
704- heartbeatLock .lock ();
720+ backgroundtasksLock .lock ();
705721 try {
706722 // only stop if it was running
707723 if (heartBeatTimer != null ) {
708724 heartBeatTimer .cancel ();
709725 heartBeatTimer = null ;
710726 }
711727 } finally {
712- heartbeatLock .unlock ();
728+ backgroundtasksLock .unlock ();
729+ }
730+ }
731+
732+ private void stopAsyncExecutorIfRunning () {
733+ backgroundtasksLock .lock ();
734+ try {
735+ // only stop if it was running
736+ if (executor != null ) {
737+ executor .shutdown ();
738+ try {
739+ if (!executor .awaitTermination (5 , TimeUnit .SECONDS )) {
740+ Log .warn ("DataSource [{0}] could not terminate executor {1}" , name , executor );
741+ }
742+ } catch (InterruptedException ie ) {
743+ Log .warn ("DataSource [{0}] could not terminate executor {1}" , name , executor , ie );
744+ }
745+ executor = null ;
746+ }
747+ } finally {
748+ backgroundtasksLock .unlock ();
713749 }
714750 }
715751
@@ -785,6 +821,55 @@ public PoolStatus status(boolean reset) {
785821 return queue .status (reset );
786822 }
787823
824+ /**
825+ * Builds a closer, that closes the connection fully async, if the executor is present.
826+ */
827+ private Future <?> buildAsyncCloser (PooledConnection pc , boolean logErrors ) {
828+ backgroundtasksLock .lock ();
829+ try {
830+ if (executor != null ) {
831+ Runnable task = new Runnable () {
832+ @ Override
833+ public void run () {
834+ pc .doCloseConnectionFully (logErrors );
835+ }
836+
837+ @ Override
838+ public String toString () {
839+ return pc .toString ();
840+ }
841+ };
842+ return executor .submit (task );
843+ }
844+ } finally {
845+ backgroundtasksLock .unlock ();
846+ }
847+ return null ;
848+ }
849+
850+ /**
851+ * Tries to close the pc in an async thread. The method waits up to 5 seconds and returns true,
852+ * if connection was closed in this time.
853+ * <p>
854+ * If the connection could not be closed within 5 seconds,
855+ */
856+ void closeConnectionFullyAsync (PooledConnection pc , boolean logErrors ) {
857+ Future <?> asyncCloser = buildAsyncCloser (pc , logErrors );
858+ if (asyncCloser == null ) {
859+ Log .info ("Closing {0} in current thread" , pc );
860+ pc .doCloseConnectionFully (logErrors );
861+ } else {
862+ Log .trace ("Closing {0} async" , pc );
863+ try {
864+ asyncCloser .get (5 , TimeUnit .SECONDS );
865+ } catch (TimeoutException te ) {
866+ Log .warn ("Timeout while async closing {0}" , pc );
867+ } catch (Exception e ) {
868+ Log .error ("Unexpected error while async closing {0}" , pc , e );
869+ }
870+ }
871+ }
872+
788873 static final class Status implements PoolStatus {
789874
790875 private final int minSize ;
0 commit comments