Skip to content

Commit d96b0f0

Browse files
committed
added a way to cancel and to fail a running trigger
1 parent 80ceac5 commit d96b0f0

File tree

10 files changed

+149
-46
lines changed

10 files changed

+149
-46
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,27 +44,31 @@ public class PersistentTaskService {
4444
/**
4545
* Returns the last known {@link TriggerData} to a given key. First running triggers are checked.
4646
* Maybe out of the history event from a retry execution of the very same id.
47-
*
47+
*
4848
* @param key the {@link TriggerKey} to look for
4949
* @return the {@link TriggerData} to the {@link TriggerKey}
5050
*/
5151
public Optional<TriggerData> getLastTriggerData(TriggerKey key) {
5252
final Optional<TriggerEntity> trigger = triggerService.get(key);
5353
if (trigger.isEmpty()) {
5454
var history = historyService.findLastKnownStatus(key);
55-
if (history.isPresent()) return Optional.ofNullable(history.get().getData());
55+
if (history.isPresent()) {
56+
return Optional.ofNullable(history.get().getData());
57+
}
5658
return Optional.empty();
5759
} else {
5860
return Optional.ofNullable(trigger.get().getData());
5961
}
6062
}
61-
63+
6264
public Optional<TriggerData> getLastDetailData(TriggerKey key) {
6365
var data = historyService.findAllDetailsForKey(key, Pageable.ofSize(1));
64-
if (data.isEmpty()) return Optional.empty();
66+
if (data.isEmpty()) {
67+
return Optional.empty();
68+
}
6569
return Optional.of(data.getContent().get(0).getData());
6670
}
67-
71+
6872
@EventListener
6973
void queue(TriggerTaskCommand<? extends Serializable> event) {
7074
if (event.size() == 1) {
@@ -76,15 +80,17 @@ void queue(TriggerTaskCommand<? extends Serializable> event) {
7680

7781
/**
7882
* Queues/updates the given triggers, if the {@link TriggerKey} is already present
79-
*
83+
*
8084
* @param <T> the state type
8185
* @param triggers the triggers to add
8286
* @return the {@link TriggerKey}
8387
*/
8488
@Transactional(timeout = 10)
8589
@NonNull
8690
public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequest<T>> triggers) {
87-
if (triggers == null || triggers.isEmpty()) return Collections.emptyList();
91+
if (triggers == null || triggers.isEmpty()) {
92+
return Collections.emptyList();
93+
}
8894

8995
return triggers.stream() //
9096
.map(t -> triggerService.queue(t)) //
@@ -93,7 +99,7 @@ public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequ
9399
}
94100
/**
95101
* Queues/updates the given trigger, if the {@link TriggerKey} is already present.
96-
*
102+
*
97103
* @param <T> the state type
98104
* @param trigger the trigger to add
99105
* @return the {@link TriggerKey}
@@ -107,7 +113,7 @@ public <T extends Serializable> TriggerKey queue(AddTriggerRequest<T> trigger) {
107113
/**
108114
* Runs the given trigger if a free threads are available
109115
* and the runAt time is not in the future.
110-
* @return the reference to the {@link TriggerKey}
116+
* @return the reference to the {@link TriggerKey}
111117
*/
112118
public <T extends Serializable> TriggerKey runOrQueue(
113119
AddTriggerRequest<T> triggerRequest) {
@@ -118,10 +124,10 @@ public <T extends Serializable> TriggerKey runOrQueue(
118124
}
119125
return triggerRequest.key();
120126
}
121-
127+
122128
/**
123129
* Triggers the execution of all pending triggers.
124-
*
130+
*
125131
* @return the reference to the {@link TriggerKey} of the running tasks
126132
*/
127133
public List<Future<TriggerKey>> executeTriggers() {
@@ -131,7 +137,7 @@ public List<Future<TriggerKey>> executeTriggers() {
131137
}
132138
return result;
133139
}
134-
140+
135141
/**
136142
* Triggers the execution of all pending triggers and wait for the result.
137143
*/
@@ -151,7 +157,7 @@ public List<TriggerKey> executeTriggersAndWait() {
151157
throw cause == null ? e : cause;
152158
}
153159
}
154-
160+
155161
isSomethingRunning = hasRunningTriggers();
156162
if (isSomethingRunning) {
157163
Thread.sleep(Duration.ofMillis(100));
@@ -165,7 +171,7 @@ public List<TriggerKey> executeTriggersAndWait() {
165171
private boolean hasRunningTriggers() {
166172
var running = this.schedulers.stream()
167173
.map(s -> s.hasRunningTriggers())
168-
.filter(r -> r == true)
174+
.filter(r -> r)
169175
.findAny();
170176

171177
return running.isPresent() && running.get() == true;
@@ -186,7 +192,7 @@ public List<TriggerData> findAllTriggerByCorrelationId(String correlationId) {
186192
var done = historyService.findTriggerByCorrelationId(correlationId)
187193
.stream().map(TriggerHistoryLastStateEntity::getData)
188194
.toList();
189-
195+
190196

191197
var result = new ArrayList<TriggerData>(running.size() + done.size());
192198
result.addAll(done);
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.sterl.spring.persistent_tasks.task.exception;
2+
3+
/**
4+
* Set the task to cancel and finish the execution
5+
*/
6+
public class CancelTaskException extends TaskException {
7+
private static final long serialVersionUID = 1L;
8+
public CancelTaskException(String message, Throwable cause) {
9+
super(message, cause);
10+
}
11+
12+
public CancelTaskException(String message) {
13+
super(message);
14+
}
15+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.sterl.spring.persistent_tasks.task.exception;
2+
3+
/**
4+
* Set the task to failed and finish the execution.
5+
*/
6+
public class FailTaskNoRetryException extends TaskException {
7+
private static final long serialVersionUID = 1L;
8+
public FailTaskNoRetryException(String message, Throwable cause) {
9+
super(message, cause);
10+
}
11+
12+
public FailTaskNoRetryException(String message) {
13+
super(message);
14+
}
15+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.sterl.spring.persistent_tasks.task.exception;
2+
3+
public abstract class TaskException extends RuntimeException {
4+
private static final long serialVersionUID = 1L;
5+
6+
public TaskException(String message, Throwable cause) {
7+
super(message, cause);
8+
}
9+
10+
public TaskException(String message) {
11+
super(message);
12+
}
13+
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,11 @@ public <T extends Serializable> TriggerEntity queue(AddTriggerRequest<T> tigger)
133133
* If you changed your mind, cancel the persistentTask
134134
*/
135135
public Optional<TriggerEntity> cancel(TriggerKey key) {
136-
return editTrigger.cancelTask(key);
136+
return editTrigger.cancelTask(key, null);
137137
}
138138

139139
public List<TriggerEntity> cancel(Collection<TriggerKey> key) {
140-
return key.stream().map(editTrigger::cancelTask)
140+
return key.stream().map(t -> editTrigger.cancelTask(t, null))
141141
.filter(Optional::isPresent)
142142
.map(Optional::get)
143143
.toList();

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import java.util.List;
77
import java.util.Optional;
88

9-
import org.slf4j.event.Level;
109
import org.springframework.context.ApplicationEventPublisher;
1110
import org.springframework.lang.NonNull;
1211
import org.springframework.stereotype.Component;
@@ -52,7 +51,7 @@ public Optional<TriggerEntity> completeTaskWithSuccess(TriggerKey key, Serializa
5251
}
5352

5453
/**
55-
* Sets success or error based on the fact if an exception is given or not.
54+
* Sets error based on the fact if an exception is given or not.
5655
*/
5756
public Optional<TriggerEntity> failTrigger(
5857
TriggerKey key,
@@ -63,10 +62,6 @@ public Optional<TriggerEntity> failTrigger(
6362

6463

6564
result.ifPresent(t -> {
66-
log.atLevel(retryAt == null ? Level.ERROR : Level.WARN)
67-
.setCause(e)
68-
.log("{} failed, retryAt={}",
69-
key, retryAt == null ? "no" : retryAt);
7065
t.complete(e);
7166
publisher.publishEvent(new TriggerFailedEvent(t.getId(), t.copyData(), state, e, retryAt));
7267

@@ -76,6 +71,7 @@ public Optional<TriggerEntity> failTrigger(
7671
t.runAt(retryAt);
7772
}
7873
});
74+
7975
if (result.isEmpty()) {
8076
log.error("Trigger with key={} not found and may be at a wrong state!",
8177
key, e);
@@ -84,17 +80,19 @@ public Optional<TriggerEntity> failTrigger(
8480
return result;
8581
}
8682

87-
public Optional<TriggerEntity> cancelTask(TriggerKey id) {
83+
public Optional<TriggerEntity> cancelTask(TriggerKey id, Exception e) {
8884
return triggerRepository //
8985
.findByKey(id) //
90-
.map(t -> {
91-
t.cancel();
92-
publisher.publishEvent(new TriggerCanceledEvent(
93-
t.getId(), t.copyData(),
94-
stateSerializer.deserializeOrNull(t.getData().getState())));
95-
triggerRepository.delete(t);
96-
return t;
97-
});
86+
.map(t -> cancelTask(t, e));
87+
}
88+
89+
private TriggerEntity cancelTask(TriggerEntity t, Exception e) {
90+
t.cancel(e);
91+
publisher.publishEvent(new TriggerCanceledEvent(
92+
t.getId(), t.copyData(),
93+
stateSerializer.deserializeOrNull(t.getData().getState())));
94+
triggerRepository.delete(t);
95+
return t;
9896
}
9997

10098
public <T extends Serializable> TriggerEntity addTrigger(AddTriggerRequest<T> tigger) {

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import org.springframework.transaction.annotation.Transactional;
1010
import org.sterl.spring.persistent_tasks.api.task.RunningTriggerContextHolder;
1111
import org.sterl.spring.persistent_tasks.task.TaskService;
12+
import org.sterl.spring.persistent_tasks.task.exception.CancelTaskException;
13+
import org.sterl.spring.persistent_tasks.task.exception.FailTaskNoRetryException;
1214
import org.sterl.spring.persistent_tasks.trigger.model.RunTaskWithStateCommand;
1315
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
1416

@@ -66,18 +68,21 @@ private Optional<TriggerEntity> failTaskAndState(RunTaskWithStateCommand runTask
6668
var task = runTaskWithStateCommand.task();
6769
Optional<TriggerEntity> result;
6870

69-
if (task != null
70-
&& task.retryStrategy().shouldRetry(trigger.getData().getExecutionCount(), e)) {
71+
if (e instanceof CancelTaskException) {
72+
log.info("Cancel of a running trigger={} requested", trigger.getKey());
73+
result = editTrigger.cancelTask(trigger.getKey(), e);
74+
} else if (e instanceof FailTaskNoRetryException) {
75+
log.warn("Fail no retry of a running trigger={} requested", trigger.getKey(), e);
76+
result = editTrigger.failTrigger(trigger.getKey(), runTaskWithStateCommand.state(), e, null);
77+
} else if (task == null
78+
|| !task.retryStrategy().shouldRetry(trigger.getData().getExecutionCount(), e)) {
7179

80+
log.error("Failed trigger={}, no further retries!", trigger.getKey(), e);
81+
result = editTrigger.failTrigger(trigger.getKey(), runTaskWithStateCommand.state(), e, null);
82+
} else {
7283
final OffsetDateTime retryAt = task.retryStrategy().retryAt(trigger.getData().getExecutionCount(), e);
73-
84+
log.warn("Failed trigger={} with retryAt={}", trigger.getKey(), retryAt, e);
7485
result = editTrigger.failTrigger(trigger.getKey(), runTaskWithStateCommand.state(), e, retryAt);
75-
76-
} else {
77-
log.error("{} failed, no more retries! {}", trigger.getKey(),
78-
e == null ? "No exception given." : e.getMessage(), e);
79-
80-
result = editTrigger.failTrigger(trigger.getKey(), runTaskWithStateCommand.state(), e, null);
8186
}
8287
return result;
8388
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/model/TriggerEntity.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,18 @@ public TriggerKey getKey() {
6666
return data.getKey();
6767
}
6868

69-
public TriggerEntity cancel() {
69+
public TriggerEntity cancel(Exception e) {
7070
this.data.setEnd(OffsetDateTime.now());
7171
this.data.setStatus(TriggerStatus.CANCELED);
72-
this.data.setExceptionName("PersistentTask canceled");
73-
this.data.setRunningDurationInMs(null);
72+
73+
if (e == null) {
74+
this.data.setExceptionName("PersistentTask canceled");
75+
} else {
76+
data.setExceptionName(e.getClass().getName());
77+
data.setLastException(ExceptionUtils.getStackTrace(e));
78+
}
79+
80+
data.updateRunningDuration();
7481
return this;
7582
}
7683

core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
import org.sterl.spring.persistent_tasks.AbstractSpringTest;
1515
import org.sterl.spring.persistent_tasks.api.RetryStrategy;
1616
import org.sterl.spring.persistent_tasks.api.TaskId.TriggerBuilder;
17-
import org.sterl.spring.persistent_tasks.api.task.PersistentTask;
18-
import org.sterl.spring.persistent_tasks.api.task.TransactionalTask;
1917
import org.sterl.spring.persistent_tasks.api.TriggerKey;
2018
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
19+
import org.sterl.spring.persistent_tasks.api.task.PersistentTask;
20+
import org.sterl.spring.persistent_tasks.api.task.TransactionalTask;
2121
import org.sterl.spring.sample_app.person.PersonEntity;
2222
import org.sterl.spring.sample_app.person.PersonRepository;
2323
import org.sterl.test.Countdown;

core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.sterl.spring.persistent_tasks.api.TriggerKey;
2323
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
2424
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryLastStateRepository;
25+
import org.sterl.spring.persistent_tasks.task.exception.CancelTaskException;
26+
import org.sterl.spring.persistent_tasks.task.exception.FailTaskNoRetryException;
2527
import org.sterl.spring.persistent_tasks.task.repository.TaskRepository;
2628
import org.sterl.spring.persistent_tasks.trigger.component.StateSerializer.DeSerializationFailedException;
2729
import org.sterl.spring.persistent_tasks.trigger.event.TriggerAddedEvent;
@@ -420,4 +422,46 @@ void testBadStateNoRetry() {
420422
assertThat(events.stream(TriggerSuccessEvent.class).count()).isZero();
421423
assertThat(events.stream(TriggerFailedEvent.class).count()).isOne();
422424
}
425+
426+
@Test
427+
void tesCancelRunningTrigger() {
428+
// GIVEN
429+
TaskId<String> taskId = taskService.replace("foo-cancel", c -> {
430+
throw new CancelTaskException(c);
431+
});
432+
var key1 = subject.queue(taskId.newTrigger().build()).getKey();
433+
434+
// WHEN
435+
assertThat(runNextTrigger()).isPresent();
436+
assertThat(runNextTrigger()).isEmpty();
437+
438+
// THEN
439+
assertThat(historyService.findLastKnownStatus(key1).get().status()).isEqualTo(TriggerStatus.CANCELED);
440+
441+
// AND
442+
assertThat(events.stream(TriggerCanceledEvent.class).count()).isOne();
443+
assertThat(events.stream(TriggerFailedEvent.class).count()).isZero();
444+
assertThat(events.stream(TriggerSuccessEvent.class).count()).isZero();
445+
}
446+
447+
@Test
448+
void tesFailRunningTriggerNoRetry() {
449+
// GIVEN
450+
TaskId<String> taskId = taskService.replace("foo-fail", c -> {
451+
throw new FailTaskNoRetryException(c);
452+
});
453+
var key1 = subject.queue(taskId.newTrigger().build()).getKey();
454+
455+
// WHEN
456+
assertThat(runNextTrigger()).isPresent();
457+
assertThat(runNextTrigger()).isEmpty();
458+
459+
// THEN
460+
assertThat(historyService.findLastKnownStatus(key1).get().status()).isEqualTo(TriggerStatus.FAILED);
461+
462+
// AND
463+
assertThat(events.stream(TriggerFailedEvent.class).count()).isOne();
464+
assertThat(events.stream(TriggerCanceledEvent.class).count()).isZero();
465+
assertThat(events.stream(TriggerSuccessEvent.class).count()).isZero();
466+
}
423467
}

0 commit comments

Comments
 (0)