Skip to content

Commit b6f254f

Browse files
committed
added transaction support for tasks
1 parent be7150f commit b6f254f

File tree

9 files changed

+201
-32
lines changed

9 files changed

+201
-32
lines changed
Lines changed: 87 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,114 @@
11
package org.sterl.spring.persistent_tasks.api;
22

33
import java.time.OffsetDateTime;
4+
import java.time.temporal.ChronoUnit;
5+
import java.time.temporal.TemporalUnit;
46

57
import org.springframework.lang.Nullable;
68

9+
import lombok.RequiredArgsConstructor;
10+
711
@FunctionalInterface
812
public interface RetryStrategy {
913
RetryStrategy NO_RETRY = (c, e) -> false;
1014
/**
1115
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
1216
*/
13-
RetryStrategy THREE_RETRIES = (c, e) -> c < 4;
17+
RetryStrategy THREE_RETRIES = new LinearRetryStrategy(4, ChronoUnit.MINUTES, 1);
1418
/**
1519
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
1620
*/
17-
RetryStrategy THREE_RETRIES_IMMEDIATELY = new RetryStrategy() {
21+
RetryStrategy THREE_RETRIES_IMMEDIATELY = new MultiplicativeRetryStrategy(4, ChronoUnit.MILLIS, 0);
22+
23+
24+
/**
25+
* Determines whether a retry should be attempted based on the current
26+
* execution count and the provided exception.
27+
*
28+
* @param executionCount The number of attempts already made.
29+
* @param error The exception that triggered the retry.
30+
* @return {@code true} if the current execution count is less than
31+
* the maximum execution count; {@code false} otherwise.
32+
*/
33+
boolean shouldRetry(int executionCount, @Nullable Exception error);
34+
35+
/**
36+
* Calculates the time of the next retry attempt based on the current
37+
* execution count and the provided exception.
38+
*
39+
* @param executionCount The number of attempts already made.
40+
* @param exception The exception that triggered the retry.
41+
* @return The {@link OffsetDateTime} representing the time of the next retry attempt.
42+
*/
43+
default OffsetDateTime retryAt(int executionCount, @Nullable Exception exception) {
44+
return OffsetDateTime.now().plusMinutes(executionCount);
45+
}
46+
47+
48+
// Default implementations
49+
/**
50+
* A retry strategy that determines the next retry time by adding a fixed
51+
* offset and the execution count to the current time in the specified temporal unit.
52+
*
53+
* <p>This strategy can be used to create retry intervals that increase linearly
54+
* with the number of attempts, providing a predictable delay pattern.</p>
55+
*
56+
* <p>Example:
57+
* If {@code offset = 5}, {@code unit = ChronoUnit.SECONDS}, and
58+
* {@code executionCount = 3}, the next retry will be scheduled after
59+
* {@code 5 + 3 = 8 seconds} from the current time.</p>
60+
*
61+
* <p>Note: The retry attempts will stop once the maximum execution count
62+
* ({@code maxExecutionCount}) is reached.</p>
63+
*
64+
* @author Your Name
65+
*/
66+
@RequiredArgsConstructor
67+
public static class LinearRetryStrategy implements RetryStrategy {
68+
private final int maxExecutionCount;
69+
private final TemporalUnit unit;
70+
private final int offset;
71+
1872
@Override
1973
public boolean shouldRetry(int executionCount, Exception error) {
20-
return executionCount < 4;
74+
return maxExecutionCount > executionCount;
2175
}
2276
@Override
2377
public OffsetDateTime retryAt(int executionCount, Exception error) {
24-
return OffsetDateTime.now();
78+
return OffsetDateTime.now().plus(offset + executionCount, unit);
2579
}
26-
};
27-
80+
}
81+
2882
/**
29-
* Check if a retry should be done.
83+
* A retry strategy that determines the next retry time by multiplying
84+
* the execution count by a scaling factor and adding the result to the
85+
* current time in the specified temporal unit.
86+
*
87+
* <p>This strategy can be used to create retry intervals that increase
88+
* multiplicatively with the number of attempts, providing a way to progressively
89+
* delay retries.</p>
3090
*
31-
* @param executionCount 0 based counter how often the execution was tried
32-
* @param error the exception, <code>null</code> on a timeout
33-
*/
34-
boolean shouldRetry(int executionCount, @Nullable Exception error);
35-
36-
/**
37-
* By default a linear retry strategy, adding one minute for each failed try.
91+
* <p>Example:
92+
* If {@code scalingFactor = 2}, {@code unit = ChronoUnit.SECONDS}, and
93+
* {@code executionCount = 3}, the next retry will be scheduled after
94+
* {@code 2 * 3 = 6 seconds} from the current time.</p>
3895
*
39-
* @param executionCount 0 based counter how often the execution was tried
40-
* @param error the exception, <code>null</code> on a timeout
96+
* <p>Note: The retry attempts will stop once the maximum execution count
97+
* ({@code maxExecutionCount}) is reached.</p>
4198
*/
42-
default OffsetDateTime retryAt(int executionCount, @Nullable Exception error) {
43-
return OffsetDateTime.now().plusMinutes(1 + executionCount);
99+
@RequiredArgsConstructor
100+
public static class MultiplicativeRetryStrategy implements RetryStrategy {
101+
private final int maxExecutionCount;
102+
private final TemporalUnit unit;
103+
private final int scalingFactor;
104+
105+
@Override
106+
public boolean shouldRetry(int executionCount, Exception error) {
107+
return maxExecutionCount > executionCount;
108+
}
109+
@Override
110+
public OffsetDateTime retryAt(int executionCount, Exception error) {
111+
return OffsetDateTime.now().plus(scalingFactor * executionCount, unit);
112+
}
44113
}
45114
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/SpringBeanTask.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,19 @@ public interface SpringBeanTask<T extends Serializable> extends Consumer<T> {
1111
default RetryStrategy retryStrategy() {
1212
return RetryStrategy.THREE_RETRIES;
1313
}
14+
15+
/**
16+
* Whether the task is transaction or not. If <code>true</code> the execution
17+
* is wrapped into the default transaction template together with the state update
18+
* and the following events:
19+
* <ol>
20+
* <li>org.sterl.spring.persistent_tasks.trigger.event.TriggerRunningEvent</li>
21+
* <li>org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent</li>
22+
* <li>org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent</li>
23+
* </ol>
24+
* @return {@code true} if the task is transactional; {@code false} otherwise.
25+
*/
26+
default boolean isTransactional() {
27+
return false;
28+
}
1429
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
2020
import org.sterl.spring.persistent_tasks.trigger.event.TriggerAddedEvent;
2121
import org.sterl.spring.persistent_tasks.trigger.event.TriggerCanceledEvent;
22-
import org.sterl.spring.persistent_tasks.trigger.event.TriggerCompleteEvent;
22+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent;
2323
import org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent;
2424
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
2525
import org.sterl.spring.persistent_tasks.trigger.repository.TriggerRepository;
@@ -55,8 +55,8 @@ public Optional<TriggerEntity> completeTaskWithStatus(TriggerKey key, Exception
5555
result.ifPresent(t -> {
5656
t.complete(e);
5757

58-
if (t.getData().getStatus() != TriggerStatus.FAILED) {
59-
publisher.publishEvent(new TriggerCompleteEvent(t));
58+
if (t.getData().getStatus() == TriggerStatus.SUCCESS) {
59+
publisher.publishEvent(new TriggerSuccessEvent(t));
6060
log.debug("Setting {} to status={} {}", key, t.getData().getStatus(),
6161
e == null ? "" : "error=" + e.getClass().getSimpleName());
6262
} else {

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
import java.io.Serializable;
44
import java.time.OffsetDateTime;
55
import java.util.Optional;
6+
import java.util.concurrent.Callable;
67

78
import org.springframework.context.ApplicationEventPublisher;
89
import org.springframework.lang.Nullable;
910
import org.springframework.stereotype.Component;
11+
import org.springframework.transaction.support.TransactionTemplate;
1012
import org.sterl.spring.persistent_tasks.api.Task;
1113
import org.sterl.spring.persistent_tasks.task.TaskService;
1214
import org.sterl.spring.persistent_tasks.trigger.event.TriggerRunningEvent;
@@ -23,36 +25,72 @@ public class RunTriggerComponent {
2325
private final TaskService taskService;
2426
private final EditTriggerComponent editTrigger;
2527
private final ApplicationEventPublisher eventPublisher;
28+
private final TransactionTemplate trx;
2629
private final StateSerializer serializer = new StateSerializer();
2730

2831
/**
2932
* Will execute the given {@link TriggerEntity} and handle any errors etc.
3033
*/
31-
public Optional<TriggerEntity> execute(@Nullable TriggerEntity trigger) {
34+
public Optional<TriggerEntity> execute(TriggerEntity trigger) {
3235
if (trigger == null) {
3336
return Optional.empty();
3437
}
35-
Task<Serializable> task = null;
38+
final var taskAndState = getTastAndState(trigger);
39+
// something went really wrong this trigger is crap
40+
if (taskAndState == null) return Optional.of(trigger);
41+
3642
try {
37-
task = taskService.assertIsKnown(trigger.newTaskId());
43+
Optional<TriggerEntity> result;
44+
if (taskAndState.isTransactional()) {
45+
result = trx.execute(t -> taskAndState.call());
46+
} else {
47+
result = taskAndState.call();
48+
}
3849

50+
return result;
51+
} catch (Exception e) {
52+
return handleTaskException(taskAndState, e);
53+
}
54+
}
55+
@Nullable
56+
private TaskAndState getTastAndState(TriggerEntity trigger) {
57+
try {
58+
var task = taskService.assertIsKnown(trigger.newTaskId());
59+
var state = serializer.deserialize(trigger.getData().getState());
60+
return new TaskAndState(task, state, trigger);
61+
} catch (Exception e) {
62+
// this trigger is somehow crap, no retry and done.
63+
handleTaskException(new TaskAndState(null, null, trigger), e);
64+
return null;
65+
}
66+
}
67+
@RequiredArgsConstructor
68+
private class TaskAndState implements Callable<Optional<TriggerEntity>> {
69+
final Task<Serializable> task;
70+
final Serializable state;
71+
final TriggerEntity trigger;
72+
73+
boolean isTransactional() {
74+
return task.isTransactional();
75+
}
76+
public Optional<TriggerEntity> call() {
3977
eventPublisher.publishEvent(new TriggerRunningEvent(trigger));
4078

41-
task.accept(serializer.deserialize(trigger.getData().getState()));
79+
task.accept(state);
4280

4381
var result = editTrigger.completeTaskWithSuccess(trigger.getKey());
4482
editTrigger.deleteTrigger(trigger);
4583

4684
return result;
47-
} catch (Exception e) {
48-
return handleTaskException(trigger, task, e);
85+
4986
}
5087
}
5188

52-
private Optional<TriggerEntity> handleTaskException(TriggerEntity trigger,
53-
@Nullable Task<Serializable> task,
89+
private Optional<TriggerEntity> handleTaskException(TaskAndState taskAndState,
5490
@Nullable Exception e) {
5591

92+
var trigger = taskAndState.trigger;
93+
var task = taskAndState.task;
5694
var result = editTrigger.completeTaskWithStatus(trigger.getKey(), e);
5795

5896
if (task != null &&
@@ -73,7 +111,7 @@ private Optional<TriggerEntity> handleTaskException(TriggerEntity trigger,
73111
}
74112
} else {
75113
log.error("{} failed, no more retries! {}", trigger.getKey(),
76-
e == null ? "No exception given." : e.getMessage());
114+
e == null ? "No exception given." : e.getMessage(), e);
77115

78116
editTrigger.deleteTrigger(trigger);
79117
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/StateSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public Serializable deserialize(byte[] bytes) {
3333
try (ObjectInput in = new ObjectInputStream(bis)) {
3434
return (Serializable)in.readObject();
3535
} catch (Exception ex) {
36-
throw new RuntimeException(ex);
36+
throw new RuntimeException("Failed to deserialize state of length " + bytes.length, ex);
3737
}
3838
}
3939
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
44

5-
public record TriggerCompleteEvent(TriggerEntity trigger) implements TriggerLifeCycleEvent {
5+
public record TriggerSuccessEvent(TriggerEntity trigger) implements TriggerLifeCycleEvent {
66

77
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/model/TriggerEntity.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,15 @@ public TriggerEntity runOn(String runningOn) {
8585
return this;
8686
}
8787

88+
/**
89+
* @param e Sets either {@link TriggerStatus#SUCCESS} or {@link TriggerStatus#FAILED}
90+
* based if the {@link Exception} is <code>null</code> or not.
91+
*/
8892
public TriggerEntity complete(Exception e) {
8993
data.setStatus(TriggerStatus.SUCCESS);
9094
data.setEnd(OffsetDateTime.now());
9195
data.updateRunningDuration();
96+
9297
if (e != null) {
9398
data.setStatus(TriggerStatus.FAILED);
9499
data.setExceptionName(e.getClass().getName());
@@ -103,4 +108,9 @@ public TriggerEntity runAt(OffsetDateTime runAt) {
103108
data.setRunAt(runAt);
104109
return this;
105110
}
111+
112+
public TriggerEntity withState(byte[] state) {
113+
this.data.setState(state);
114+
return this;
115+
}
106116
}

core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,4 +353,34 @@ void testRescheduleAbandonedTasks() {
353353
assertThat(rescheduledTasks).hasSize(1);
354354
assertThat(rescheduledTasks.get(0).getKey()).isEqualTo(t1.getKey());
355355
}
356+
357+
@Test
358+
void testUnknownTriggersNoRetry() {
359+
// GIVEN
360+
var t = triggerRepository.save(new TriggerEntity(new TriggerKey("fooTask-unknown")));
361+
362+
// WHEN
363+
runNextTrigger();
364+
365+
// WHEN
366+
var triggerData = persistentTaskService.getLastTriggerData(t.getKey()).get();
367+
assertThat(triggerData.getStatus()).isEqualTo(TriggerStatus.FAILED);
368+
assertThat(triggerData.getExceptionName()).isEqualTo(IllegalStateException.class.getName());
369+
}
370+
371+
@Test
372+
void testBadStateNoRetry() {
373+
var t = triggerRepository.save(new TriggerEntity(
374+
new TriggerKey("slowTask")
375+
).withState(new byte[] {12, 54})
376+
);
377+
378+
// WHEN
379+
runNextTrigger();
380+
381+
// WHEN
382+
var triggerData = persistentTaskService.getLastTriggerData(t.getKey()).get();
383+
assertThat(triggerData.getStatus()).isEqualTo(TriggerStatus.FAILED);
384+
assertThat(triggerData.getExceptionName()).isEqualTo(RuntimeException.class.getName());
385+
}
356386
}

ui/src/server-api.d.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,14 @@ export interface HistoryOverview {
3737
export interface RetryStrategy {
3838
}
3939

40+
export interface LinearRetryStrategy extends RetryStrategy {
41+
}
42+
43+
export interface MultiplicativeRetryStrategy extends RetryStrategy {
44+
}
45+
4046
export interface SpringBeanTask<T> extends Consumer<T> {
47+
transactional: boolean;
4148
}
4249

4350
export interface Task<T> extends SpringBeanTask<T> {

0 commit comments

Comments
 (0)