Skip to content

Commit baad850

Browse files
authored
Stats (#5)
* moved ping of scheduler * less transaction needed * added transaction support for tasks * fixed PMD issue * added test to count the transactions * extended javadoc * extended transaction management doc
1 parent fa2126f commit baad850

File tree

18 files changed

+372
-54
lines changed

18 files changed

+372
-54
lines changed

README.md

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,24 +66,51 @@ public class BuildVehicleTask implements SpringBeanTask<Vehicle> {
6666

6767
private final VehicleRepository vehicleRepository;
6868

69-
@Transactional(timeout = 5)
7069
@Override
7170
public void accept(Vehicle vehicle) {
7271
// do stuff
7372
// save
7473
vehicleRepository.save(vehicle);
7574
}
75+
// OPTIONAL
76+
@Override
77+
public RetryStrategy retryStrategy() {
78+
// run 5 times, multiply the execution count with 4, add the result in HOURS to now.
79+
return new MultiplicativeRetryStrategy(5, ChronoUnit.HOURS, 4)
80+
}
81+
// OPTIONAL
82+
// if the task in accept requires a DB transaction, join them together with the framework
83+
// if true the TransactionTemplate is used. Set here any timeouts.
84+
@Override
85+
public boolean isTransactional() {
86+
return true;
87+
}
88+
}
89+
```
90+
91+
Consider setting a timeout to the `TransactionTemplate`:
92+
93+
```java
94+
@Bean
95+
TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
96+
TransactionTemplate template = new TransactionTemplate(transactionManager);
97+
template.setTimeout(10);
98+
return template;
7699
}
77100
```
78101

79102
### As a closure
80103

81-
Note: this example has no aspects as above the spring _@Transactional_
104+
Simple task will use defaults:
105+
106+
- Not a transactional task, e.g. HTTP calls
107+
- 4 executions, one regular and 3 retries, linear
108+
- using minutes with an offset of 1 which is added to now
82109

83110
```java
84111
@Bean
85-
SpringBeanTask<Vehicle> task1(VehicleRepository vehicleRepository) {
86-
return v -> vehicleRepository.save(v);
112+
SpringBeanTask<Vehicle> task1(VehicleHttpConnector vehicleHttpConnector) {
113+
return v -> vehicleHttpConnector.send(v);
87114
}
88115
```
89116

Lines changed: 86 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,114 @@
11
package org.sterl.spring.persistent_tasks.api;
22

33
import java.time.OffsetDateTime;
4+
import java.time.temporal.ChronoUnit;
5+
import java.time.temporal.TemporalUnit;
46

57
import org.springframework.lang.Nullable;
68

9+
import lombok.RequiredArgsConstructor;
10+
711
@FunctionalInterface
812
public interface RetryStrategy {
913
RetryStrategy NO_RETRY = (c, e) -> false;
1014
/**
1115
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
1216
*/
13-
RetryStrategy THREE_RETRIES = (c, e) -> c < 4;
17+
RetryStrategy THREE_RETRIES = new LinearRetryStrategy(4, ChronoUnit.MINUTES, 1);
1418
/**
1519
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
1620
*/
17-
RetryStrategy THREE_RETRIES_IMMEDIATELY = new RetryStrategy() {
21+
RetryStrategy THREE_RETRIES_IMMEDIATELY = new MultiplicativeRetryStrategy(4, ChronoUnit.MILLIS, 0);
22+
23+
24+
/**
25+
* Determines whether a retry should be attempted based on the current
26+
* execution count and the provided exception.
27+
*
28+
* @param executionCount The number of attempts already made.
29+
* @param error The exception that triggered the retry.
30+
* @return {@code true} if the current execution count is less than
31+
* the maximum execution count; {@code false} otherwise.
32+
*/
33+
boolean shouldRetry(int executionCount, @Nullable Exception error);
34+
35+
/**
36+
* Calculates the time of the next retry attempt based on the current
37+
* execution count and the provided exception.
38+
*
39+
* @param executionCount The number of attempts already made.
40+
* @param exception The exception that triggered the retry.
41+
* @return The {@link OffsetDateTime} representing the time of the next retry attempt.
42+
*/
43+
default OffsetDateTime retryAt(int executionCount, @Nullable Exception exception) {
44+
return OffsetDateTime.now().plusMinutes(executionCount);
45+
}
46+
47+
48+
// Default implementations
49+
/**
50+
* A retry strategy that determines the next retry time by adding a fixed
51+
* offset and the execution count to the current time in the specified temporal unit.
52+
*
53+
* <p>This strategy can be used to create retry intervals that increase linearly
54+
* with the number of attempts, providing a predictable delay pattern.</p>
55+
*
56+
* <p>Example:
57+
* If {@code offset = 5}, {@code unit = ChronoUnit.SECONDS}, and
58+
* {@code executionCount = 3}, the next retry will be scheduled after
59+
* {@code 5 + 3 = 8 seconds} from the current time.</p>
60+
*
61+
* <p>Note: The retry attempts will stop once the maximum execution count
62+
* ({@code maxExecutionCount}) is reached.</p>
63+
*
64+
* @author Your Name
65+
*/
66+
@RequiredArgsConstructor
67+
class LinearRetryStrategy implements RetryStrategy {
68+
private final int maxExecutionCount;
69+
private final TemporalUnit unit;
70+
private final int offset;
71+
1872
@Override
1973
public boolean shouldRetry(int executionCount, Exception error) {
20-
return executionCount < 4;
74+
return maxExecutionCount > executionCount;
2175
}
2276
@Override
2377
public OffsetDateTime retryAt(int executionCount, Exception error) {
24-
return OffsetDateTime.now();
78+
return OffsetDateTime.now().plus(offset + executionCount, unit);
2579
}
26-
};
80+
}
2781

2882
/**
29-
* Check if a retry should be done.
83+
* A retry strategy that determines the next retry time by multiplying
84+
* the execution count by a scaling factor and adding the result to the
85+
* current time in the specified temporal unit.
86+
*
87+
* <p>This strategy can be used to create retry intervals that increase
88+
* multiplicatively with the number of attempts, providing a way to progressively
89+
* delay retries.</p>
3090
*
31-
* @param executionCount 0 based counter how often the execution was tried
32-
* @param error the exception, <code>null</code> on a timeout
33-
*/
34-
boolean shouldRetry(int executionCount, @Nullable Exception error);
35-
36-
/**
37-
* By default a linear retry strategy, adding one minute for each failed try.
91+
* <p>Example:
92+
* If {@code scalingFactor = 2}, {@code unit = ChronoUnit.SECONDS}, and
93+
* {@code executionCount = 3}, the next retry will be scheduled after
94+
* {@code 2 * 3 = 6 seconds} from the current time.</p>
3895
*
39-
* @param executionCount 0 based counter how often the execution was tried
40-
* @param error the exception, <code>null</code> on a timeout
96+
* <p>Note: The retry attempts will stop once the maximum execution count
97+
* ({@code maxExecutionCount}) is reached.</p>
4198
*/
42-
default OffsetDateTime retryAt(int executionCount, @Nullable Exception error) {
43-
return OffsetDateTime.now().plusMinutes(1 + executionCount);
99+
@RequiredArgsConstructor
100+
class MultiplicativeRetryStrategy implements RetryStrategy {
101+
private final int maxExecutionCount;
102+
private final TemporalUnit unit;
103+
private final int scalingFactor;
104+
105+
@Override
106+
public boolean shouldRetry(int executionCount, Exception error) {
107+
return maxExecutionCount > executionCount;
108+
}
109+
@Override
110+
public OffsetDateTime retryAt(int executionCount, Exception error) {
111+
return OffsetDateTime.now().plus(scalingFactor * executionCount, unit);
112+
}
44113
}
45114
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/SpringBeanTask.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,19 @@ public interface SpringBeanTask<T extends Serializable> extends Consumer<T> {
1111
default RetryStrategy retryStrategy() {
1212
return RetryStrategy.THREE_RETRIES;
1313
}
14+
15+
/**
16+
* Whether the task is transaction or not. If <code>true</code> the execution
17+
* is wrapped into the default transaction template together with the state update
18+
* and the following events:
19+
* <ol>
20+
* <li>org.sterl.spring.persistent_tasks.trigger.event.TriggerRunningEvent</li>
21+
* <li>org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent</li>
22+
* <li>org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent</li>
23+
* </ol>
24+
* @return {@code true} if the task is transactional; {@code false} otherwise.
25+
*/
26+
default boolean isTransactional() {
27+
return false;
28+
}
1429
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,19 @@ public List<Future<TriggerKey>> triggerNextTasks() {
103103
*/
104104
@NonNull
105105
public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
106-
List<TriggerEntity> triggers;
107106
if (taskExecutor.getFreeThreads() > 0) {
108-
triggers = triggerService.lockNextTrigger(
109-
name, taskExecutor.getFreeThreads(), timeDue);
107+
final var result = trx.execute(t -> {
108+
var triggers = triggerService.lockNextTrigger(name,
109+
taskExecutor.getFreeThreads(), timeDue);
110+
pingRegistry().addRunning(triggers.size());
111+
return triggers;
112+
});
113+
114+
return taskExecutor.submit(result);
110115
} else {
111-
triggers = Collections.emptyList();
116+
pingRegistry();
117+
return Collections.emptyList();
112118
}
113-
var result = taskExecutor.submit(triggers);
114-
pingRegistry();
115-
return result;
116119
}
117120

118121
/**
@@ -122,23 +125,25 @@ public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
122125
*/
123126
public <T extends Serializable> Future<TriggerKey> runOrQueue(
124127
AddTriggerRequest<T> triggerRequest) {
125-
var runningTrigger = trx.execute(t -> {
128+
final var runningTrigger = trx.execute(t -> {
126129
var trigger = triggerService.queue(triggerRequest);
127130
// exit now if this trigger is for the future ...
128131
if (trigger.shouldRunInFuture()) return trigger;
129132

130133
if (taskExecutor.getFreeThreads() > 0) {
131134
trigger = triggerService.markTriggersAsRunning(trigger, name);
135+
pingRegistry().addRunning(1);
132136
} else {
133137
log.debug("Currently not enough free thread available {} of {} in use. Task {} queued.",
134138
taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), trigger.getKey());
135139
}
136140
return trigger;
137141
});
138-
Future<TriggerKey> result = CompletableFuture.completedFuture(runningTrigger.getKey());
142+
Future<TriggerKey> result;
139143
if (runningTrigger.isRunning()) {
140144
result = taskExecutor.submit(runningTrigger);
141-
pingRegistry();
145+
} else {
146+
result = CompletableFuture.completedFuture(runningTrigger.getKey());
142147
}
143148
return result;
144149
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public TaskExecutorComponent(TriggerService triggerService, int maxThreads) {
4747

4848
@NonNull
4949
public List<Future<TriggerKey>> submit(List<TriggerEntity> trigger) {
50+
if (trigger == null || trigger.isEmpty()) return Collections.emptyList();
51+
5052
final List<Future<TriggerKey>> result = new ArrayList<>(trigger.size());
5153
for (TriggerEntity triggerEntity : trigger) {
5254
result.add(submit(triggerEntity));

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,9 @@ public SchedulerEntity(String name) {
4343
super();
4444
this.id = name;
4545
}
46+
47+
public SchedulerEntity addRunning(int count) {
48+
this.runnungTasks += count;
49+
return this;
50+
}
4651
}

core/src/main/java/org/sterl/spring/persistent_tasks/task/model/RegisteredTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,8 @@ public void accept(T state) {
3838
public RetryStrategy retryStrategy() {
3939
return this.fun.retryStrategy();
4040
}
41+
@Override
42+
public boolean isTransactional() {
43+
return this.fun.isTransactional();
44+
}
4145
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
2020
import org.sterl.spring.persistent_tasks.trigger.event.TriggerAddedEvent;
2121
import org.sterl.spring.persistent_tasks.trigger.event.TriggerCanceledEvent;
22-
import org.sterl.spring.persistent_tasks.trigger.event.TriggerCompleteEvent;
22+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent;
2323
import org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent;
2424
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
2525
import org.sterl.spring.persistent_tasks.trigger.repository.TriggerRepository;
@@ -55,8 +55,8 @@ public Optional<TriggerEntity> completeTaskWithStatus(TriggerKey key, Exception
5555
result.ifPresent(t -> {
5656
t.complete(e);
5757

58-
if (t.getData().getStatus() != TriggerStatus.FAILED) {
59-
publisher.publishEvent(new TriggerCompleteEvent(t));
58+
if (t.getData().getStatus() == TriggerStatus.SUCCESS) {
59+
publisher.publishEvent(new TriggerSuccessEvent(t));
6060
log.debug("Setting {} to status={} {}", key, t.getData().getStatus(),
6161
e == null ? "" : "error=" + e.getClass().getSimpleName());
6262
} else {

0 commit comments

Comments
 (0)