@@ -42,9 +42,10 @@ public class TaskExecutorComponent implements Closeable {
4242 private Duration maxShutdownWaitTime = Duration .ofSeconds (10 );
4343 @ Nullable
4444 private ExecutorService executor ;
45+ // also the LOCK object ...
4546 private final ConcurrentHashMap <TriggerEntity , Future <TriggerKey >> runningTasks = new ConcurrentHashMap <>();
4647 private final AtomicBoolean stopped = new AtomicBoolean (true );
47-
48+
4849 public TaskExecutorComponent (String schedulerName , TriggerService triggerService , int maxThreads ) {
4950 super ();
5051 this .schedulerName = schedulerName ;
@@ -54,7 +55,8 @@ public TaskExecutorComponent(String schedulerName, TriggerService triggerService
5455
5556 @ NonNull
5657 public List <Future <TriggerKey >> submit (List <TriggerEntity > trigger ) {
57- if (trigger == null || trigger .isEmpty ()) return Collections .emptyList ();
58+ if (trigger == null || trigger .isEmpty ())
59+ return Collections .emptyList ();
5860
5961 final List <Future <TriggerKey >> result = new ArrayList <>(trigger .size ());
6062 for (TriggerEntity triggerEntity : trigger ) {
@@ -71,9 +73,13 @@ public Future<TriggerKey> submit(@Nullable TriggerEntity trigger) {
7173 if (stopped .get () || executor == null ) {
7274 throw new IllegalStateException ("Executor of " + schedulerName + " is already stopped" );
7375 }
74-
75- final var result = executor .submit (() -> runTrigger (trigger ));
76- runningTasks .put (trigger , result );
76+
77+ Future <TriggerKey > result ;
78+ synchronized (runningTasks ) {
79+ result = executor .submit (() -> runTrigger (trigger ));
80+ runningTasks .put (trigger , result );
81+ }
82+
7783 return result ;
7884 }
7985
@@ -88,7 +94,7 @@ private TriggerKey runTrigger(TriggerEntity trigger) {
8894
8995 public void start () {
9096 if (stopped .compareAndExchange (true , false )) {
91- synchronized (stopped ) {
97+ synchronized (runningTasks ) {
9298 runningTasks .clear ();
9399 executor = Executors .newFixedThreadPool (maxThreads .get ());
94100 log .info ("Started {} with {} threads." , schedulerName , maxThreads .get ());
@@ -99,7 +105,7 @@ public void start() {
99105 @ Override
100106 public void close () {
101107 if (stopped .compareAndExchange (false , true )) {
102- synchronized (stopped ) {
108+ synchronized (runningTasks ) {
103109 doShutdown ();
104110 }
105111 }
@@ -109,8 +115,8 @@ private void doShutdown() {
109115 if (executor != null ) {
110116 executor .shutdown ();
111117 if (runningTasks .size () > 0 ) {
112- log .info ("Shutdown {} with {} running tasks, waiting for {}." ,
113- schedulerName , runningTasks . size (), maxShutdownWaitTime );
118+ log .info ("Shutdown {} with {} running tasks, waiting for {}." , schedulerName , runningTasks . size (),
119+ maxShutdownWaitTime );
114120
115121 try {
116122 executor .awaitTermination (maxShutdownWaitTime .getSeconds (), TimeUnit .SECONDS );
@@ -129,8 +135,8 @@ private void doShutdown() {
129135
130136 public void shutdownNow () {
131137 if (stopped .compareAndExchange (false , true )) {
132- if ( executor != null ) {
133- synchronized (executor ) {
138+ synchronized ( runningTasks ) {
139+ if (executor != null ) {
134140 executor .shutdownNow ();
135141 log .info ("Force stop {} with {} running tasks" , schedulerName , runningTasks .size ());
136142 runningTasks .clear ();
@@ -146,7 +152,7 @@ public int getFreeThreads() {
146152 }
147153 return Math .max (maxThreads .get () - runningTasks .size (), 0 );
148154 }
149-
155+
150156 public int countRunning () {
151157 return runningTasks .size ();
152158 }
@@ -158,14 +164,15 @@ public Collection<Future<TriggerKey>> getRunningTasks() {
158164 public boolean isStopped () {
159165 return stopped .get () || maxThreads .get () <= 0 ;
160166 }
161-
167+
162168 public List <TriggerEntity > getRunningTriggers () {
163169 return Collections .list (this .runningTasks .keys ());
164170 }
165171
166172 public void setMaxThreads (int value ) {
167173 this .maxThreads .set (value );
168174 }
175+
169176 public int getMaxThreads () {
170177 return isStopped () ? 0 : this .maxThreads .get ();
171178 }
0 commit comments