Skip to content

Commit 2338211

Browse files
committed
implemented waiting triggers
1 parent 20c8964 commit 2338211

28 files changed

+476
-199
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.springframework.lang.NonNull;
1616
import org.springframework.stereotype.Service;
1717
import org.springframework.transaction.annotation.Transactional;
18-
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
18+
import org.sterl.spring.persistent_tasks.api.TriggerRequest;
1919
import org.sterl.spring.persistent_tasks.api.TriggerKey;
2020
import org.sterl.spring.persistent_tasks.api.TriggerSearch;
2121
import org.sterl.spring.persistent_tasks.api.event.TriggerTaskCommand;
@@ -86,7 +86,7 @@ void queue(TriggerTaskCommand<? extends Serializable> event) {
8686
*/
8787
@Transactional(timeout = 10)
8888
@NonNull
89-
public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequest<T>> triggers) {
89+
public <T extends Serializable> List<TriggerKey> queue(Collection<TriggerRequest<T>> triggers) {
9090
if (triggers == null || triggers.isEmpty()) {
9191
return Collections.emptyList();
9292
}
@@ -105,7 +105,7 @@ public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequ
105105
*/
106106
@Transactional(timeout = 5)
107107
@NonNull
108-
public <T extends Serializable> TriggerKey queue(AddTriggerRequest<T> trigger) {
108+
public <T extends Serializable> TriggerKey queue(TriggerRequest<T> trigger) {
109109
return triggerService.queue(trigger).getKey();
110110
}
111111

@@ -115,7 +115,7 @@ public <T extends Serializable> TriggerKey queue(AddTriggerRequest<T> trigger) {
115115
* @return the reference to the {@link TriggerKey}
116116
*/
117117
public <T extends Serializable> TriggerKey runOrQueue(
118-
AddTriggerRequest<T> triggerRequest) {
118+
TriggerRequest<T> triggerRequest) {
119119
if (schedulerService.isPresent()) {
120120
schedulerService.get().runOrQueue(triggerRequest);
121121
} else {

core/src/main/java/org/sterl/spring/persistent_tasks/api/TaskId.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public TriggerBuilder<T> newTrigger(T state) {
2020
return new TriggerBuilder<>(this).state(state);
2121
}
2222

23-
public AddTriggerRequest<T> newUniqueTrigger(T state) {
23+
public TriggerRequest<T> newUniqueTrigger(T state) {
2424
return new TriggerBuilder<>(this).state(state).build();
2525
}
2626

@@ -35,19 +35,23 @@ public static class TriggerBuilder<T extends Serializable> {
3535
private String id;
3636
private String correlationId;
3737
private String tag;
38+
private TriggerStatus status = TriggerStatus.WAITING;
3839
private T state;
3940
private OffsetDateTime when = OffsetDateTime.now();
40-
private int priority = AddTriggerRequest.DEFAULT_PRIORITY;
41+
private int priority = TriggerRequest.DEFAULT_PRIORITY;
4142

4243
public static <T extends Serializable> TriggerBuilder<T> newTrigger(String name) {
4344
return new TriggerBuilder<>(new TaskId<T>(name));
4445
}
4546
public static <T extends Serializable> TriggerBuilder<T> newTrigger(String name, T state) {
4647
return new TriggerBuilder<>(new TaskId<T>(name)).state(state);
4748
}
48-
public AddTriggerRequest<T> build() {
49+
public static <T extends Serializable> TriggerBuilder<T> newTrigger(TriggerKey key, T state) {
50+
return new TriggerBuilder<>(new TaskId<T>(key.getTaskName())).id(key.getId()).state(state);
51+
}
52+
public TriggerRequest<T> build() {
4953
var key = TriggerKey.of(id, taskId);
50-
return new AddTriggerRequest<>(key, state, when, priority, correlationId, tag);
54+
return new TriggerRequest<>(key, status, state, when, priority, correlationId, tag);
5155
}
5256
/**
5357
* The ID of this task, same queued ids are replaced.
@@ -91,6 +95,7 @@ public TriggerBuilder<T> when(OffsetDateTime when) {
9195
}
9296
public TriggerBuilder<T> runAt(OffsetDateTime when) {
9397
this.when = when;
98+
this.status = TriggerStatus.WAITING;
9499
return this;
95100
}
96101
/**
@@ -100,5 +105,14 @@ public TriggerBuilder<T> runAfter(Duration duration) {
100105
runAt(OffsetDateTime.now().plus(duration));
101106
return this;
102107
}
108+
/**
109+
* Creates a trigger which waits for an external signal and
110+
* will run into {@link TriggerStatus#EXPIRED_SIGNAL} if no signal happens.
111+
*/
112+
public TriggerBuilder<T> waitForSignal(OffsetDateTime timeout) {
113+
this.when = timeout;
114+
this.status = TriggerStatus.AWAITING_SIGNAL;
115+
return this;
116+
}
103117
}
104118
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/TriggerKey.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
import org.springframework.lang.Nullable;
77

8-
import com.github.f4b6a3.uuid.UuidCreator;
9-
108
import lombok.AllArgsConstructor;
119
import lombok.Builder;
1210
import lombok.Data;
@@ -30,7 +28,7 @@ public class TriggerKey implements Serializable {
3028
private String taskName;
3129

3230
public static TriggerKey of(@Nullable String id, TaskId<? extends Serializable> taskId) {
33-
return new TriggerKey(id == null ? UuidCreator.getTimeOrderedEpochFast().toString() : id, taskId.name());
31+
return new TriggerKey(id, taskId.name());
3432
}
3533

3634
public TaskId<Serializable> toTaskId() {
@@ -41,26 +39,25 @@ public TaskId<Serializable> toTaskId() {
4139
* Builds a trigger for the given persistentTask name
4240
*/
4341
public TriggerKey(String taskName) {
44-
id = UuidCreator.getTimeOrderedEpochFast().toString();
4542
this.taskName = taskName;
4643
}
4744

4845
/**
4946
* Just triggers the given persistentTask to be executed using <code>null</code> as state.
5047
*/
51-
public <T extends Serializable> AddTriggerRequest<T> newTrigger(TaskId<T> taskId) {
48+
public <T extends Serializable> TriggerRequest<T> newTrigger(TaskId<T> taskId) {
5249
return newTrigger(taskId, null);
5350
}
5451

55-
public <T extends Serializable> AddTriggerRequest<T> newTrigger(TaskId<T> taskId, T state) {
56-
return newTrigger(UuidCreator.getTimeOrderedEpochFast().toString(), taskId, state);
52+
public <T extends Serializable> TriggerRequest<T> newTrigger(TaskId<T> taskId, T state) {
53+
return newTrigger(null, taskId, state);
5754
}
5855

59-
public <T extends Serializable> AddTriggerRequest<T> newTrigger(String id, TaskId<T> taskId, T state) {
56+
public <T extends Serializable> TriggerRequest<T> newTrigger(String id, TaskId<T> taskId, T state) {
6057
return newTrigger(id, taskId, state, OffsetDateTime.now());
6158
}
6259

63-
public <T extends Serializable> AddTriggerRequest<T> newTrigger(String id, TaskId<T> taskId, T state, OffsetDateTime when) {
60+
public <T extends Serializable> TriggerRequest<T> newTrigger(String id, TaskId<T> taskId, T state, OffsetDateTime when) {
6461
return taskId.newTrigger() //
6562
.id(id) //
6663
.state(state) //

core/src/main/java/org/sterl/spring/persistent_tasks/api/AddTriggerRequest.java renamed to core/src/main/java/org/sterl/spring/persistent_tasks/api/TriggerRequest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
* For any registered persistentTask a persistentTask trigger represent one unit of work, executing this persistentTask once.
1111
* @param <T> state type which has to be of {@link Serializable}
1212
*/
13-
public record AddTriggerRequest<T extends Serializable>(
13+
public record TriggerRequest<T extends Serializable>(
1414
TriggerKey key,
15+
TriggerStatus status,
1516
T state,
1617
OffsetDateTime runtAt,
1718
int priority,
@@ -23,7 +24,7 @@ public TaskId<T> taskId() {
2324
return (TaskId<T>)key.toTaskId();
2425
}
2526

26-
public Collection<AddTriggerRequest<T>> toList() {
27+
public Collection<TriggerRequest<T>> toList() {
2728
return Collections.singleton(this);
2829
}
2930

core/src/main/java/org/sterl/spring/persistent_tasks/api/TriggerSearch.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,24 @@ public static TriggerSearch byCorrelationId(String correlationId) {
3131
result.setCorrelationId(correlationId);
3232
return result;
3333
}
34+
public static TriggerSearch byStatus(TriggerStatus status) {
35+
var result = new TriggerSearch();
36+
result.setStatus(status);
37+
return result;
38+
}
39+
public static TriggerSearch forTriggerRequest(TriggerRequest<?> trigger) {
40+
var search = new TriggerSearch();
41+
if (trigger.key() != null) {
42+
search.setKeyId(trigger.key().getId());
43+
search.setTaskName(trigger.key().getTaskName());
44+
}
45+
46+
if (trigger.correlationId() != null) search.setCorrelationId(trigger.correlationId());
47+
if (trigger.tag() != null) search.setTag(trigger.tag());
48+
if (trigger.status() != null) search.setStatus(trigger.status());
49+
50+
return search;
51+
}
3452

3553
/** create time ASC */
3654
public static final Sort DEFAULT_SORT = sortByCreatedTime(Direction.ASC);
@@ -45,4 +63,5 @@ public static Pageable applyDefaultSortIfNeeded(Pageable page) {
4563
}
4664
return result;
4765
}
66+
4867
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/TriggerStatus.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
import java.util.Set;
55

66
public enum TriggerStatus {
7+
AWAITING_SIGNAL,
78
WAITING,
89
RUNNING,
910
SUCCESS,
1011
FAILED,
11-
CANCELED
12+
CANCELED,
13+
EXPIRED_SIGNAL
1214
;
13-
public static final Set<TriggerStatus> ACTIVE_STATES = EnumSet.of(WAITING, RUNNING);
14-
public static final Set<TriggerStatus> END_STATES = EnumSet.of(SUCCESS, FAILED, CANCELED);
15+
public static final Set<TriggerStatus> ACTIVE_STATES = EnumSet.of(AWAITING_SIGNAL, WAITING, RUNNING);
16+
public static final Set<TriggerStatus> END_STATES = EnumSet.of(SUCCESS, FAILED, CANCELED, EXPIRED_SIGNAL);
1517
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/event/TriggerTaskCommand.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
import java.util.Collection;
66
import java.util.Collections;
77

8-
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
8+
import org.sterl.spring.persistent_tasks.api.TriggerRequest;
99
import org.sterl.spring.persistent_tasks.api.TaskId.TriggerBuilder;
1010

1111
/**
1212
* An event to trigger one or multiple persistentTask executions
1313
*/
1414
public record TriggerTaskCommand<T extends Serializable>(
15-
Collection<AddTriggerRequest<T>> triggers) implements PersistentTasksEvent {
15+
Collection<TriggerRequest<T>> triggers) implements PersistentTasksEvent {
1616

1717
public int size() {
1818
return triggers == null ? 0 : triggers.size();
@@ -33,16 +33,16 @@ public static <T extends Serializable> TriggerTaskCommand<T> of(String name, T s
3333
.build()));
3434
}
3535

36-
public static <T extends Serializable> TriggerTaskCommand<T> of(Collection<AddTriggerRequest<T>> triggers) {
36+
public static <T extends Serializable> TriggerTaskCommand<T> of(Collection<TriggerRequest<T>> triggers) {
3737
return new TriggerTaskCommand<>(triggers);
3838
}
3939

40-
public static <T extends Serializable> TriggerTaskCommand<T> of(AddTriggerRequest<T> trigger) {
40+
public static <T extends Serializable> TriggerTaskCommand<T> of(TriggerRequest<T> trigger) {
4141
return new TriggerTaskCommand<>(Collections.singleton(trigger));
4242
}
4343

4444
@SafeVarargs
45-
public static <T extends Serializable> TriggerTaskCommand<T> of(AddTriggerRequest<T>... triggers) {
45+
public static <T extends Serializable> TriggerTaskCommand<T> of(TriggerRequest<T>... triggers) {
4646
return new TriggerTaskCommand<>(Arrays.asList(triggers));
4747
}
4848
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.springframework.transaction.event.TransactionPhase;
1616
import org.springframework.transaction.event.TransactionalEventListener;
1717
import org.springframework.transaction.support.TransactionTemplate;
18-
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
18+
import org.sterl.spring.persistent_tasks.api.TriggerRequest;
1919
import org.sterl.spring.persistent_tasks.api.TriggerKey;
2020
import org.sterl.spring.persistent_tasks.scheduler.component.EditSchedulerStatusComponent;
2121
import org.sterl.spring.persistent_tasks.scheduler.component.RunOrQueueComponent;
@@ -150,7 +150,7 @@ public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
150150
* available it is resolved
151151
*/
152152
@Transactional(timeout = 10)
153-
public <T extends Serializable> TriggerKey runOrQueue(AddTriggerRequest<T> triggerRequest) {
153+
public <T extends Serializable> TriggerKey runOrQueue(TriggerRequest<T> triggerRequest) {
154154
return runOrQueue.execute(triggerRequest);
155155
}
156156

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/RunOrQueueComponent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.util.concurrent.ConcurrentHashMap;
77
import java.util.concurrent.Future;
88

9-
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
9+
import org.sterl.spring.persistent_tasks.api.TriggerRequest;
1010
import org.sterl.spring.persistent_tasks.api.TriggerKey;
1111
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
1212
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
@@ -34,7 +34,7 @@ public class RunOrQueueComponent {
3434
* @return the reference to the {@link Future} with the key, if no threads are
3535
* available it is resolved
3636
*/
37-
public <T extends Serializable> TriggerKey execute(AddTriggerRequest<T> triggerRequest) {
37+
public <T extends Serializable> TriggerKey execute(TriggerRequest<T> triggerRequest) {
3838
var trigger = triggerService.queue(triggerRequest);
3939

4040
trigger = offerToRun(trigger);

core/src/main/java/org/sterl/spring/persistent_tasks/task/TaskService.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package org.sterl.spring.persistent_tasks.task;
22

33
import java.io.Serializable;
4+
import java.util.Map;
45
import java.util.Optional;
56
import java.util.Set;
7+
import java.util.concurrent.ConcurrentHashMap;
68
import java.util.function.Consumer;
79

810
import org.springframework.lang.NonNull;
@@ -22,6 +24,7 @@ public class TaskService {
2224

2325
private final TaskTransactionComponent taskTransactionComponent;
2426
private final TaskRepository taskRepository;
27+
private final Map<PersistentTask<? extends Serializable>, Optional<TransactionTemplate>> cache = new ConcurrentHashMap<>();
2528

2629
public Set<TaskId<? extends Serializable>> findAllTaskIds() {
2730
return this.taskRepository.all();
@@ -31,9 +34,14 @@ public <T extends Serializable> Optional<PersistentTask<T>> get(TaskId<T> id) {
3134
return taskRepository.get(id);
3235
}
3336

34-
public <T extends Serializable> Optional<TransactionTemplate> getTransactionTemplate(
37+
/**
38+
* Returns a {@link TransactionTemplate} if the task and the framework may join transaction.
39+
*/
40+
public <T extends Serializable> Optional<TransactionTemplate> getTransactionTemplateIfJoinable(
3541
PersistentTask<T> task) {
36-
return taskTransactionComponent.getTransactionTemplate(task);
42+
43+
return cache.computeIfAbsent(task,
44+
t -> taskTransactionComponent.buildOrGetDefaultTransactionTemplate(t));
3745
}
3846

3947
/**
@@ -78,7 +86,7 @@ public <T extends Serializable> TaskId<T> register(String name, PersistentTask<T
7886
* A way to manually register a PersistentTask, usually not needed as spring beans will be added automatically.
7987
*/
8088
public <T extends Serializable> TaskId<T> register(TaskId<T> id, PersistentTask<T> task) {
81-
taskTransactionComponent.getTransactionTemplate(task);
89+
taskTransactionComponent.buildOrGetDefaultTransactionTemplate(task);
8290
return taskRepository.addTask(id, task);
8391
}
8492
/**

0 commit comments

Comments
 (0)