Skip to content

Commit b48a953

Browse files
committed
fixed test
1 parent 8a0357d commit b48a953

File tree

9 files changed

+88
-50
lines changed

9 files changed

+88
-50
lines changed

CHANGELOG.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## [v1.0.1]
44

5-
- Showing trigger history entries
6-
- Added `PersistentTaskService` as a new abstraction
7-
- Added cancel trigger button to the UI
5+
- Showing trigger history entries
6+
- Added `PersistentTaskService` as a new abstraction
7+
- Added cancel trigger button to the UI
8+
-

core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
11
package org.sterl.spring.persistent_tasks.api;
22

33
import java.time.OffsetDateTime;
4+
import java.time.temporal.ChronoUnit;
45

56
import org.springframework.lang.Nullable;
67

78
@FunctionalInterface
89
public interface RetryStrategy {
910
RetryStrategy NO_RETRY = (c, e) -> false;
10-
RetryStrategy THREE_RETRIES = (c, e) -> c < 3;
11+
/**
12+
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
13+
*/
14+
RetryStrategy THREE_RETRIES = (c, e) -> c < 4;
15+
/**
16+
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
17+
*/
1118
RetryStrategy THREE_RETRIES_IMMEDIATELY = new RetryStrategy() {
1219
@Override
1320
public boolean shouldRetry(int executionCount, Exception error) {
14-
return executionCount < 3;
21+
return executionCount < 4;
1522
}
1623
@Override
1724
public OffsetDateTime retryAt(int executionCount, Exception error) {
18-
return OffsetDateTime.now();
25+
return OffsetDateTime.now().truncatedTo(ChronoUnit.SECONDS);
1926
}
2027
};
2128

@@ -34,6 +41,6 @@ public OffsetDateTime retryAt(int executionCount, Exception error) {
3441
* @param error the exception, <code>null</code> on a timeout
3542
*/
3643
default OffsetDateTime retryAt(int executionCount, @Nullable Exception error) {
37-
return OffsetDateTime.now().plusMinutes(1 + executionCount);
44+
return OffsetDateTime.now().plusMinutes(1 + executionCount).truncatedTo(ChronoUnit.SECONDS);
3845
}
3946
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
import org.springframework.transaction.annotation.Transactional;
1313
import org.springframework.transaction.support.TransactionTemplate;
1414
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
15-
import org.sterl.spring.persistent_tasks.api.TaskId;
1615
import org.sterl.spring.persistent_tasks.api.TriggerKey;
1716
import org.sterl.spring.persistent_tasks.scheduler.component.EditSchedulerStatusComponent;
1817
import org.sterl.spring.persistent_tasks.scheduler.component.TaskExecutorComponent;
1918
import org.sterl.spring.persistent_tasks.scheduler.entity.SchedulerEntity;
19+
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
2020
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
2121
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
2222

@@ -119,30 +119,29 @@ public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
119119
/**
120120
* Runs the given trigger if a free threads are available
121121
* and the runAt time is not in the future.
122+
* @return the reference to the {@link Future} with the key, if no threads are available it is resolved
122123
*/
123-
public <T extends Serializable> Optional<Future<TriggerKey>> runOrQueue(
124+
public <T extends Serializable> Future<TriggerKey> runOrQueue(
124125
AddTriggerRequest<T> triggerRequest) {
125-
return trx.execute(t -> {
126-
Optional<Future<TriggerKey>> result = Optional.empty();
126+
var runningTrigger = trx.execute(t -> {
127127
var trigger = triggerService.queue(triggerRequest);
128128
// exit now if this trigger is for the future ...
129-
if (trigger.shouldRunInFuture()) return Optional.of(
130-
CompletableFuture.completedFuture(trigger.getKey()));
129+
if (trigger.shouldRunInFuture()) return trigger;
131130

132131
if (taskExecutor.getFreeThreads() > 0) {
133132
trigger = triggerService.markTriggersAsRunning(trigger, name);
134-
result = Optional.of(taskExecutor.submit(trigger));
135-
pingRegistry();
136133
} else {
137134
log.debug("Currently not enough free thread available {} of {} in use. Task {} queued.",
138135
taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), trigger.getKey());
139136
}
140-
return result;
137+
return trigger;
141138
});
142-
}
143-
144-
public <T extends Serializable> TriggerEntity queue(TaskId<T> taskId, T state) {
145-
return triggerService.queue(taskId.newUniqueTrigger(state));
139+
Future<TriggerKey> result = CompletableFuture.completedFuture(runningTrigger.getKey());
140+
if (runningTrigger.isRunning()) {
141+
result = taskExecutor.submit(runningTrigger);
142+
pingRegistry();
143+
}
144+
return result;
146145
}
147146

148147
public SchedulerEntity getStatus() {
@@ -163,4 +162,16 @@ public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
163162
running, runningKeys, schedulers);
164163
return triggerService.rescheduleAbandonedTasks(timeout);
165164
}
165+
166+
/**
167+
* Adds or updates an existing trigger based on its {@link TriggerKey}
168+
*
169+
* @param <T> the state type
170+
* @param trigger the {@link AddTriggerRequest} to save
171+
* @return the saved {@link TriggerEntity}
172+
* @throws IllegalStateException if the trigger already exists and is {@link TriggerStatus#RUNNING}
173+
*/
174+
public <T extends Serializable> TriggerEntity queue(AddTriggerRequest<T> trigger) {
175+
return triggerService.queue(trigger);
176+
}
166177
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/LockNextTriggerComponent.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
import org.sterl.spring.persistent_tasks.trigger.repository.TriggerRepository;
1313

1414
import lombok.RequiredArgsConstructor;
15+
import lombok.extern.slf4j.Slf4j;
1516

1617
/**
1718
* Own transaction management, as this is the whole sense of this component
1819
*/
20+
@Slf4j
1921
@Component
2022
@Transactional(timeout = 5)
2123
@RequiredArgsConstructor
@@ -28,7 +30,8 @@ public List<TriggerEntity> loadNext(String runningOn, int count, OffsetDateTime
2830
timeDueAt, TriggerStatus.WAITING, PageRequest.of(0, count));
2931

3032
tasks.forEach(t -> t.runOn(runningOn));
31-
33+
log.debug("loadNext triggers for={} found={} triggers with dueAt={}",
34+
runningOn, tasks.size(), timeDueAt);
3235
return tasks;
3336
}
3437

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,18 @@ private Optional<TriggerEntity> handleTaskException(TriggerEntity trigger,
5959
task.retryStrategy().shouldRetry(trigger.getData().getExecutionCount(), e)) {
6060

6161
final OffsetDateTime retryAt = task.retryStrategy().retryAt(trigger.getData().getExecutionCount(), e);
62-
log.warn("{} failed, retry will be done at={}!",
63-
trigger.getKey(), retryAt, e);
6462

6563
result = editTrigger.retryTrigger(trigger.getKey(), retryAt);
64+
if (result.isPresent()) {
65+
log.warn("{} failed, retry will be done at={} status={}!",
66+
trigger.getKey(),
67+
result.get().getData().getRunAt(),
68+
result.get().getData().getStatus(),
69+
e);
70+
} else {
71+
log.error("Trigger with key={} not found and may be at a wrong state!",
72+
trigger.getKey(), e);
73+
}
6674
} else {
6775
log.error("{} failed, no more retries! {}", trigger.getKey(),
6876
e == null ? "No exception given." : e.getMessage());

core/src/test/java/org/sterl/spring/persistent_tasks/TaskSchedulerServiceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ public RetryStrategy retryStrategy() {
3636
persistentTaskService.executeTriggersAndWait();
3737

3838
// THEN
39-
assertThat(asserts.getCount("hallo")).isEqualTo(3);
39+
assertThat(asserts.getCount("hallo")).isEqualTo(4);
4040
assertThat(triggerService.countTriggers()).isZero();
4141
// AND
4242
var trigger = historyService.findStatus(runTrigger.getId()).get();
43-
assertThat(trigger.getData().getExecutionCount()).isEqualTo(3);
43+
assertThat(trigger.getData().getExecutionCount()).isEqualTo(4);
4444
assertThat(trigger.getData().getExceptionName()).isEqualTo(RuntimeException.class.getName());
4545
assertThat(trigger.getData().getLastException()).contains("NOPE!");
4646
}

core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ void runSimpleTaskMultipleTimesTest() throws Exception {
107107
// GIVEN
108108
TaskId<String> taskId = taskService.replace("foo", c -> asserts.info(c));
109109
for (int i = 1; i < 21; ++i) {
110-
subject.queue(taskId, i + " state");
110+
subject.queue(taskId.newTrigger(i + " state").build());
111111
}
112112

113113
// WHEN

core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
import static org.assertj.core.api.Assertions.assertThat;
44

5-
import java.util.Optional;
65
import java.util.concurrent.ExecutionException;
7-
import java.util.concurrent.Future;
86
import java.util.concurrent.atomic.AtomicBoolean;
97

108
import org.junit.jupiter.api.BeforeEach;
@@ -18,8 +16,7 @@
1816
import org.sterl.spring.persistent_tasks.api.SpringBeanTask;
1917
import org.sterl.spring.persistent_tasks.api.TaskId.TaskTriggerBuilder;
2018
import org.sterl.spring.persistent_tasks.api.TriggerKey;
21-
import org.sterl.spring.persistent_tasks.shared.model.TriggerData;
22-
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
19+
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
2320
import org.sterl.spring.sample_app.person.PersonBE;
2421
import org.sterl.spring.sample_app.person.PersonRepository;
2522

@@ -67,43 +64,37 @@ void testSaveEntity() throws Exception {
6764
final var trigger = TaskTriggerBuilder.newTrigger("savePerson").state("Paul").build();
6865

6966
// WHEN
70-
final Optional<Future<TriggerKey>> t = subject.runOrQueue(trigger);
67+
subject.runOrQueue(trigger).get();
7168

7269
// THEN
73-
assertThat(t).isPresent();
74-
t.get().get();
7570
assertThat(personRepository.count()).isOne();
7671
}
7772

7873
@Test
7974
void testRollbackAndRetry() throws Exception {
8075
// GIVEN
81-
final var trigger = TaskTriggerBuilder.newTrigger("savePerson").state("Paul").build();
76+
final var triggerRequest = TaskTriggerBuilder.newTrigger("savePerson").state("Paul").build();
8277
sendError.set(true);
8378
// WHEN
84-
Optional<Future<TriggerKey>> triggerKey = subject.runOrQueue(trigger);
79+
var key = subject.runOrQueue(triggerRequest);
8580
// THEN
86-
assertExecutionCount(triggerKey, 1);
87-
assertThat(personRepository.count()).isZero();
81+
key.get();
82+
assertThat(persistentTaskService.getLastTriggerData(key.get()).get().getStatus())
83+
.isEqualTo(TriggerStatus.WAITING);
8884

8985
// WHEN
9086
sendError.set(false);
91-
subject.triggerNextTasks().get(0).get();
87+
var executed = persistentTaskService.executeTriggersAndWait();
88+
9289
// THEN
93-
assertExecutionCount(triggerKey, 2);
90+
assertThat(executed).hasSize(1);
91+
assertExecutionCount(key.get(), 2);
9492
assertThat(personRepository.count()).isOne();
9593
}
9694

97-
private void assertExecutionCount(Optional<Future<TriggerKey>> refId, int count) throws InterruptedException, ExecutionException {
98-
final TriggerKey triggerKey = refId.get().get();
99-
final Optional<TriggerEntity> t = triggerService.get(triggerKey);
100-
TriggerData data;
101-
if (t.isEmpty()) {
102-
data = historyService.findLastKnownStatus(triggerKey).get().getData();
103-
} else {
104-
data = t.get().getData();
105-
}
106-
107-
assertThat(data.getExecutionCount()).isEqualTo(count);
95+
private void assertExecutionCount(TriggerKey triggerKey, int count) throws InterruptedException, ExecutionException {
96+
var data = persistentTaskService.getLastTriggerData(triggerKey);
97+
assertThat(data).isPresent();
98+
assertThat(data.get().getExecutionCount()).isEqualTo(count);
10899
}
109100
}

core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,23 @@ void testTriggerChainTask() throws Exception {
157157
assertThat(e.get().getData().getEnd()).isNotNull();
158158
assertThat(e.get().getData().getExecutionCount()).isOne();
159159
}
160+
161+
@Test
162+
void testFailedIsOnRetry() throws Exception {
163+
// GIVEN
164+
TaskId<String> task = taskService.<String>replace("foo", c -> {
165+
throw new IllegalArgumentException("Nope! " + c);
166+
});
167+
168+
// WHEN
169+
var trigger = subject.queue(task.newTrigger().state("Hallo :-)").build());
170+
subject.run(subject.lockNextTrigger("test"));
171+
172+
// THEN
173+
trigger = triggerService.get(trigger.getKey()).get();
174+
assertThat(trigger.getData().getRunAt()).isAfter(OffsetDateTime.now());
175+
assertThat(trigger.getData().getStatus()).isEqualTo(TriggerStatus.WAITING);
176+
}
160177

161178
@Test
162179
void testFailedSavingException() throws Exception {

0 commit comments

Comments
 (0)