Skip to content

Commit f3ff1d7

Browse files
committed
events now in transactions
1 parent 4bd36b3 commit f3ff1d7

30 files changed

+114
-69
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/api/Trigger.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import java.time.OffsetDateTime;
44

5-
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
6-
75
import lombok.Data;
86

97
@Data
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.sterl.spring.persistent_tasks.shared.model;
1+
package org.sterl.spring.persistent_tasks.api;
22

33
import java.util.EnumSet;
44
import java.util.Set;

core/src/main/java/org/sterl/spring/persistent_tasks/history/HistoryService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111
import org.springframework.data.domain.Sort.Direction;
1212
import org.springframework.lang.Nullable;
1313
import org.sterl.spring.persistent_tasks.api.TriggerKey;
14+
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
1415
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;
1516
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
1617
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryDetailRepository;
1718
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryLastStateRepository;
18-
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
1919
import org.sterl.spring.persistent_tasks.shared.stereotype.TransactionalService;
2020
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
2121
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;

core/src/main/java/org/sterl/spring/persistent_tasks/history/component/TriggerHistoryComponent.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryLastStateRepository;
1111
import org.sterl.spring.persistent_tasks.shared.stereotype.TransactionalCompontant;
1212
import org.sterl.spring.persistent_tasks.trigger.event.TriggerLifeCycleEvent;
13-
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
1413

1514
import lombok.RequiredArgsConstructor;
1615
import lombok.extern.slf4j.Slf4j;
@@ -23,27 +22,25 @@ public class TriggerHistoryComponent {
2322
private final TriggerHistoryLastStateRepository triggerHistoryLastStateRepository;
2423
private final TriggerHistoryDetailRepository triggerHistoryDetailRepository;
2524

26-
public void write(TriggerEntity e) {
25+
@Transactional(timeout = 10)
26+
@EventListener
27+
public void onPersistentTaskEvent(TriggerLifeCycleEvent e) {
28+
log.debug("Received event={} for {} new status={}",
29+
e.getClass().getSimpleName(),
30+
e.key(), e.status());
31+
32+
2733
var state = new TriggerHistoryLastStateEntity();
28-
state.setId(e.getId());
29-
state.setData(e.getData().toBuilder().build());
34+
state.setId(e.id());
35+
state.setData(e.getData().copy());
3036
triggerHistoryLastStateRepository.save(state);
3137

3238
var detail = new TriggerHistoryDetailEntity();
33-
detail.setInstanceId(e.getId());
39+
detail.setInstanceId(e.id());
3440
detail.setData(e.getData().toBuilder()
3541
.state(null)
42+
.createdTime(OffsetDateTime.now())
3643
.build());
37-
detail.getData().setCreatedTime(OffsetDateTime.now());
3844
triggerHistoryDetailRepository.save(detail);
3945
}
40-
41-
@Transactional(timeout = 10)
42-
@EventListener
43-
public void onPersistentTaskEvent(TriggerLifeCycleEvent triggerLifeCycleEvent) {
44-
log.debug("Received event={} for {} new status={}",
45-
triggerLifeCycleEvent.getClass().getSimpleName(),
46-
triggerLifeCycleEvent.key(), triggerLifeCycleEvent.status());
47-
write(triggerLifeCycleEvent.trigger());
48-
}
4946
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
import java.time.OffsetDateTime;
55
import java.util.Collections;
66
import java.util.List;
7+
import java.util.Map;
78
import java.util.Optional;
9+
import java.util.concurrent.ConcurrentHashMap;
810
import java.util.concurrent.Future;
911

1012
import org.springframework.lang.NonNull;
@@ -44,6 +46,7 @@ public class SchedulerService {
4446
private final TaskExecutorComponent taskExecutor;
4547
private final EditSchedulerStatusComponent editSchedulerStatus;
4648
private final TransactionTemplate trx;
49+
private final Map<Long, TriggerEntity> shouldRun = new ConcurrentHashMap<>();
4750

4851
@PostConstruct
4952
public void start() {
@@ -136,6 +139,7 @@ public <T extends Serializable> TriggerKey runOrQueue(
136139
if (taskExecutor.getFreeThreads() > 0) {
137140
trigger = triggerService.markTriggersAsRunning(trigger, name);
138141
pingRegistry().addRunning(1);
142+
shouldRun.put(trigger.getId(), trigger);
139143
} else {
140144
log.debug("Currently not enough free thread available {} of {} in use. PersistentTask {} queued.",
141145
taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), trigger.getKey());
@@ -147,9 +151,10 @@ public <T extends Serializable> TriggerKey runOrQueue(
147151

148152
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
149153
void checkIfTrigerIsRunning(TriggerAddedEvent addedTrigger) {
150-
if (addedTrigger.isRunningOn(name) && !taskExecutor.isRunning(addedTrigger.trigger())) {
154+
final var toRun = shouldRun.remove(addedTrigger.id());
155+
if (toRun != null) {
151156
log.debug("New triger added for imidiate execution {}", addedTrigger.key());
152-
taskExecutor.submit(addedTrigger.trigger());
157+
taskExecutor.submit(toRun);
153158
}
154159
}
155160

core/src/main/java/org/sterl/spring/persistent_tasks/shared/model/HasTriggerData.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,18 @@
44
import java.time.OffsetDateTime;
55

66
import org.sterl.spring.persistent_tasks.api.TaskId;
7+
import org.sterl.spring.persistent_tasks.api.TriggerKey;
8+
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
79

810
public interface HasTriggerData {
911
TriggerData getData();
1012

13+
default TriggerKey key() {
14+
return getData().getKey();
15+
}
16+
default TriggerStatus status() {
17+
return getData().getStatus();
18+
}
1119
default boolean isRunning() {
1220
return getData().getStatus() == TriggerStatus.RUNNING;
1321
}

core/src/main/java/org/sterl/spring/persistent_tasks/shared/model/TriggerData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.time.OffsetDateTime;
55

66
import org.sterl.spring.persistent_tasks.api.TriggerKey;
7+
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
78

89
import jakarta.persistence.AttributeOverride;
910
import jakarta.persistence.AttributeOverrides;
@@ -86,4 +87,8 @@ public void updateRunningDuration() {
8687
private String exceptionName;
8788
@Lob
8889
private String lastException;
90+
91+
public TriggerData copy() {
92+
return this.toBuilder().build();
93+
}
8994
}

core/src/main/java/org/sterl/spring/persistent_tasks/shared/repository/TriggerDataRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import org.springframework.data.repository.NoRepositoryBean;
1212
import org.springframework.data.repository.query.Param;
1313
import org.sterl.spring.persistent_tasks.api.TriggerKey;
14+
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
1415
import org.sterl.spring.persistent_tasks.shared.model.HasTriggerData;
15-
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
1616

1717
@NoRepositoryBean
1818
public interface TriggerDataRepository<T extends HasTriggerData> extends JpaRepository<T, Long> {

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
1515
import org.sterl.spring.persistent_tasks.api.TaskId;
1616
import org.sterl.spring.persistent_tasks.api.TriggerKey;
17-
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
17+
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
1818
import org.sterl.spring.persistent_tasks.shared.stereotype.TransactionalService;
1919
import org.sterl.spring.persistent_tasks.task.TaskService;
2020
import org.sterl.spring.persistent_tasks.trigger.component.EditTriggerComponent;

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
import org.springframework.transaction.annotation.Transactional;
1313
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
1414
import org.sterl.spring.persistent_tasks.api.TriggerKey;
15+
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
1516
import org.sterl.spring.persistent_tasks.shared.model.TriggerData;
16-
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
1717
import org.sterl.spring.persistent_tasks.trigger.event.TriggerAddedEvent;
1818
import org.sterl.spring.persistent_tasks.trigger.event.TriggerCanceledEvent;
1919
import org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent;
@@ -48,11 +48,12 @@ public Optional<TriggerEntity> completeTaskWithStatus(TriggerKey key, Serializab
4848
t.complete(e);
4949

5050
if (t.getData().getStatus() == TriggerStatus.SUCCESS) {
51-
publisher.publishEvent(new TriggerSuccessEvent(t, state));
51+
publisher.publishEvent(new TriggerSuccessEvent(
52+
t.getId(), t.copyData(), state));
5253
log.debug("Setting {} to status={} {}", key, t.getData().getStatus(),
5354
e == null ? "" : "error=" + e.getClass().getSimpleName());
5455
} else {
55-
publisher.publishEvent(new TriggerFailedEvent(t, state, e));
56+
publisher.publishEvent(new TriggerFailedEvent(t.getId(), t.copyData(), state, e));
5657
log.info("Setting {} to status={} {}", key, t.getData().getStatus(),
5758
e == null ? "" : "error=" + e.getClass().getSimpleName());
5859
}
@@ -73,7 +74,8 @@ public Optional<TriggerEntity> cancelTask(TriggerKey id) {
7374
.findByKey(id) //
7475
.map(t -> {
7576
t.cancel();
76-
publisher.publishEvent(new TriggerCanceledEvent(t,
77+
publisher.publishEvent(new TriggerCanceledEvent(
78+
t.getId(), t.copyData(),
7779
stateSerializer.deserializeOrNull(t.getData().getState())));
7880
triggerRepository.delete(t);
7981
return t;
@@ -94,7 +96,8 @@ public <T extends Serializable> TriggerEntity addTrigger(AddTriggerRequest<T> ti
9496
result = triggerRepository.save(result);
9597
log.debug("Added trigger={}", result);
9698
}
97-
publisher.publishEvent(new TriggerAddedEvent(result, tigger.state()));
99+
publisher.publishEvent(new TriggerAddedEvent(
100+
result.getId(), result.copyData(), tigger.state()));
98101
return result;
99102
}
100103

0 commit comments

Comments
 (0)