Skip to content

Commit 8c2a60a

Browse files
committed
abandoned triggers will now fire a failed trigger event
1 parent c8a2256 commit 8c2a60a

File tree

12 files changed

+152
-72
lines changed

12 files changed

+152
-72
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Changelog
22

3+
## v1.6.7
4+
5+
### New features:
6+
7+
### Bug fixes:
8+
- abandoned triggers will now fire a failed trigger event
9+
- trigger history scheduler waits now in case of shut down for any history events to get saved
10+
311
## v1.6.6 - (2025-03-20)
412

513
### New features:

core/src/main/java/org/sterl/spring/persistent_tasks/history/component/TriggerHistoryComponent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class TriggerHistoryComponent {
3131
// as if the trigger fails, a rollback would also remove this entry
3232
// furthermore async to ensure that we would not block
3333
// as REQURES_NEW would block two DB connections ...
34-
@Async
34+
@Async("triggerHistoryExecutor")
3535
@Transactional(timeout = 10)
3636
@EventListener
3737
public void onRunning(TriggerRunningEvent e) {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.sterl.spring.persistent_tasks.history.config;
2+
3+
import org.springframework.context.annotation.Bean;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.core.task.TaskExecutor;
6+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
7+
8+
@Configuration
9+
public class TriggerHistoryConfig {
10+
11+
@Bean
12+
TaskExecutor triggerHistoryExecutor() {
13+
var taskExecutor = new ThreadPoolTaskExecutor();
14+
taskExecutor.setCorePoolSize(1);
15+
taskExecutor.setMaxPoolSize(4);
16+
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
17+
taskExecutor.initialize();
18+
return taskExecutor;
19+
}
20+
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,13 @@ public Future<TriggerKey> submit(@Nullable TriggerEntity trigger) {
7070
if (trigger == null) {
7171
return CompletableFuture.completedFuture(null);
7272
}
73-
if (stopped.get() || executor == null) {
74-
throw new IllegalStateException("Executor of " + schedulerName + " is already stopped");
75-
}
7673

7774
try {
7875
Future<TriggerKey> result;
7976
synchronized (runningTasks) {
77+
if (stopped.get() || executor == null) {
78+
throw new IllegalStateException("Executor of " + schedulerName + " is already stopped");
79+
}
8080
result = executor.submit(() -> runTrigger(trigger));
8181
runningTasks.put(trigger, result);
8282
}
@@ -115,28 +115,25 @@ public void start() {
115115

116116
@Override
117117
public void close() {
118-
ExecutorService executorRef;
118+
stopped.set(true);
119119
synchronized (runningTasks) {
120-
executorRef = executor;
121-
executor = null;
122-
stopped.set(true);
123-
}
124-
125-
if (executorRef != null) {
126-
executorRef.shutdown();
127-
log.info("Shutdown {} with {} running tasks, waiting for {}.", schedulerName, runningTasks.size(),
128-
maxShutdownWaitTime);
129-
130-
if (runningTasks.size() > 0) {
131-
try {
132-
executorRef.awaitTermination(maxShutdownWaitTime.getSeconds(), TimeUnit.SECONDS);
133-
} catch (InterruptedException e) {
134-
Thread.currentThread().interrupt();
135-
log.warn("Failed to complete runnings tasks.", e.getCause() == null ? e : e.getCause());
136-
shutdownNow();
137-
} finally {
138-
executorRef = null;
139-
runningTasks.clear();
120+
if (executor != null) {
121+
var execRef = executor;
122+
execRef.shutdown();
123+
log.info("Shutdown {} with {} running tasks, waiting for {}.", schedulerName, runningTasks.size(),
124+
maxShutdownWaitTime);
125+
126+
if (runningTasks.size() > 0) {
127+
try {
128+
execRef.awaitTermination(maxShutdownWaitTime.getSeconds(), TimeUnit.SECONDS);
129+
} catch (InterruptedException e) {
130+
Thread.currentThread().interrupt();
131+
log.warn("Failed to complete runnings tasks.", e.getCause() == null ? e : e.getCause());
132+
shutdownNow();
133+
} finally {
134+
executor = null;
135+
runningTasks.clear();
136+
}
140137
}
141138
}
142139
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import org.sterl.spring.persistent_tasks.shared.stereotype.TransactionalService;
1919
import org.sterl.spring.persistent_tasks.task.TaskService;
2020
import org.sterl.spring.persistent_tasks.trigger.component.EditTriggerComponent;
21+
import org.sterl.spring.persistent_tasks.trigger.component.FailTriggerComponent;
2122
import org.sterl.spring.persistent_tasks.trigger.component.LockNextTriggerComponent;
2223
import org.sterl.spring.persistent_tasks.trigger.component.ReadTriggerComponent;
2324
import org.sterl.spring.persistent_tasks.trigger.component.RunTriggerComponent;
25+
import org.sterl.spring.persistent_tasks.trigger.component.StateSerializer;
2426
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
2527

2628
import lombok.RequiredArgsConstructor;
@@ -32,9 +34,11 @@
3234
public class TriggerService {
3335

3436
private final TaskService taskService;
37+
private final StateSerializer stateSerializer = new StateSerializer();
3538
private final RunTriggerComponent runTrigger;
3639
private final ReadTriggerComponent readTrigger;
3740
private final EditTriggerComponent editTrigger;
41+
private final FailTriggerComponent failTrigger;
3842
private final LockNextTriggerComponent lockNextTrigger;
3943

4044
/**
@@ -169,10 +173,11 @@ public long countTriggers(@Nullable TriggerStatus status) {
169173
public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
170174
final List<TriggerEntity> result = readTrigger.findTriggersLastPingAfter(
171175
timeout);
176+
final var e = new IllegalStateException("Trigger abandoned - timeout: " + timeout);
172177
result.forEach(t -> {
173-
t.setRunningOn(null);
174-
t.getData().setStatus(TriggerStatus.WAITING);
175-
t.getData().setExceptionName("Abandoned tasks");
178+
var task = taskService.get(t.newTaskId());
179+
var state = stateSerializer.deserializeOrNull(t.getData().getState());
180+
failTrigger.execute(task.orElse(null), t, state, e);
176181
});
177182
log.debug("rescheduled {} triggers", result.size());
178183
return result;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.sterl.spring.persistent_tasks.trigger.component;
2+
3+
import java.io.Serializable;
4+
import java.time.OffsetDateTime;
5+
import java.util.Optional;
6+
7+
import org.springframework.lang.Nullable;
8+
import org.springframework.stereotype.Component;
9+
import org.sterl.spring.persistent_tasks.api.task.PersistentTask;
10+
import org.sterl.spring.persistent_tasks.task.exception.CancelTaskException;
11+
import org.sterl.spring.persistent_tasks.task.exception.FailTaskNoRetryException;
12+
import org.sterl.spring.persistent_tasks.trigger.model.RunTaskWithStateCommand;
13+
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
14+
15+
import lombok.RequiredArgsConstructor;
16+
import lombok.extern.slf4j.Slf4j;
17+
18+
@Component
19+
@RequiredArgsConstructor
20+
@Slf4j
21+
public class FailTriggerComponent {
22+
23+
private final EditTriggerComponent editTrigger;
24+
25+
public Optional<TriggerEntity> execute(RunTaskWithStateCommand runTaskWithStateCommand, Exception e) {
26+
27+
var trigger = runTaskWithStateCommand.trigger();
28+
var task = runTaskWithStateCommand.task();
29+
var state = runTaskWithStateCommand.state();
30+
return execute(task, trigger, state, e);
31+
}
32+
33+
/**
34+
* Fails the given trigger, no retry will be applied!
35+
*/
36+
public <T extends Serializable> Optional<TriggerEntity> execute(TriggerEntity trigger, Exception e) {
37+
return execute(null, trigger, null, e);
38+
}
39+
public <T extends Serializable> Optional<TriggerEntity> execute(
40+
@Nullable PersistentTask<T> task,
41+
TriggerEntity trigger,
42+
@Nullable T state,
43+
Exception e) {
44+
45+
Optional<TriggerEntity> result;
46+
47+
if (e instanceof CancelTaskException) {
48+
log.info("Cancel of a running trigger={} requested", trigger.getKey());
49+
result = editTrigger.cancelTask(trigger.getKey(), e);
50+
} else if (e instanceof FailTaskNoRetryException) {
51+
log.warn("Fail no retry of a running trigger={} requested", trigger.getKey(), e);
52+
result = editTrigger.failTrigger(trigger.getKey(), state, e, null);
53+
} else if (task == null
54+
|| !task.retryStrategy().shouldRetry(trigger.getData().getExecutionCount(), e)) {
55+
56+
log.error("Failed trigger={}, no further retries!", trigger.getKey(), e);
57+
result = editTrigger.failTrigger(trigger.getKey(), state, e, null);
58+
} else {
59+
final OffsetDateTime retryAt = task.retryStrategy().retryAt(trigger.getData().getExecutionCount(), e);
60+
if (retryAt == null) {
61+
log.error("Failed trigger={}, no further retries!", trigger.getKey(), e);
62+
} else {
63+
log.warn("Failed trigger={} with retryAt={}", trigger.getKey(), retryAt, e);
64+
}
65+
result = editTrigger.failTrigger(trigger.getKey(), state, e, retryAt);
66+
}
67+
return result;
68+
}
69+
}
Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.sterl.spring.persistent_tasks.trigger.component;
22

3-
import java.time.OffsetDateTime;
43
import java.util.Optional;
54

65
import org.springframework.lang.Nullable;
@@ -9,8 +8,6 @@
98
import org.springframework.transaction.annotation.Transactional;
109
import org.sterl.spring.persistent_tasks.api.task.RunningTriggerContextHolder;
1110
import org.sterl.spring.persistent_tasks.task.TaskService;
12-
import org.sterl.spring.persistent_tasks.task.exception.CancelTaskException;
13-
import org.sterl.spring.persistent_tasks.task.exception.FailTaskNoRetryException;
1411
import org.sterl.spring.persistent_tasks.trigger.model.RunTaskWithStateCommand;
1512
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
1613

@@ -23,6 +20,7 @@
2320
public class RunTriggerComponent {
2421

2522
private final TaskService taskService;
23+
private final FailTriggerComponent failTrigger;
2624
private final EditTriggerComponent editTrigger;
2725
private final StateSerializer serializer = new StateSerializer();
2826

@@ -43,7 +41,7 @@ public Optional<TriggerEntity> execute(TriggerEntity trigger) {
4341
RunningTriggerContextHolder.setContext(runTaskWithState.runningTrigger());
4442
return runTaskWithState.execute(editTrigger);
4543
} catch (Exception e) {
46-
return failTaskAndState(runTaskWithState, e);
44+
return failTrigger.execute(runTaskWithState, e);
4745
} finally {
4846
RunningTriggerContextHolder.clearContext();
4947
}
@@ -57,37 +55,8 @@ private RunTaskWithStateCommand buildTaskWithStateFor(TriggerEntity trigger) {
5755
final var state = serializer.deserialize(trigger.getData().getState());
5856
return new RunTaskWithStateCommand(task, trx, state, trigger);
5957
} catch (Exception e) {
60-
failTaskAndState(new RunTaskWithStateCommand(null, Optional.empty(), null, trigger), e);
58+
failTrigger.execute(trigger, e);
6159
return null;
6260
}
6361
}
64-
65-
private Optional<TriggerEntity> failTaskAndState(RunTaskWithStateCommand runTaskWithStateCommand, Exception e) {
66-
67-
var trigger = runTaskWithStateCommand.trigger();
68-
var task = runTaskWithStateCommand.task();
69-
Optional<TriggerEntity> result;
70-
71-
if (e instanceof CancelTaskException) {
72-
log.info("Cancel of a running trigger={} requested", trigger.getKey());
73-
result = editTrigger.cancelTask(trigger.getKey(), e);
74-
} else if (e instanceof FailTaskNoRetryException) {
75-
log.warn("Fail no retry of a running trigger={} requested", trigger.getKey(), e);
76-
result = editTrigger.failTrigger(trigger.getKey(), runTaskWithStateCommand.state(), e, null);
77-
} else if (task == null
78-
|| !task.retryStrategy().shouldRetry(trigger.getData().getExecutionCount(), e)) {
79-
80-
log.error("Failed trigger={}, no further retries!", trigger.getKey(), e);
81-
result = editTrigger.failTrigger(trigger.getKey(), runTaskWithStateCommand.state(), e, null);
82-
} else {
83-
final OffsetDateTime retryAt = task.retryStrategy().retryAt(trigger.getData().getExecutionCount(), e);
84-
if (retryAt == null) {
85-
log.error("Failed trigger={}, no further retries!", trigger.getKey(), e);
86-
} else {
87-
log.warn("Failed trigger={} with retryAt={}", trigger.getKey(), retryAt, e);
88-
}
89-
result = editTrigger.failTrigger(trigger.getKey(), runTaskWithStateCommand.state(), e, retryAt);
90-
}
91-
return result;
92-
}
9362
}

core/src/test/java/org/sterl/spring/persistent_tasks/PersistentTaskServiceTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ void testChainedTasks() throws Exception {
4141
final var correlationId = UUID.randomUUID().toString();
4242

4343
// WHEN
44-
System.err.println("-----");
4544
subject.runOrQueue(task1.newTrigger(234).correlationId(correlationId).build());
4645

4746
// THEN

core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/TaskFailoverTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,9 @@ void rescheduleAbandonedTasksTest() throws Exception {
6060
.isEqualTo(1);
6161
assertThat(triggerService.countTriggers(TriggerStatus.WAITING))
6262
.isEqualTo(1);
63+
// AND
64+
var timeoutTrigger = historyService.findAllDetailsForKey(willTimeout.getKey()).getContent().getFirst();
65+
assertThat(timeoutTrigger.status()).isEqualTo(TriggerStatus.FAILED);
66+
assertThat(timeoutTrigger.getData().getLastException()).contains("Trigger abandoned");
6367
}
6468
}

core/src/test/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,21 @@ public Set<TriggerKey> scheduleNextTriggersAndWait(Duration maxWaitTime) {
9696
final var result = new LinkedHashSet<TriggerKey>();
9797
final var timeOut = System.currentTimeMillis() + maxWaitTime.toMillis();
9898

99-
result.addAll(awaitRunningTriggers(maxWaitTime));
10099
List<TriggerKey> newTriggers;
101100
do {
102101

103102
if (System.currentTimeMillis() > timeOut) {
104103
throw new RuntimeException("Timeout waiting for triggers after " + maxWaitTime);
105104
}
106-
107-
newTriggers = awaitTriggers(maxWaitTime, scheduleNextTriggers());
105+
// 1. the running
106+
newTriggers = awaitRunningTriggers(maxWaitTime);
107+
108+
// 2. check if we have waiting new tasks
109+
if (newTriggers.isEmpty()) {
110+
newTriggers = awaitTriggers(maxWaitTime, scheduleNextTriggers());
111+
}
112+
// 3. in case we are really fast in our tests (empty tasks) we double check
113+
// that we have no race condition and missed a now running task
108114
if (newTriggers.isEmpty()) {
109115
newTriggers = awaitRunningTriggers(maxWaitTime);
110116
}

0 commit comments

Comments
 (0)