Skip to content

Commit 27d24ab

Browse files
committed
fixed Shutdown method and imroved test helpers
1 parent 875fd05 commit 27d24ab

File tree

6 files changed

+68
-36
lines changed

6 files changed

+68
-36
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public void setMaxThreads(int value) {
6666
public void stop() {
6767
taskExecutor.close();
6868
editSchedulerStatus.offline(name);
69+
runOrQueue.clear();
6970
}
7071

7172
public void shutdownNow() {
@@ -76,8 +77,8 @@ public void shutdownNow() {
7677
}
7778

7879
public SchedulerEntity getScheduler() {
79-
var result = editSchedulerStatus.get(name);
80-
return result;
80+
return editSchedulerStatus.checkinToRegistry(name,
81+
taskExecutor.countRunning(), taskExecutor.getMaxThreads());
8182
}
8283

8384
public Optional<SchedulerEntity> findStatus(String name) {
@@ -136,7 +137,7 @@ public <T extends Serializable> TriggerKey runOrQueue(AddTriggerRequest<T> trigg
136137
}
137138

138139
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
139-
public void checkIfTrigerShouldRun(TriggerAddedEvent addedTrigger) {
140+
public void checkIfTriggerShouldRun(TriggerAddedEvent addedTrigger) {
140141
if (runOrQueue.checkIfTrigerShouldRun(addedTrigger.id())) {
141142
editSchedulerStatus.checkinToRegistry(name, taskExecutor.countRunning(), taskExecutor.getMaxThreads());
142143
}
@@ -165,8 +166,11 @@ public List<SchedulerEntity> listAll() {
165166
public Collection<Future<TriggerKey>> getRunning() {
166167
return taskExecutor.getRunningTasks();
167168
}
169+
public List<TriggerEntity> getRunningTriggers() {
170+
return taskExecutor.getRunningTriggers();
171+
}
168172
public boolean hasRunningTriggers() {
169-
return !taskExecutor.isStopped()
170-
&& (taskExecutor.countRunning() > 0 || runOrQueue.hasWaitingTriggers());
173+
return taskExecutor.countRunning() > 0 || runOrQueue.hasWaitingTriggers();
171174
}
175+
172176
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/RunOrQueueComponent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,8 @@ private void clearNotCreatedTriggers() {
8484
}
8585
}
8686
}
87+
88+
public void clear() {
89+
shouldRun.clear();
90+
}
8791
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -101,35 +101,35 @@ public void start() {
101101
runningTasks.clear();
102102
executor = Executors.newFixedThreadPool(maxThreads.get());
103103
log.info("Started {} with {} threads.", schedulerName, maxThreads.get());
104+
stopped.set(false);
104105
}
105106
}
106107
}
107108

108109
@Override
109110
public void close() {
110-
if (stopped.compareAndExchange(false, true)) {
111-
ExecutorService executorRef;
112-
synchronized (runningTasks) {
113-
executorRef = executor;
114-
executor = null;
115-
}
116-
117-
if (executorRef != null) {
118-
executorRef.shutdown();
119-
if (runningTasks.size() > 0) {
120-
log.info("Shutdown {} with {} running tasks, waiting for {}.", schedulerName, runningTasks.size(),
121-
maxShutdownWaitTime);
122-
123-
try {
124-
executorRef.awaitTermination(maxShutdownWaitTime.getSeconds(), TimeUnit.SECONDS);
125-
} catch (InterruptedException e) {
126-
Thread.currentThread().interrupt();
127-
log.warn("Failed to complete runnings tasks.", e.getCause() == null ? e : e.getCause());
128-
shutdownNow();
129-
} finally {
130-
executorRef = null;
131-
runningTasks.clear();
132-
}
111+
ExecutorService executorRef;
112+
synchronized (runningTasks) {
113+
executorRef = executor;
114+
executor = null;
115+
stopped.set(true);
116+
}
117+
118+
if (executorRef != null) {
119+
executorRef.shutdown();
120+
log.info("Shutdown {} with {} running tasks, waiting for {}.", schedulerName, runningTasks.size(),
121+
maxShutdownWaitTime);
122+
123+
if (runningTasks.size() > 0) {
124+
try {
125+
executorRef.awaitTermination(maxShutdownWaitTime.getSeconds(), TimeUnit.SECONDS);
126+
} catch (InterruptedException e) {
127+
Thread.currentThread().interrupt();
128+
log.warn("Failed to complete runnings tasks.", e.getCause() == null ? e : e.getCause());
129+
shutdownNow();
130+
} finally {
131+
executorRef = null;
132+
runningTasks.clear();
133133
}
134134
}
135135
}
@@ -151,6 +151,10 @@ public int getFreeThreads() {
151151
if (stopped.get()) {
152152
return 0;
153153
}
154+
if (maxThreads.get() - runningTasks.size() < 0) {
155+
log.warn("Already {}" + runningTasks.size() + " running more than threads={}",
156+
runningTasks.size(), maxThreads.get());
157+
}
154158
return Math.max(maxThreads.get() - runningTasks.size(), 0);
155159
}
156160

@@ -161,15 +165,26 @@ public int countRunning() {
161165
public Collection<Future<TriggerKey>> getRunningTasks() {
162166
return runningTasks.values();
163167
}
168+
public List<TriggerEntity> getRunningTriggers() {
169+
var doneAndNotRemovedFutures = this.runningTasks.entrySet().stream()
170+
.filter(e -> e.getValue().isDone())
171+
.toList();
172+
173+
if (doneAndNotRemovedFutures.size() > 0) {
174+
log.error("Found still pending futures, maybe an issue, report a bug if so {}",
175+
doneAndNotRemovedFutures.stream().map(e -> e.getKey().getKey()));
176+
for (var entry : doneAndNotRemovedFutures) {
177+
runningTasks.remove(entry.getKey());
178+
}
179+
}
180+
181+
return Collections.list(this.runningTasks.keys());
182+
}
164183

165184
public boolean isStopped() {
166185
return stopped.get() || maxThreads.get() <= 0;
167186
}
168187

169-
public List<TriggerEntity> getRunningTriggers() {
170-
return Collections.list(this.runningTasks.keys());
171-
}
172-
173188
public void setMaxThreads(int value) {
174189
this.maxThreads.set(value);
175190
}

core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
import java.time.Duration;
66
import java.time.OffsetDateTime;
77
import java.util.List;
8-
import java.util.concurrent.ExecutionException;
98
import java.util.concurrent.Future;
109

1110
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.RepeatedTest;
1212
import org.junit.jupiter.api.Test;
1313
import org.springframework.beans.factory.annotation.Autowired;
1414
import org.sterl.spring.persistent_tasks.AbstractSpringTest;
@@ -44,7 +44,7 @@ void schedulerShouldBeOnlineTest() {
4444
assertThat(status.getLastPing()).isBeforeOrEqualTo(time);
4545
}
4646

47-
@Test
47+
@RepeatedTest(3)
4848
void testFastTaskFreeThreadChound() throws Exception {
4949
// GIVEN
5050
var t = taskService.<String>replace("fastTask", s -> {});
@@ -64,9 +64,8 @@ void testFastTaskFreeThreadChound() throws Exception {
6464
assertThat(subject.getRunning().size()).isZero();
6565
assertThat(subject.hasRunningTriggers()).isFalse();
6666
// AND
67-
subject.triggerNextTasks();
6867
final SchedulerEntity scheduler = subject.getScheduler();
69-
assertThat(scheduler.getRunningTasks()).isEqualTo(0);
68+
assertThat(scheduler.getRunningTasks()).isZero();
7069
}
7170

7271
@Test

core/src/test/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,13 @@ public Set<TriggerKey> scheduleNextTriggersAndWait(Duration maxWaitTime) {
105105
}
106106

107107
newTriggers = scheduleNextTriggers();
108+
if (newTriggers.isEmpty()) {
109+
Thread.sleep(200);
110+
newTriggers = scheduleNextTriggers();
111+
}
108112
result.addAll(awaitTriggers(maxWaitTime, newTriggers));
109113
} while (newTriggers.size() > 0);
114+
110115
return result;
111116
}
112117

test/src/main/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,13 @@ public Set<TriggerKey> scheduleNextTriggersAndWait(Duration maxWaitTime) {
105105
}
106106

107107
newTriggers = scheduleNextTriggers();
108+
if (newTriggers.isEmpty()) {
109+
Thread.sleep(200);
110+
newTriggers = scheduleNextTriggers();
111+
}
108112
result.addAll(awaitTriggers(maxWaitTime, newTriggers));
109113
} while (newTriggers.size() > 0);
114+
110115
return result;
111116
}
112117

0 commit comments

Comments
 (0)