11package org .sterl .spring .persistent_tasks .scheduler .component ;
22
3- import java .io .Closeable ;
43import java .time .Duration ;
54import java .util .ArrayList ;
65import java .util .Collection ;
3534 * </p>
3635 */
3736@ Slf4j
38- public class TaskExecutorComponent implements Closeable {
37+ public class TaskExecutorComponent {
3938
4039 private final String schedulerName ;
4140 private final TriggerService triggerService ;
@@ -49,16 +48,12 @@ public class TaskExecutorComponent implements Closeable {
4948 private final AtomicBoolean stopped = new AtomicBoolean (true );
5049 private final Lock lock = new ReentrantLock (true );
5150
52- public TaskExecutorComponent (
53- String schedulerName ,
54- TriggerService triggerService ,
55- SchedulerThreadFactory threadFactory ,
56- int maxThreads ) {
51+ public TaskExecutorComponent (String schedulerName , TriggerService triggerService ,
52+ SchedulerThreadFactory threadFactory , int maxThreads ) {
5753 super ();
5854 this .schedulerName = schedulerName ;
5955 this .triggerService = triggerService ;
6056 this .maxThreads .set (maxThreads );
61- this .start ();
6257 }
6358
6459 @ NonNull
@@ -78,8 +73,8 @@ public Future<TriggerKey> submit(@Nullable TriggerEntity trigger) {
7873 if (trigger == null ) {
7974 return CompletableFuture .completedFuture (null );
8075 }
76+ lock .lock ();
8177 assertStarted ();
82- // lock.lock();
8378 try {
8479 var result = executor .submit (() -> runTrigger (trigger ));
8580 runningTasks .put (trigger , result );
@@ -88,7 +83,7 @@ public Future<TriggerKey> submit(@Nullable TriggerEntity trigger) {
8883 runningTasks .remove (trigger );
8984 throw new RuntimeException ("Failed to run " + trigger .getKey (), e );
9085 } finally {
91- // lock.unlock();
86+ lock .unlock ();
9287 }
9388
9489 }
@@ -104,60 +99,72 @@ private TriggerKey runTrigger(TriggerEntity trigger) {
10499 triggerService .run (trigger );
105100 return trigger .getKey ();
106101 } finally {
107- // lock.lock();
102+ lock .lock ();
108103 try {
109104 if (runningTasks .remove (trigger ) == null && runningTasks .size () > 0 ) {
110105 var runningKeys = runningTasks .keySet ().stream ().map (TriggerEntity ::key ).toList ();
111106 log .error ("Failed to remove trigger with {} - {}" , trigger .key (), runningKeys );
112107 }
113108 } finally {
114- // lock.unlock();
109+ lock .unlock ();
115110 }
116111 }
117112 }
118113
119114 public void start () {
120115 if (stopped .compareAndExchange (true , false )) {
121- runningTasks .clear ();
122- executor = new ThreadPoolExecutor (
123- 1 , this .maxThreads .get (),
124- 60L , TimeUnit .SECONDS ,
125- new LinkedBlockingQueue <Runnable >());
126- log .info ("Started {} with {} threads." , schedulerName , maxThreads .get ());
127- stopped .set (false );
116+ lock .lock ();
117+ try {
118+ runningTasks .clear ();
119+ executor = new ThreadPoolExecutor (1 , this .maxThreads .get (), 60L , TimeUnit .SECONDS ,
120+ new LinkedBlockingQueue <Runnable >());
121+ log .info ("Started {} with {} threads." , schedulerName , maxThreads .get ());
122+ stopped .set (false );
123+ } finally {
124+ lock .unlock ();
125+ }
128126 }
129127 }
130128
131- @ Override
132- public void close () {
129+ public void shutdown () {
133130 stopped .set (true );
134131 if (executor != null ) {
135- executor .shutdown ();
136- log .info ("Shutdown {} with {} running tasks, waiting for {}." , schedulerName , runningTasks .size (),
137- maxShutdownWaitTime );
138-
139- if (runningTasks .size () > 0 ) {
140- try {
141- executor .awaitTermination (maxShutdownWaitTime .getSeconds (), TimeUnit .SECONDS );
142- } catch (InterruptedException e ) {
143- Thread .currentThread ().interrupt ();
144- log .warn ("Failed to complete runnings tasks." , e .getCause () == null ? e : e .getCause ());
145- shutdownNow ();
146- } finally {
147- executor = null ;
148- runningTasks .clear ();
132+ lock .lock ();
133+ try {
134+ executor .shutdown ();
135+ if (runningTasks .size () > 0 ) {
136+ log .info ("Shutdown {} with {} running tasks, waiting for {}." , schedulerName , runningTasks .size (),
137+ maxShutdownWaitTime );
138+ try {
139+ executor .awaitTermination (maxShutdownWaitTime .getSeconds (), TimeUnit .SECONDS );
140+ } catch (InterruptedException e ) {
141+ Thread .currentThread ().interrupt ();
142+ log .warn ("Failed to complete runnings tasks." , e .getCause () == null ? e : e .getCause ());
143+ shutdownNow ();
144+ }
145+ } else {
146+ log .info ("Shutdown {} with." , schedulerName );
149147 }
148+ } finally {
149+ executor = null ;
150+ runningTasks .clear ();
151+ lock .unlock ();
150152 }
151153 }
152154 }
153155
154156 public void shutdownNow () {
155157 stopped .set (true );
156158 if (executor != null ) {
157- executor .shutdownNow ();
158- log .info ("Force stop {} with {} running tasks" , schedulerName , runningTasks .size ());
159- runningTasks .clear ();
160- executor = null ;
159+ lock .lock ();
160+ try {
161+ executor .shutdownNow ();
162+ log .info ("Force stop {} with {} running tasks" , schedulerName , runningTasks .size ());
163+ } finally {
164+ runningTasks .clear ();
165+ executor = null ;
166+ lock .unlock ();
167+ }
161168 }
162169 }
163170
@@ -166,8 +173,7 @@ public int getFreeThreads() {
166173 return 0 ;
167174 }
168175 if (maxThreads .get () - runningTasks .size () < 0 ) {
169- log .warn ("Already {} running tasks, more than threads {} in pool." ,
170- runningTasks .size (), maxThreads .get ());
176+ log .warn ("Already {} running tasks, more than threads {} in pool." , runningTasks .size (), maxThreads .get ());
171177 }
172178 return Math .max (maxThreads .get () - runningTasks .size (), 0 );
173179 }
@@ -181,8 +187,7 @@ public Collection<Future<TriggerKey>> getRunningTasks() {
181187 }
182188
183189 public List <TriggerEntity > getRunningTriggers () {
184- var doneAndNotRemovedFutures = this .runningTasks .entrySet ().stream ()
185- .filter (e -> e .getValue ().isDone ())
190+ var doneAndNotRemovedFutures = this .runningTasks .entrySet ().stream ().filter (e -> e .getValue ().isDone ())
186191 .toList ();
187192
188193 if (doneAndNotRemovedFutures .size () > 0 ) {
0 commit comments