Skip to content

Commit 39cdf0e

Browse files
committed
adjusted test and fixed code
1 parent 2c374a3 commit 39cdf0e

File tree

5 files changed

+49
-4
lines changed

5 files changed

+49
-4
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.sterl.spring.persistent_tasks;
22

33
import java.io.Serializable;
4+
import java.time.Duration;
45
import java.util.ArrayList;
56
import java.util.Collection;
67
import java.util.Collections;
@@ -139,6 +140,7 @@ public List<TriggerKey> executeTriggersAndWait() {
139140
final var result = new ArrayList<TriggerKey>();
140141

141142
List<Future<TriggerKey>> triggers;
143+
var isSomethingRunning = false;
142144
do {
143145
triggers = executeTriggers();
144146
for (Future<TriggerKey> future : triggers) {
@@ -149,12 +151,34 @@ public List<TriggerKey> executeTriggersAndWait() {
149151
throw cause == null ? e : cause;
150152
}
151153
}
152-
} while (!triggers.isEmpty());
154+
155+
isSomethingRunning = hasRunningTriggers();
156+
if (isSomethingRunning) {
157+
Thread.sleep(Duration.ofMillis(100));
158+
}
159+
160+
} while (!triggers.isEmpty() || isSomethingRunning);
153161

154162
return result;
155163
}
156164

157-
public List<TriggerData> findTriggerByCorrelationId(String correlationId) {
165+
private boolean hasRunningTriggers() {
166+
var running = this.schedulers.stream()
167+
.map(s -> s.hasRunningTriggers())
168+
.filter(r -> r == true)
169+
.findAny();
170+
171+
return running.isPresent() && running.get() == true;
172+
}
173+
174+
/**
175+
* Returns all triggers for a correlationId sorted by the creation time.
176+
* @param correlationId the id to search for
177+
* @return the found {@link TriggerData} sorted by create time ASC
178+
*/
179+
@Transactional(readOnly = true, timeout = 5)
180+
public List<TriggerData> findAllTriggerByCorrelationId(String correlationId) {
181+
158182
var running = triggerService.findTriggerByCorrelationId(correlationId)
159183
.stream().map(TriggerEntity::getData)
160184
.toList();
@@ -163,9 +187,10 @@ public List<TriggerData> findTriggerByCorrelationId(String correlationId) {
163187
.stream().map(TriggerHistoryLastStateEntity::getData)
164188
.toList();
165189

190+
166191
var result = new ArrayList<TriggerData>(running.size() + done.size());
167-
result.addAll(running);
168192
result.addAll(done);
193+
result.addAll(running);
169194
return result;
170195
}
171196
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
118118

119119
return taskExecutor.submit(result);
120120
} else {
121+
log.debug("No free threads {}/{} right now to run jobs due for: {}",
122+
taskExecutor.getFreeThreads(),
123+
taskExecutor.getMaxThreads(),
124+
timeDue);
121125
pingRegistry();
122126
return Collections.emptyList();
123127
}
@@ -155,6 +159,7 @@ void checkIfTrigerIsRunning(TriggerAddedEvent addedTrigger) {
155159
log.debug("New triger added for imidiate execution {}", addedTrigger.key());
156160
taskExecutor.submit(toRun);
157161
}
162+
// TODO implement a cleanup for old pending triggers which may never been triggered!
158163
}
159164

160165
public SchedulerEntity getStatus() {
@@ -177,4 +182,8 @@ public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
177182
public List<SchedulerEntity> listAll() {
178183
return editSchedulerStatus.listAll();
179184
}
185+
186+
public boolean hasRunningTriggers() {
187+
return !this.taskExecutor.getRunningTriggers().isEmpty();
188+
}
180189
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,13 @@ public Optional<TriggerEntity> cancel(TriggerKey key) {
136136
return editTrigger.cancelTask(key);
137137
}
138138

139+
public List<TriggerEntity> cancel(Collection<TriggerKey> key) {
140+
return key.stream().map(editTrigger::cancelTask)
141+
.filter(Optional::isPresent)
142+
.map(Optional::get)
143+
.toList();
144+
}
145+
139146
/**
140147
* Counts the trigger using the name only from the {@link TaskId}
141148
*

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/model/TriggerEntity.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ public TriggerEntity withState(byte[] state) {
115115
return this;
116116
}
117117

118+
public boolean isWaiting() {
119+
return data.getStatus() == TriggerStatus.WAITING;
120+
}
121+
118122
public TriggerData copyData() {
119123
if (data == null) return null;
120124
return this.data.copy();

core/src/test/java/org/sterl/spring/persistent_tasks/TaskSchedulerServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ void testChainedTasks() {
100100
asserts.awaitOrdered("234::chainTask1", "chainTask1::234::chainTask1");
101101
assertThat(correlationId).isEqualTo(correlationFound.get());
102102
// AND
103-
var trigger= persistentTaskService.findTriggerByCorrelationId(correlationId);
103+
var trigger= persistentTaskService.findAllTriggerByCorrelationId(correlationId);
104104
assertThat(trigger).hasSize(2);
105105
assertThat(trigger.get(0).getCorrelationId()).isEqualTo(correlationId);
106106
assertThat(trigger.get(1).getCorrelationId()).isEqualTo(correlationId);

0 commit comments

Comments
 (0)