Skip to content

Commit 16527da

Browse files
committed
adjusted transaction handling during the flow
1 parent ace87b9 commit 16527da

File tree

12 files changed

+231
-142
lines changed

12 files changed

+231
-142
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.Future;
1010

1111
import org.springframework.context.event.EventListener;
12+
import org.springframework.data.domain.Pageable;
1213
import org.springframework.lang.NonNull;
1314
import org.springframework.stereotype.Service;
1415
import org.springframework.transaction.annotation.Transactional;
@@ -55,6 +56,12 @@ public Optional<TriggerData> getLastTriggerData(TriggerKey key) {
5556
}
5657
}
5758

59+
public Optional<TriggerData> getLastDetailData(TriggerKey key) {
60+
var data = historyService.findAllDetailsForKey(key, Pageable.ofSize(1));
61+
if (data.isEmpty()) return Optional.empty();
62+
return Optional.of(data.getContent().get(0).getData());
63+
}
64+
5865
@EventListener
5966
void queue(TriggerTaskCommand<? extends Serializable> event) {
6067
if (event.triggers().size() == 1) {

core/src/main/java/org/sterl/spring/persistent_tasks/SpringPersistentTasksConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
44
import org.springframework.context.annotation.ComponentScan;
55
import org.springframework.context.annotation.Configuration;
6+
import org.springframework.scheduling.annotation.EnableAsync;
67
import org.springframework.scheduling.annotation.EnableScheduling;
78

89
@Configuration
910
@EnableScheduling
11+
@EnableAsync
1012
@AutoConfigurationPackage(basePackageClasses = EnableSpringPersistentTasks.class)
1113
@ComponentScan(basePackageClasses = EnableSpringPersistentTasks.class)
1214
public class SpringPersistentTasksConfig {

core/src/main/java/org/sterl/spring/persistent_tasks/history/HistoryService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public Page<TriggerHistoryDetailEntity> findAllDetailsForKey(TriggerKey key) {
6464
return findAllDetailsForKey(key, PageRequest.of(0, 100));
6565
}
6666
public Page<TriggerHistoryDetailEntity> findAllDetailsForKey(TriggerKey key, Pageable page) {
67-
page = sortByIdIfNeeded(page);
67+
page = applyDefaultSortIfNeeded(page);
6868
return triggerHistoryDetailRepository.listKnownStatusFor(key, page);
6969
}
7070

@@ -97,7 +97,7 @@ public long countTriggers(TriggerKey key) {
9797
public Page<TriggerHistoryLastStateEntity> findTriggerState(
9898
@Nullable TriggerKey key, Pageable page) {
9999

100-
page = sortByIdIfNeeded(page);
100+
page = applyDefaultSortIfNeeded(page);
101101
if (key == null) return triggerHistoryLastStateRepository.findAll(page);
102102
if (key.getId() == null && key.getTaskName() == null) return triggerHistoryLastStateRepository.findAll(page);
103103
if (key.getId() == null && key.getTaskName() != null) {
@@ -109,10 +109,10 @@ public Page<TriggerHistoryLastStateEntity> findTriggerState(
109109
page);
110110
}
111111

112-
private Pageable sortByIdIfNeeded(Pageable page) {
112+
private Pageable applyDefaultSortIfNeeded(Pageable page) {
113113
if (page.getSort() == Sort.unsorted()) {
114114
return PageRequest.of(page.getPageNumber(), page.getPageSize(),
115-
Sort.by(Direction.DESC, "id"));
115+
Sort.by(Direction.DESC, "data.createdTime", "id"));
116116
}
117117
return page;
118118
}

core/src/main/java/org/sterl/spring/persistent_tasks/history/component/TriggerHistoryComponent.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.time.OffsetDateTime;
44

55
import org.springframework.context.event.EventListener;
6+
import org.springframework.scheduling.annotation.Async;
67
import org.springframework.transaction.annotation.Transactional;
78
import org.springframework.transaction.event.TransactionPhase;
89
import org.springframework.transaction.event.TransactionalEventListener;
@@ -26,21 +27,26 @@ public class TriggerHistoryComponent {
2627
private final TriggerHistoryLastStateRepository triggerHistoryLastStateRepository;
2728
private final TriggerHistoryDetailRepository triggerHistoryDetailRepository;
2829

29-
//@Transactional(timeout = 10)
30-
//@EventListener
31-
void onRunning(TriggerRunningEvent e) {
30+
// we have to ensure to run in an own transaction
31+
// as if the trigger fails, a rollback would also remove this entry
32+
// furthermore async to ensure that we would not block
33+
// as REQURES_NEW would block two DB connections ...
34+
@Async
35+
@Transactional(timeout = 10)
36+
@EventListener
37+
public void onRunning(TriggerRunningEvent e) {
3238
log.debug("Received event={} for {} new status={}",
3339
e.getClass().getSimpleName(),
3440
e.key(), e.status());
3541

3642
execute(e.id(), e.data());
3743
}
3844

39-
@Transactional(timeout = 10)
40-
@EventListener
41-
//@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
45+
// @Transactional(timeout = 10)
46+
// @EventListener
47+
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
4248
void onPersistentTaskEvent(TriggerLifeCycleEvent e) {
43-
//if (e instanceof TriggerRunningEvent) return; // we have an own listener for that
49+
if (e instanceof TriggerRunningEvent) return; // we have an own listener for that
4450
log.debug("Received event={} for {} new status={}",
4551
e.getClass().getSimpleName(),
4652
e.key(), e.status());

core/src/main/java/org/sterl/spring/persistent_tasks/history/repository/TriggerHistoryLastStateRepository.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,4 @@
44

55
public interface TriggerHistoryLastStateRepository extends HistoryTriggerRepository<TriggerHistoryLastStateEntity> {
66

7-
87
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,10 @@
3030
import lombok.extern.slf4j.Slf4j;
3131

3232
/**
33-
* Use this service if direct access to the Scheduler is required.
34-
* <br>
35-
* <b>Note:</b> This Service is optional, as it could be disabled if no background
36-
* tasks should be execute on this note. As so the {@link TriggerService} should be
37-
* preferred to queue tasks.
33+
* Use this service if direct access to the Scheduler is required. <br>
34+
* <b>Note:</b> This Service is optional, as it could be disabled if no
35+
* background tasks should be execute on this note. As so the
36+
* {@link TriggerService} should be preferred to queue tasks.
3837
*/
3938
@RequiredArgsConstructor
4039
@Slf4j
@@ -73,22 +72,20 @@ public void shutdownNow() {
7372
editSchedulerStatus.offline(name);
7473
}
7574

75+
@Transactional
7676
public SchedulerEntity pingRegistry() {
77-
// using trx template to ensure the TRX is started if we use this method internally
78-
return trx.execute(t -> {
79-
var result = editSchedulerStatus.checkinToRegistry(name);
80-
result.setRunnungTasks(taskExecutor.getRunningTasks());
81-
result.setTasksSlotCount(taskExecutor.getMaxThreads());
82-
log.debug("Ping {}", result);
83-
return result;
84-
});
77+
var result = editSchedulerStatus.checkinToRegistry(name);
78+
result.setRunnungTasks(taskExecutor.getRunningTasks());
79+
result.setTasksSlotCount(taskExecutor.getMaxThreads());
80+
log.debug("Ping {}", result);
81+
return result;
8582
}
86-
83+
8784
public SchedulerEntity getScheduler() {
8885
var result = editSchedulerStatus.get(name);
8986
return result;
9087
}
91-
88+
9289
public Optional<SchedulerEntity> findStatus(String name) {
9390
return editSchedulerStatus.find(name);
9491
}
@@ -102,8 +99,8 @@ public List<Future<TriggerKey>> triggerNextTasks() {
10299
}
103100

104101
/**
105-
* Like {@link #triggerNextTasks()} but allows to set the time e.g. to the future to trigger
106-
* tasks which wouldn't be triggered now.
102+
* Like {@link #triggerNextTasks()} but allows to set the time e.g. to the
103+
* future to trigger tasks which wouldn't be triggered now.
107104
* <p>
108105
* This method should not be called in a transaction!
109106
* </p>
@@ -112,11 +109,10 @@ public List<Future<TriggerKey>> triggerNextTasks() {
112109
public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
113110
if (taskExecutor.getFreeThreads() > 0) {
114111
final var result = trx.execute(t -> {
115-
var triggers = triggerService.lockNextTrigger(name,
116-
taskExecutor.getFreeThreads(), timeDue);
117-
pingRegistry().addRunning(triggers.size());
118-
return triggers;
119-
});
112+
var triggers = triggerService.lockNextTrigger(name, taskExecutor.getFreeThreads(), timeDue);
113+
pingRegistry().addRunning(triggers.size());
114+
return triggers;
115+
});
120116

121117
return taskExecutor.submit(result);
122118
} else {
@@ -126,13 +122,14 @@ public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
126122
}
127123

128124
/**
129-
* Runs the given trigger if a free threads are available
130-
* and the runAt time is not in the future.
131-
* @return the reference to the {@link Future} with the key, if no threads are available it is resolved
125+
* Runs the given trigger if a free threads are available and the runAt time is
126+
* not in the future.
127+
*
128+
* @return the reference to the {@link Future} with the key, if no threads are
129+
* available it is resolved
132130
*/
133131
@Transactional(timeout = 10)
134-
public <T extends Serializable> TriggerKey runOrQueue(
135-
AddTriggerRequest<T> triggerRequest) {
132+
public <T extends Serializable> TriggerKey runOrQueue(AddTriggerRequest<T> triggerRequest) {
136133
var trigger = triggerService.queue(triggerRequest);
137134

138135
if (!trigger.shouldRunInFuture()) {
@@ -141,14 +138,14 @@ public <T extends Serializable> TriggerKey runOrQueue(
141138
pingRegistry().addRunning(1);
142139
shouldRun.put(trigger.getId(), trigger);
143140
} else {
144-
log.debug("Currently not enough free thread available {} of {} in use. PersistentTask {} queued.",
141+
log.debug("Currently not enough free thread available {} of {} in use. PersistentTask {} queued.",
145142
taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), trigger.getKey());
146143
}
147144
}
148145
// we will listen for the commit event to execute this trigger ...
149146
return trigger.getKey();
150147
}
151-
148+
152149
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
153150
void checkIfTrigerIsRunning(TriggerAddedEvent addedTrigger) {
154151
final var toRun = shouldRun.remove(addedTrigger.id());
@@ -166,14 +163,11 @@ public SchedulerEntity getStatus() {
166163
public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
167164
var schedulers = editSchedulerStatus.findOnlineSchedulers(timeout);
168165

169-
final List<TriggerKey> runningKeys = this.taskExecutor
170-
.getRunningTriggers().stream()
171-
.map(TriggerEntity::getKey)
166+
final List<TriggerKey> runningKeys = this.taskExecutor.getRunningTriggers().stream().map(TriggerEntity::getKey)
172167
.toList();
173168

174169
int running = triggerService.markTriggersAsRunning(runningKeys, name);
175-
log.debug("({}) - {} trigger(s) are running on {} schedulers",
176-
running, runningKeys, schedulers);
170+
log.debug("({}) - {} trigger(s) are running on {} schedulers", running, runningKeys, schedulers);
177171
return triggerService.rescheduleAbandonedTasks(timeout);
178172
}
179173
}

core/src/main/java/org/sterl/spring/persistent_tasks/task/TaskService.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55
import java.util.Set;
66
import java.util.function.Consumer;
77

8-
import org.springframework.boot.sql.init.dependency.DependsOnDatabaseInitialization;
98
import org.springframework.lang.NonNull;
109
import org.springframework.stereotype.Service;
11-
import org.springframework.transaction.annotation.Transactional;
1210
import org.springframework.transaction.support.TransactionTemplate;
1311
import org.sterl.spring.persistent_tasks.api.PersistentTask;
1412
import org.sterl.spring.persistent_tasks.api.TaskId;
@@ -18,14 +16,12 @@
1816
import lombok.RequiredArgsConstructor;
1917

2018
@Service
21-
@DependsOnDatabaseInitialization
2219
@RequiredArgsConstructor
2320
public class TaskService {
2421

2522
private final TaskTransactionComponent taskTransactionComponent;
2623
private final TaskRepository taskRepository;
2724

28-
@Transactional(readOnly = true)
2925
public Set<TaskId<? extends Serializable>> findAllTaskIds() {
3026
return this.taskRepository.all();
3127
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package org.sterl.spring.persistent_tasks.trigger.component;
2+
3+
import java.time.OffsetDateTime;
4+
import java.util.Optional;
5+
6+
import org.springframework.lang.Nullable;
7+
import org.springframework.stereotype.Component;
8+
import org.springframework.transaction.annotation.Transactional;
9+
import org.sterl.spring.persistent_tasks.trigger.component.RunTriggerComponent.TaskAndState;
10+
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
11+
12+
import lombok.RequiredArgsConstructor;
13+
import lombok.extern.slf4j.Slf4j;
14+
15+
@Component
16+
@Transactional(timeout = 30)
17+
@RequiredArgsConstructor
18+
@Slf4j
19+
public class HandleTriggerExceptionComponent {
20+
21+
private final EditTriggerComponent editTrigger;
22+
23+
Optional<TriggerEntity> execute(TaskAndState taskAndState,
24+
@Nullable Exception e) {
25+
26+
var trigger = taskAndState.trigger;
27+
var task = taskAndState.persistentTask;
28+
var result = editTrigger.completeTaskWithStatus(trigger.getKey(), taskAndState.state, e);
29+
30+
if (task != null
31+
&& task.retryStrategy().shouldRetry(trigger.getData().getExecutionCount(), e)) {
32+
33+
final OffsetDateTime retryAt = task.retryStrategy().retryAt(trigger.getData().getExecutionCount(), e);
34+
35+
result = editTrigger.retryTrigger(trigger.getKey(), retryAt);
36+
if (result.isPresent()) {
37+
var data = result.get().getData();
38+
log.warn("{} failed, retry will be done at={} status={}!",
39+
trigger.getKey(),
40+
data.getRunAt(),
41+
data.getStatus(),
42+
e);
43+
} else {
44+
log.error("Trigger with key={} not found and may be at a wrong state!",
45+
trigger.getKey(), e);
46+
}
47+
} else {
48+
log.error("{} failed, no more retries! {}", trigger.getKey(),
49+
e == null ? "No exception given." : e.getMessage(), e);
50+
51+
editTrigger.deleteTrigger(trigger);
52+
}
53+
return result;
54+
}
55+
}

0 commit comments

Comments
 (0)