Skip to content

Commit ace87b9

Browse files
committed
update
1 parent f3ff1d7 commit ace87b9

File tree

4 files changed

+87
-15
lines changed

4 files changed

+87
-15
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,25 +60,37 @@ void queue(TriggerTaskCommand<? extends Serializable> event) {
6060
if (event.triggers().size() == 1) {
6161
runOrQueue(event.triggers().iterator().next());
6262
} else {
63-
queueAll(event.triggers());
63+
queue(event.triggers());
6464
}
6565
}
6666

6767
/**
68-
* Queues the given triggers.
68+
* Queues/updates the given triggers, if the {@link TriggerKey} is already present
6969
*
7070
* @param <T> the state type
7171
* @param triggers the triggers to add
7272
* @return the {@link TriggerKey}
7373
*/
7474
@Transactional(timeout = 10)
7575
@NonNull
76-
public <T extends Serializable> List<TriggerKey> queueAll(Collection<AddTriggerRequest<T>> triggers) {
76+
public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequest<T>> triggers) {
7777
return triggers.stream() //
7878
.map(t -> triggerService.queue(t)) //
7979
.map(TriggerEntity::getKey) //
8080
.toList();
8181
}
82+
/**
83+
* Queues/updates the given trigger, if the {@link TriggerKey} is already present.
84+
*
85+
* @param <T> the state type
86+
* @param trigger the trigger to add
87+
* @return the {@link TriggerKey}
88+
*/
89+
@Transactional(timeout = 5)
90+
@NonNull
91+
public <T extends Serializable> TriggerKey queue(AddTriggerRequest<T> trigger) {
92+
return triggerService.queue(trigger).getKey();
93+
}
8294

8395
/**
8496
* Runs the given trigger if a free threads are available

core/src/main/java/org/sterl/spring/persistent_tasks/history/component/TriggerHistoryComponent.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44

55
import org.springframework.context.event.EventListener;
66
import org.springframework.transaction.annotation.Transactional;
7+
import org.springframework.transaction.event.TransactionPhase;
8+
import org.springframework.transaction.event.TransactionalEventListener;
79
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;
810
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
911
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryDetailRepository;
1012
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryLastStateRepository;
13+
import org.sterl.spring.persistent_tasks.shared.model.TriggerData;
1114
import org.sterl.spring.persistent_tasks.shared.stereotype.TransactionalCompontant;
1215
import org.sterl.spring.persistent_tasks.trigger.event.TriggerLifeCycleEvent;
16+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerRunningEvent;
1317

1418
import lombok.RequiredArgsConstructor;
1519
import lombok.extern.slf4j.Slf4j;
@@ -22,22 +26,37 @@ public class TriggerHistoryComponent {
2226
private final TriggerHistoryLastStateRepository triggerHistoryLastStateRepository;
2327
private final TriggerHistoryDetailRepository triggerHistoryDetailRepository;
2428

29+
//@Transactional(timeout = 10)
30+
//@EventListener
31+
void onRunning(TriggerRunningEvent e) {
32+
log.debug("Received event={} for {} new status={}",
33+
e.getClass().getSimpleName(),
34+
e.key(), e.status());
35+
36+
execute(e.id(), e.data());
37+
}
38+
2539
@Transactional(timeout = 10)
2640
@EventListener
27-
public void onPersistentTaskEvent(TriggerLifeCycleEvent e) {
41+
//@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
42+
void onPersistentTaskEvent(TriggerLifeCycleEvent e) {
43+
//if (e instanceof TriggerRunningEvent) return; // we have an own listener for that
2844
log.debug("Received event={} for {} new status={}",
2945
e.getClass().getSimpleName(),
3046
e.key(), e.status());
3147

32-
33-
var state = new TriggerHistoryLastStateEntity();
34-
state.setId(e.id());
35-
state.setData(e.getData().copy());
48+
execute(e.id(), e.data());
49+
}
50+
51+
public void execute(final long triggerId, final TriggerData data) {
52+
final var state = new TriggerHistoryLastStateEntity();
53+
state.setId(triggerId);
54+
state.setData(data.copy());
3655
triggerHistoryLastStateRepository.save(state);
3756

3857
var detail = new TriggerHistoryDetailEntity();
39-
detail.setInstanceId(e.id());
40-
detail.setData(e.getData().toBuilder()
58+
detail.setInstanceId(triggerId);
59+
detail.setData(data.toBuilder()
4160
.state(null)
4261
.createdTime(OffsetDateTime.now())
4362
.build());

core/src/test/java/org/sterl/spring/persistent_tasks/history/HistoryServiceTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,22 @@ void testTriggerHistory() throws TimeoutException, InterruptedException {
5454
assertThat(triggers.get(1).getData().getStatus()).isEqualTo(TriggerStatus.RUNNING);
5555
assertThat(triggers.get(2).getData().getStatus()).isEqualTo(TriggerStatus.WAITING);
5656
}
57+
58+
@Test
59+
void testTriggerHistoryTrx() throws TimeoutException, InterruptedException {
60+
// GIVEN
61+
final var trigger = Task3.ID.newUniqueTrigger("Hallo");
62+
persistentTaskService.queue(trigger);
63+
// WHEN
64+
hibernateAsserts.reset();
65+
schedulerService.triggerNextTasks().forEach(t -> {
66+
try {t.get();} catch (Exception ex) {throw new RuntimeException(ex);}
67+
});
68+
69+
// THEN
70+
// 2 to get the work
71+
// 1 for the running history
72+
// 1 for the success history
73+
hibernateAsserts.assertTrxCount(4);
74+
}
5775
}

core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44

55
import java.util.concurrent.ExecutionException;
6+
import java.util.concurrent.TimeoutException;
67
import java.util.concurrent.atomic.AtomicBoolean;
8+
import java.util.concurrent.atomic.AtomicInteger;
79

810
import org.junit.jupiter.api.BeforeEach;
911
import org.junit.jupiter.api.Test;
1012
import org.springframework.beans.factory.annotation.Autowired;
1113
import org.springframework.context.annotation.Bean;
1214
import org.springframework.context.annotation.Configuration;
1315
import org.sterl.spring.persistent_tasks.AbstractSpringTest;
16+
import org.sterl.spring.persistent_tasks.AbstractSpringTest.TaskConfig.Task3;
1417
import org.sterl.spring.persistent_tasks.api.PersistentTask;
1518
import org.sterl.spring.persistent_tasks.api.RetryStrategy;
1619
import org.sterl.spring.persistent_tasks.api.TaskId.TaskTriggerBuilder;
@@ -23,7 +26,8 @@
2326
class SchedulerServiceTransactionTest extends AbstractSpringTest {
2427

2528
private SchedulerService subject;
26-
private static AtomicBoolean sendError = new AtomicBoolean(false);
29+
private static final AtomicBoolean sendError = new AtomicBoolean(false);
30+
private static final AtomicInteger sleepTime = new AtomicInteger(50);
2731
@Autowired private PersonRepository personRepository;
2832

2933
@Configuration
@@ -34,10 +38,9 @@ TransactionalTask<String> savePersonInTrx(PersonRepository personRepository) {
3438
@Override
3539
public void accept(String name) {
3640
try {
37-
Thread.sleep(50);
41+
if (sleepTime.intValue() > 0) Thread.sleep(sleepTime.intValue());
3842
} catch (InterruptedException e) {
39-
// TODO Auto-generated catch block
40-
e.printStackTrace();
43+
Thread.interrupted();
4144
}
4245
personRepository.save(new PersonBE(name));
4346
if (sendError.get()) {
@@ -49,7 +52,7 @@ public RetryStrategy retryStrategy() {
4952
}
5053
};
5154
}
52-
55+
5356
@Bean
5457
PersistentTask<String> savePersonNoTrx(PersonRepository personRepository) {
5558
return new PersistentTask<>() {
@@ -172,6 +175,26 @@ void testRollbackAndRetry() throws Exception {
172175
assertExecutionCount(key, 2);
173176
assertThat(personRepository.count()).isOne();
174177
}
178+
179+
@Test
180+
void testTriggerHistoryTrx() throws TimeoutException, InterruptedException {
181+
// GIVEN
182+
sleepTime.set(0);
183+
final var trigger = Task3.ID.newUniqueTrigger("savePersonNoTrx");
184+
persistentTaskService.queue(trigger);
185+
// WHEN
186+
hibernateAsserts.reset();
187+
schedulerService.triggerNextTasks().forEach(t -> {
188+
try {t.get();} catch (Exception ex) {throw new RuntimeException(ex);}
189+
});
190+
191+
// THEN
192+
// 2 to get the work
193+
// 1 for the running history
194+
// 1 for the success history
195+
hibernateAsserts.assertTrxCount(4);
196+
}
197+
175198

176199
private void assertExecutionCount(TriggerKey triggerKey, int count) throws InterruptedException, ExecutionException {
177200
var data = persistentTaskService.getLastTriggerData(triggerKey);

0 commit comments

Comments
 (0)