@@ -74,27 +74,30 @@ public Future<TriggerKey> submit(@Nullable TriggerEntity trigger) {
7474 throw new IllegalStateException ("Executor of " + schedulerName + " is already stopped" );
7575 }
7676
77- Future <TriggerKey > result ;
78- synchronized (runningTasks ) {
79- result = executor .submit (() -> runTrigger (trigger ));
80- runningTasks .put (trigger , result );
77+ try {
78+ runningTasks .put (trigger , executor .submit (() -> runTrigger (trigger )));
79+ return runningTasks .get (trigger );
80+ } catch (Exception e ) {
81+ runningTasks .remove (trigger );
82+ throw new RuntimeException ("Failed to run " + trigger .getKey (), e );
8183 }
8284
83- return result ;
8485 }
8586
8687 private TriggerKey runTrigger (TriggerEntity trigger ) {
8788 try {
8889 triggerService .run (trigger );
8990 return trigger .getKey ();
9091 } finally {
91- runningTasks .remove (trigger );
92+ if (runningTasks .remove (trigger ) == null ) {
93+ log .error ("Failed to remove trigger with {}" , trigger .key ());
94+ }
9295 }
9396 }
9497
9598 public void start () {
96- if ( stopped . compareAndExchange ( true , false ) ) {
97- synchronized ( runningTasks ) {
99+ synchronized ( runningTasks ) {
100+ if ( stopped . compareAndExchange ( true , false ) ) {
98101 runningTasks .clear ();
99102 executor = Executors .newFixedThreadPool (maxThreads .get ());
100103 log .info ("Started {} with {} threads." , schedulerName , maxThreads .get ());
@@ -105,43 +108,41 @@ public void start() {
105108 @ Override
106109 public void close () {
107110 if (stopped .compareAndExchange (false , true )) {
111+ ExecutorService executorRef ;
108112 synchronized (runningTasks ) {
109- doShutdown ();
113+ executorRef = executor ;
114+ executor = null ;
110115 }
111- }
112- }
113-
114- private void doShutdown () {
115- if (executor != null ) {
116- executor .shutdown ();
117- if (runningTasks .size () > 0 ) {
118- log .info ("Shutdown {} with {} running tasks, waiting for {}." , schedulerName , runningTasks .size (),
119- maxShutdownWaitTime );
120-
121- try {
122- executor .awaitTermination (maxShutdownWaitTime .getSeconds (), TimeUnit .SECONDS );
123- } catch (InterruptedException e ) {
124- log .warn ("Failed to complete runnings tasks." , e .getCause ());
125- shutdownNow ();
126- } finally {
127- executor = null ;
128- runningTasks .clear ();
116+
117+ if (executorRef != null ) {
118+ executorRef .shutdown ();
119+ if (runningTasks .size () > 0 ) {
120+ log .info ("Shutdown {} with {} running tasks, waiting for {}." , schedulerName , runningTasks .size (),
121+ maxShutdownWaitTime );
122+
123+ try {
124+ executorRef .awaitTermination (maxShutdownWaitTime .getSeconds (), TimeUnit .SECONDS );
125+ } catch (InterruptedException e ) {
126+ Thread .currentThread ().interrupt ();
127+ log .warn ("Failed to complete runnings tasks." , e .getCause () == null ? e : e .getCause ());
128+ shutdownNow ();
129+ } finally {
130+ executorRef = null ;
131+ runningTasks .clear ();
132+ }
129133 }
130- } else {
131- executor = null ;
132134 }
133135 }
134136 }
135137
136138 public void shutdownNow () {
137- if (stopped .compareAndExchange (false , true )) {
138- synchronized (runningTasks ) {
139- if (executor != null ) {
140- executor .shutdownNow ();
141- log .info ("Force stop {} with {} running tasks" , schedulerName , runningTasks .size ());
142- runningTasks .clear ();
143- executor = null ;
144- }
139+ synchronized (runningTasks ) {
140+ if (executor != null ) {
141+ executor .shutdownNow ();
142+ log .info ("Force stop {} with {} running tasks" , schedulerName , runningTasks .size ());
143+ runningTasks .clear ();
144+ executor = null ;
145+ stopped .set (true );
145146 }
146147 }
147148 }
@@ -178,6 +179,6 @@ public int getMaxThreads() {
178179 }
179180
180181 public boolean isRunning (TriggerEntity trigger ) {
181- return runningTasks .contains (trigger );
182+ return runningTasks .containsKey (trigger );
182183 }
183184}
0 commit comments