88import java .util .List ;
99import java .util .concurrent .CompletableFuture ;
1010import java .util .concurrent .ConcurrentHashMap ;
11- import java .util .concurrent .ExecutorService ;
12- import java .util .concurrent .Executors ;
1311import java .util .concurrent .Future ;
12+ import java .util .concurrent .LinkedBlockingQueue ;
13+ import java .util .concurrent .ThreadPoolExecutor ;
1414import java .util .concurrent .TimeUnit ;
1515import java .util .concurrent .atomic .AtomicBoolean ;
1616import java .util .concurrent .atomic .AtomicInteger ;
17+ import java .util .concurrent .locks .Lock ;
18+ import java .util .concurrent .locks .ReentrantLock ;
1719
1820import org .springframework .lang .NonNull ;
1921import org .springframework .lang .Nullable ;
@@ -41,10 +43,10 @@ public class TaskExecutorComponent implements Closeable {
4143 @ Setter
4244 private Duration maxShutdownWaitTime = Duration .ofSeconds (10 );
4345 @ Nullable
44- private ExecutorService executor ;
45- // also the LOCK object ...
46+ private ThreadPoolExecutor executor ;
4647 private final ConcurrentHashMap <TriggerEntity , Future <TriggerKey >> runningTasks = new ConcurrentHashMap <>();
4748 private final AtomicBoolean stopped = new AtomicBoolean (true );
49+ private final Lock lock = new ReentrantLock (true );
4850
4951 public TaskExecutorComponent (String schedulerName , TriggerService triggerService , int maxThreads ) {
5052 super ();
@@ -72,15 +74,16 @@ public Future<TriggerKey> submit(@Nullable TriggerEntity trigger) {
7274 }
7375 assertStarted ();
7476 try {
75- synchronized (runningTasks ) {
76- assertStarted ();
77- var result = executor .submit (() -> runTrigger (trigger ));
78- runningTasks .put (trigger , result );
79- return result ;
80- }
77+ assertStarted ();
78+ lock .lock ();
79+ var result = executor .submit (() -> runTrigger (trigger ));
80+ runningTasks .put (trigger , result );
81+ return result ;
8182 } catch (Exception e ) {
8283 runningTasks .remove (trigger );
8384 throw new RuntimeException ("Failed to run " + trigger .getKey (), e );
85+ } finally {
86+ lock .unlock ();
8487 }
8588
8689 }
@@ -96,61 +99,59 @@ private TriggerKey runTrigger(TriggerEntity trigger) {
9699 triggerService .run (trigger );
97100 return trigger .getKey ();
98101 } finally {
99- synchronized (runningTasks ) {
102+ try {
103+ lock .lock ();
100104 if (runningTasks .remove (trigger ) == null && runningTasks .size () > 0 ) {
101105 var runningKeys = runningTasks .keySet ().stream ().map (TriggerEntity ::key );
102106 log .error ("Failed to remove trigger with {} - {}" , trigger .key (), runningKeys );
103107 }
108+ } finally {
109+ lock .unlock ();
104110 }
105111 }
106112 }
107113
108114 public void start () {
109- synchronized ( runningTasks ) {
110- if ( stopped . compareAndExchange ( true , false )) {
111- runningTasks . clear ();
112- executor = Executors . newFixedThreadPool ( maxThreads . get ());
113- log . info ( "Started {} with {} threads." , schedulerName , maxThreads . get ());
114- stopped . set ( false );
115- }
115+ if ( stopped . compareAndExchange ( true , false ) ) {
116+ runningTasks . clear ();
117+ executor = new ThreadPoolExecutor ( 1 , this . maxThreads . get (),
118+ 0L , TimeUnit . MILLISECONDS ,
119+ new LinkedBlockingQueue < Runnable > ());
120+ log . info ( "Started {} with {} threads." , schedulerName , maxThreads . get () );
121+ stopped . set ( false );
116122 }
117123 }
118124
119125 @ Override
120126 public void close () {
121127 stopped .set (true );
122- synchronized (runningTasks ) {
123- if (executor != null ) {
124- var execRef = executor ;
125- execRef .shutdown ();
126- log .info ("Shutdown {} with {} running tasks, waiting for {}." , schedulerName , runningTasks .size (),
127- maxShutdownWaitTime );
128-
129- if (runningTasks .size () > 0 ) {
130- try {
131- execRef .awaitTermination (maxShutdownWaitTime .getSeconds (), TimeUnit .SECONDS );
132- } catch (InterruptedException e ) {
133- Thread .currentThread ().interrupt ();
134- log .warn ("Failed to complete runnings tasks." , e .getCause () == null ? e : e .getCause ());
135- shutdownNow ();
136- } finally {
137- executor = null ;
138- runningTasks .clear ();
139- }
128+ if (executor != null ) {
129+ executor .shutdown ();
130+ log .info ("Shutdown {} with {} running tasks, waiting for {}." , schedulerName , runningTasks .size (),
131+ maxShutdownWaitTime );
132+
133+ if (runningTasks .size () > 0 ) {
134+ try {
135+ executor .awaitTermination (maxShutdownWaitTime .getSeconds (), TimeUnit .SECONDS );
136+ } catch (InterruptedException e ) {
137+ Thread .currentThread ().interrupt ();
138+ log .warn ("Failed to complete runnings tasks." , e .getCause () == null ? e : e .getCause ());
139+ shutdownNow ();
140+ } finally {
141+ executor = null ;
142+ runningTasks .clear ();
140143 }
141144 }
142145 }
143146 }
144147
145148 public void shutdownNow () {
146- synchronized (runningTasks ) {
147- if (executor != null ) {
148- executor .shutdownNow ();
149- log .info ("Force stop {} with {} running tasks" , schedulerName , runningTasks .size ());
150- runningTasks .clear ();
151- executor = null ;
152- stopped .set (true );
153- }
149+ stopped .set (true );
150+ if (executor != null ) {
151+ executor .shutdownNow ();
152+ log .info ("Force stop {} with {} running tasks" , schedulerName , runningTasks .size ());
153+ runningTasks .clear ();
154+ executor = null ;
154155 }
155156 }
156157
@@ -159,7 +160,7 @@ public int getFreeThreads() {
159160 return 0 ;
160161 }
161162 if (maxThreads .get () - runningTasks .size () < 0 ) {
162- log .warn ("Already {}" + runningTasks . size () + " running more than threads={} " ,
163+ log .warn ("Already {} running tasks, more than threads {} in pool. " ,
163164 runningTasks .size (), maxThreads .get ());
164165 }
165166 return Math .max (maxThreads .get () - runningTasks .size (), 0 );
@@ -178,7 +179,7 @@ public List<TriggerEntity> getRunningTriggers() {
178179 .toList ();
179180
180181 if (doneAndNotRemovedFutures .size () > 0 ) {
181- log .error ("Found still pending futures, maybe an issue, report a bug if so {}" ,
182+ log .warn ("Found still pending futures, maybe an issue, report a bug if so {}" ,
182183 doneAndNotRemovedFutures .stream ().map (e -> e .getKey ().getKey ()));
183184 for (var entry : doneAndNotRemovedFutures ) {
184185 runningTasks .remove (entry .getKey ());
0 commit comments