Skip to content

Commit d3f1492

Browse files
committed
added the ability to listen for an error in the trigger itself
1 parent 57ec891 commit d3f1492

File tree

7 files changed

+216
-59
lines changed

7 files changed

+216
-59
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/api/task/PersistentTask.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.springframework.lang.Nullable;
66
import org.sterl.spring.persistent_tasks.api.RetryStrategy;
77
import org.sterl.spring.persistent_tasks.api.Trigger;
8+
import org.sterl.spring.persistent_tasks.task.exception.FailTaskNoRetryException;
89

910
/**
1011
* A Spring persistent task whose state is saved in a {@link Trigger}.
@@ -51,4 +52,20 @@ default RetryStrategy retryStrategy() {
5152
default boolean isTransactional() {
5253
return false;
5354
}
55+
56+
/**
57+
* Callback handler which is invoked once <b>after</b>:
58+
* <ul>
59+
* <li> if the trigger is finally failed
60+
* <li> or the trigger is abandoned
61+
* </ul>
62+
* <br>
63+
* This method is not invoked for expired triggers waiting for an signal.
64+
*
65+
* @param state the state, could be <code>null</code> if the state could be parsed
66+
* @param e the exception reason - could also be a {@link FailTaskNoRetryException}
67+
* @see <a href="https://spring-persistent-task.sterl.org/failed-spring-triggers/">Failed trigger</a>
68+
*/
69+
default void afterTriggerFailed(@Nullable T state, Exception e) {
70+
}
5471
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,15 @@ public Optional<RunningTriggerEntity> failTrigger(
7171

7272
result.ifPresent(t -> {
7373
t.complete(e);
74+
7475
publisher.publishEvent(new TriggerFailedEvent(t.getId(), t.copyData(), state, e, retryAt));
7576

7677
if (retryAt == null) {
7778
triggerRepository.delete(t);
7879
} else {
7980
t.runAt(retryAt);
8081
}
82+
8183
});
8284

8385
if (result.isEmpty()) {

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/FailTriggerComponent.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public Optional<RunningTriggerEntity> execute(RunTaskWithStateCommand runTaskWit
3636
public <T extends Serializable> Optional<RunningTriggerEntity> execute(RunningTriggerEntity trigger, Exception e) {
3737
return execute(null, trigger, null, e);
3838
}
39+
3940
public <T extends Serializable> Optional<RunningTriggerEntity> execute(
4041
@Nullable PersistentTask<T> task,
4142
RunningTriggerEntity trigger,
@@ -47,23 +48,58 @@ public <T extends Serializable> Optional<RunningTriggerEntity> execute(
4748
if (e instanceof CancelTaskException) {
4849
log.info("Cancel of a running trigger={} requested", trigger.getKey());
4950
result = editTrigger.cancelTask(trigger.getKey(), e);
50-
} else if (e instanceof FailTaskNoRetryException) {
51-
log.warn("Fail no retry of a running trigger={} requested", trigger.getKey(), e);
52-
result = editTrigger.failTrigger(trigger.getKey(), state, e, null);
53-
} else if (task == null
54-
|| !task.retryStrategy().shouldRetry(trigger.getData().getExecutionCount(), e)) {
51+
} else {
52+
result = triggerFailed(task, trigger, state, e);
53+
}
54+
return result;
55+
}
56+
57+
private <T extends Serializable> Optional<RunningTriggerEntity> triggerFailed(
58+
@Nullable PersistentTask<T> task,
59+
RunningTriggerEntity trigger,
60+
@Nullable T state, Exception e) {
61+
62+
final var retryAt = determineWhenToRetry(task, trigger, e);
63+
var result = editTrigger.failTrigger(trigger.getKey(), state, e, retryAt);
64+
65+
if (task != null && retryAt == null) {
66+
try {
67+
task.afterTriggerFailed(state, e);
68+
} catch (Exception ex) {
69+
log.error("Failed to invoke afterTriggerFailed on {}", task.getClass(), e);
70+
}
71+
}
72+
73+
return result;
74+
}
75+
76+
/**
77+
* @return <code>null</code> no retry, otherwise the time when to retry
78+
*/
79+
private <T extends Serializable> OffsetDateTime determineWhenToRetry(
80+
@Nullable PersistentTask<T> task,
81+
RunningTriggerEntity trigger, Exception e) {
5582

56-
log.error("Failed trigger={}, no further retries!", trigger.getKey(), e);
57-
result = editTrigger.failTrigger(trigger.getKey(), state, e, null);
83+
final OffsetDateTime retryAt;
84+
if (task == null) {
85+
retryAt = null;
86+
log.warn("No task found for trigger key={}", trigger.key());
87+
} else if (e instanceof FailTaskNoRetryException) {
88+
log.info("No retry for trigger={} requested", trigger.getKey(), e);
89+
retryAt = null;
5890
} else {
59-
final OffsetDateTime retryAt = task.retryStrategy().retryAt(trigger.getData().getExecutionCount(), e);
91+
var failCount = trigger.getData().getExecutionCount();
92+
var shouldRetry = task.retryStrategy().shouldRetry(failCount, e);
93+
retryAt = shouldRetry ? task.retryStrategy().retryAt(failCount, e) : null;
94+
6095
if (retryAt == null) {
61-
log.error("Failed trigger={}, no further retries!", trigger.getKey(), e);
96+
log.error("Failed={} trigger={}, no further retries!", failCount,
97+
trigger.getKey(), e);
6298
} else {
63-
log.warn("Failed trigger={} with retryAt={}", trigger.getKey(), retryAt, e);
99+
log.warn("Failed={} trigger={} with retryAt={}", failCount,
100+
trigger.getKey(), retryAt, e);
64101
}
65-
result = editTrigger.failTrigger(trigger.getKey(), state, e, retryAt);
66102
}
67-
return result;
103+
return retryAt;
68104
}
69105
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package org.sterl.spring.persistent_tasks.trigger;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import java.time.OffsetDateTime;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
9+
import org.junit.jupiter.api.Test;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.lang.Nullable;
12+
import org.springframework.test.context.event.ApplicationEvents;
13+
import org.sterl.spring.persistent_tasks.AbstractSpringTest;
14+
import org.sterl.spring.persistent_tasks.api.TaskId;
15+
import org.sterl.spring.persistent_tasks.api.task.PersistentTask;
16+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerExpiredEvent;
17+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent;
18+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerRunningEvent;
19+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent;
20+
21+
class TriggerLifeCycleTest extends AbstractSpringTest {
22+
23+
@Autowired
24+
private TriggerService subject;
25+
26+
@Autowired
27+
private ApplicationEvents events;
28+
29+
@Test
30+
void testFailedTrigger() throws Exception {
31+
// GIVEN
32+
final AtomicInteger afterTriggerFailedCalled = new AtomicInteger(0);
33+
final AtomicReference<Exception> exRef = new AtomicReference<>();
34+
TaskId<String> task = taskService.<String>replace("foo", new PersistentTask<>() {
35+
@Override
36+
public void accept(@Nullable String c) {
37+
throw new IllegalArgumentException("Nope! " + c);
38+
}
39+
40+
public void afterTriggerFailed(String c, Exception e) {
41+
afterTriggerFailedCalled.incrementAndGet();
42+
exRef.set(e);
43+
}
44+
});
45+
46+
// WHEN
47+
subject.queue(task.newTrigger().state("Hallo :-)").build());
48+
persistentTaskTestService.runAllDueTrigger(OffsetDateTime.now().plusDays(10));
49+
50+
// THEN
51+
assertThat(events.stream(TriggerSuccessEvent.class).count()).isZero();
52+
assertThat(events.stream(TriggerRunningEvent.class).count()).isEqualTo(4);
53+
assertThat(events.stream(TriggerFailedEvent.class).count()).isEqualTo(4);
54+
// AND
55+
assertThat(events.stream(TriggerFailedEvent.class).filter(e -> e.isDone()).count()).isOne();
56+
// AND
57+
assertThat(afterTriggerFailedCalled.get()).isOne();
58+
assertThat(exRef.get().getClass()).isEqualTo(IllegalArgumentException.class);
59+
assertThat(exRef.get().getMessage()).isEqualTo("Nope! Hallo :-)");
60+
}
61+
62+
@Test
63+
void testExpireTimeoutTriggers() {
64+
// GIVEN
65+
TaskId<String> taskId = taskService.replace("foo", asserts::info);
66+
subject.queue(
67+
taskId.newTrigger().waitForSignal(OffsetDateTime.now().plusMinutes(1)).state("old state").build());
68+
var trigger = subject
69+
.queue(taskId.newTrigger().waitForSignal(OffsetDateTime.now().minusSeconds(1)).state("foobar").build());
70+
71+
// WHEN
72+
var expired = subject.expireTimeoutTriggers();
73+
74+
// WHEN
75+
assertThat(expired).hasSize(1);
76+
assertThat(trigger).isEqualTo(expired.get(0));
77+
// AND
78+
assertThat(events.stream(TriggerExpiredEvent.class).count()).isOne();
79+
assertThat(events.stream(TriggerFailedEvent.class).count()).isZero();
80+
}
81+
82+
@Test
83+
void testAbandonedTriggerCallFailed() {
84+
// GIVEN
85+
final AtomicInteger afterTriggerFailedCalled = new AtomicInteger(0);
86+
final AtomicReference<Exception> exRef = new AtomicReference<>();
87+
TaskId<String> taskId = taskService.replace("foo", new PersistentTask<>() {
88+
@Override
89+
public void accept(@Nullable String state) {
90+
}
91+
92+
public void afterTriggerFailed(String state, Exception e) {
93+
afterTriggerFailedCalled.incrementAndGet();
94+
exRef.set(e);
95+
}
96+
});
97+
98+
trx.executeWithoutResult(trx -> {
99+
var t = subject.queue(taskId.newTrigger().waitForSignal(OffsetDateTime.now().minusSeconds(1)).build());
100+
t.runOn("foo-bar-gone");
101+
t.setLastPing(OffsetDateTime.now().minusDays(1));
102+
t.getData().setExecutionCount(99);
103+
});
104+
105+
// WHEN
106+
var result = subject.rescheduleAbandoned(OffsetDateTime.now());
107+
108+
// THEN
109+
assertThat(result).hasSize(1);
110+
assertThat(afterTriggerFailedCalled.get()).isOne();
111+
assertThat(exRef.get()).isNotNull();
112+
}
113+
}

core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.sterl.spring.persistent_tasks.trigger;
22

3-
import static org.assertj.core.api.Assertions.*;
3+
import static org.assertj.core.api.Assertions.assertThat;
44

55
import java.time.Duration;
66
import java.time.OffsetDateTime;
@@ -24,7 +24,6 @@
2424
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
2525
import org.sterl.spring.persistent_tasks.history.repository.CompletedTriggerRepository;
2626
import org.sterl.spring.persistent_tasks.task.exception.CancelTaskException;
27-
import org.sterl.spring.persistent_tasks.task.exception.FailTaskNoRetryException;
2827
import org.sterl.spring.persistent_tasks.task.repository.TaskRepository;
2928
import org.sterl.spring.persistent_tasks.trigger.component.StateSerializer.DeSerializationFailedException;
3029
import org.sterl.spring.persistent_tasks.trigger.event.TriggerAddedEvent;
@@ -486,27 +485,6 @@ void testCancelRunningTrigger() {
486485
assertThat(events.stream(TriggerSuccessEvent.class)).hasSize(0);
487486
}
488487

489-
@Test
490-
void testFailRunningTriggerNoRetry() {
491-
// GIVEN
492-
TaskId<String> taskId = taskService.replace("foo-fail", c -> {
493-
throw new FailTaskNoRetryException(c);
494-
});
495-
var key1 = subject.queue(taskId.newTrigger().build()).getKey();
496-
497-
// WHEN
498-
assertThat(persistentTaskTestService.runNextTrigger()).isPresent();
499-
assertThat(persistentTaskTestService.runNextTrigger()).isEmpty();
500-
501-
// THEN
502-
assertThat(historyService.findLastKnownStatus(key1).get().status()).isEqualTo(TriggerStatus.FAILED);
503-
504-
// AND
505-
assertThat(events.stream(TriggerFailedEvent.class).count()).isOne();
506-
assertThat(events.stream(TriggerCanceledEvent.class).count()).isZero();
507-
assertThat(events.stream(TriggerSuccessEvent.class).count()).isZero();
508-
}
509-
510488
@Test
511489
void testAddTriggerAndWaitForSignal() {
512490
// GIVEN
@@ -616,27 +594,4 @@ void testAwaitForSignalTriggersInTimeoutWillNotRun() {
616594
// AND
617595
asserts.assertMissing("old state");
618596
}
619-
620-
@Test
621-
void testExpireTimeoutTriggers() {
622-
// GIVEN
623-
TaskId<String> taskId = taskService.replace("foo", asserts::info);
624-
subject.queue(taskId.newTrigger()
625-
.waitForSignal(OffsetDateTime.now().plusMinutes(1))
626-
.state("old state")
627-
.build());
628-
var trigger = subject.queue(taskId.newTrigger()
629-
.waitForSignal(OffsetDateTime.now().minusSeconds(1))
630-
.state("foobar")
631-
.build());
632-
633-
// WHEN
634-
var expired = subject.expireTimeoutTriggers();
635-
636-
// WHEN
637-
assertThat(expired).hasSize(1);
638-
assertThat(trigger).isEqualTo(expired.get(0));
639-
// AND
640-
assertThat(events.stream(TriggerExpiredEvent.class).count()).isOne();
641-
}
642597
}

doc/docs/delete-task-trigger.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ void testCancelTrigger() {
2222
## Cancel a running trigger
2323

2424
As soon a task is triggered, a task may decide to cancel or to fail itself. Both will suppress any outstanding retries of the retry strategy.
25-
![Task Exceprtion](/assets/spring-persistent-task-exception.png){ align=center }
25+
![Task Exceprtion](./assets/spring-persistent-task-exception.png){ align=center }
2626

2727
```java
2828
@Test

doc/docs/failed-spring-triggers.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Reacting to Trigger Failures
2+
3+
Each trigger goes through a trigger lifecycle with the [corresponding events](./life-cycle-events.md). One of these events, `TriggerFailedEvent`, allows you to hook into a trigger’s error handling.
4+
5+
!!! info
6+
7+
- Events are fired for all triggers, so a filter is needed to ensure the event corresponds to the current trigger.
8+
- Events are fired even if a trigger will retry; a check is needed to determine whether the handler should only execute when no retries are planned.
9+
10+
## Hook into a Failed Trigger
11+
12+
The default use case is to get notified if a trigger has failed and will not be retried.
13+
14+
```java
15+
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
16+
void onTriggerFailed(TriggerFailedEvent failed) {
17+
if (TASK_NAME.equals(failed.key().getTaskName()) && failed.isDone()) {
18+
// trigger will not retry anymore
19+
}
20+
}
21+
```
22+
23+
## Join the failed trigger transaction
24+
25+
In this example, the following code joins the trigger/framework transaction.
26+
27+
```java
28+
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
29+
void onTriggerFailed(TriggerFailedEvent failed) {
30+
if (TASK_NAME.equals(failed.key().getTaskName()) && failed.isDone()) {
31+
// trigger will not retry anymore
32+
}
33+
}
34+
```

0 commit comments

Comments
 (0)