66import java .io .PrintWriter ;
77import java .sql .*;
88import java .util .*;
9+ import java .util .concurrent .ExecutionException ;
10+ import java .util .concurrent .ExecutorService ;
11+ import java .util .concurrent .Executors ;
12+ import java .util .concurrent .TimeUnit ;
13+ import java .util .concurrent .TimeoutException ;
914import java .util .concurrent .atomic .AtomicBoolean ;
1015import java .util .concurrent .atomic .AtomicInteger ;
1116import java .util .concurrent .atomic .LongAdder ;
2631final class ConnectionPool implements DataSourcePool {
2732
2833 private static final String APPLICATION_NAME = "ApplicationName" ;
29- private final ReentrantLock heartbeatLock = new ReentrantLock (false );
34+ private final ReentrantLock backgroundtasksLock = new ReentrantLock (false );
3035 private final ReentrantLock notifyLock = new ReentrantLock (false );
3136 /**
3237 * The name given to this dataSource.
@@ -94,6 +99,8 @@ final class ConnectionPool implements DataSourcePool {
9499 private final boolean shutdownOnJvmExit ;
95100 private Thread shutdownHook ;
96101
102+ private ExecutorService executor ;
103+
97104 ConnectionPool (String name , DataSourceConfig params ) {
98105 this .config = params ;
99106 this .name = name ;
@@ -192,7 +199,7 @@ private void initialiseConnections() throws SQLException {
192199 } else {
193200 tryEnsureMinimumConnections ();
194201 }
195- startHeartBeatIfStopped ();
202+ startBackgroundActionsIfStopped ();
196203
197204 if (shutdownOnJvmExit && shutdownHook == null ) {
198205 shutdownHook = new Thread (() -> shutdownPool (true , true ));
@@ -325,7 +332,7 @@ private void notifyUp() {
325332 // check such that we only notify once
326333 if (!dataSourceUp .get ()) {
327334 dataSourceUp .set (true );
328- startHeartBeatIfStopped ();
335+ startBackgroundActionsIfStopped ();
329336 dataSourceDownReason = null ;
330337 Log .error ("RESOLVED FATAL: DataSource [" + name + "] is back up!" );
331338 if (notify != null ) {
@@ -648,6 +655,10 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) {
648655 stopHeartBeatIfRunning ();
649656 PoolStatus status = queue .shutdown (closeBusyConnections );
650657 dataSourceUp .set (false );
658+
659+ // we must stop the executor after queue.shutdown
660+ stopAsyncExecutorIfRunning ();
661+
651662 if (fromHook ) {
652663 Log .info ("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]" , name , status , pscHit , pscMiss , pscPut , pscRem );
653664 } else {
@@ -656,6 +667,8 @@ private void shutdownPool(boolean closeBusyConnections, boolean fromHook) {
656667 }
657668 }
658669
670+
671+
659672 private void removeShutdownHook () {
660673 if (shutdownHook != null ) {
661674 try {
@@ -684,8 +697,8 @@ public boolean isDataSourceUp() {
684697 return dataSourceUp .get ();
685698 }
686699
687- private void startHeartBeatIfStopped () {
688- heartbeatLock .lock ();
700+ private void startBackgroundActionsIfStopped () {
701+ backgroundtasksLock .lock ();
689702 try {
690703 // only start if it is not already running
691704 if (heartBeatTimer == null ) {
@@ -695,21 +708,44 @@ private void startHeartBeatIfStopped() {
695708 heartBeatTimer .scheduleAtFixedRate (new HeartBeatRunnable (), freqMillis , freqMillis );
696709 }
697710 }
711+ if (executor == null ) {
712+ this .executor = Executors .newCachedThreadPool ();
713+ }
698714 } finally {
699- heartbeatLock .unlock ();
715+ backgroundtasksLock .unlock ();
700716 }
701717 }
702718
703719 private void stopHeartBeatIfRunning () {
704- heartbeatLock .lock ();
720+ backgroundtasksLock .lock ();
705721 try {
706722 // only stop if it was running
707723 if (heartBeatTimer != null ) {
708724 heartBeatTimer .cancel ();
709725 heartBeatTimer = null ;
710726 }
711727 } finally {
712- heartbeatLock .unlock ();
728+ backgroundtasksLock .unlock ();
729+ }
730+ }
731+
732+ private void stopAsyncExecutorIfRunning () {
733+ backgroundtasksLock .lock ();
734+ try {
735+ // only stop if it was running
736+ if (executor != null ) {
737+ executor .shutdown ();
738+ try {
739+ if (!executor .awaitTermination (5 , TimeUnit .SECONDS )) {
740+ Log .warn ("DataSource [{0}] could not terminate executor {1}" , name , executor );
741+ }
742+ } catch (InterruptedException ie ) {
743+ Log .warn ("DataSource [{0}] could not terminate executor {1}" , name , executor , ie );
744+ }
745+ executor = null ;
746+ }
747+ } finally {
748+ backgroundtasksLock .unlock ();
713749 }
714750 }
715751
@@ -785,6 +821,34 @@ public PoolStatus status(boolean reset) {
785821 return queue .status (reset );
786822 }
787823
824+ /**
825+ * Executes the runnable asynchronously. The default grace period is 5 seconds.
826+ *
827+ * @param runnable
828+ * @return true, if the runnable is finished in 5 seconds
829+ */
830+ boolean execute (Runnable runnable ) {
831+ backgroundtasksLock .lock ();
832+ try {
833+ if (executor != null ) {
834+ executor .submit (runnable ).get (5 , TimeUnit .SECONDS );
835+ return true ;
836+ }
837+ } catch (InterruptedException | TimeoutException e ) {
838+ return false ;
839+ } catch (ExecutionException e ) {
840+ if (e .getCause () instanceof RuntimeException ) {
841+ throw (RuntimeException ) e .getCause ();
842+ }
843+ throw new RuntimeException (e .getCause ());
844+ } finally {
845+ backgroundtasksLock .unlock ();
846+ }
847+ // execute in sync
848+ runnable .run ();
849+ return true ;
850+ }
851+
788852 static final class Status implements PoolStatus {
789853
790854 private final int minSize ;
0 commit comments