Skip to content

Commit dda8bfe

Browse files
committed
added correlationId
1 parent d01267e commit dda8bfe

23 files changed

+247
-28
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.Serializable;
44
import java.util.ArrayList;
55
import java.util.Collection;
6+
import java.util.Collections;
67
import java.util.List;
78
import java.util.Optional;
89
import java.util.concurrent.ExecutionException;
@@ -17,6 +18,7 @@
1718
import org.sterl.spring.persistent_tasks.api.TriggerKey;
1819
import org.sterl.spring.persistent_tasks.api.event.TriggerTaskCommand;
1920
import org.sterl.spring.persistent_tasks.history.HistoryService;
21+
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
2022
import org.sterl.spring.persistent_tasks.scheduler.SchedulerService;
2123
import org.sterl.spring.persistent_tasks.shared.model.TriggerData;
2224
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
@@ -37,7 +39,7 @@ public class PersistentTaskService {
3739
private final List<SchedulerService> schedulers;
3840
private final TriggerService triggerService;
3941
private final HistoryService historyService;
40-
42+
4143
/**
4244
* Returns the last known {@link TriggerData} to a given key. First running triggers are checked.
4345
* Maybe out of the history event from a retry execution of the very same id.
@@ -64,7 +66,7 @@ public Optional<TriggerData> getLastDetailData(TriggerKey key) {
6466

6567
@EventListener
6668
void queue(TriggerTaskCommand<? extends Serializable> event) {
67-
if (event.triggers().size() == 1) {
69+
if (event.size() == 1) {
6870
runOrQueue(event.triggers().iterator().next());
6971
} else {
7072
queue(event.triggers());
@@ -81,6 +83,8 @@ void queue(TriggerTaskCommand<? extends Serializable> event) {
8183
@Transactional(timeout = 10)
8284
@NonNull
8385
public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequest<T>> triggers) {
86+
if (triggers == null || triggers.isEmpty()) return Collections.emptyList();
87+
8488
return triggers.stream() //
8589
.map(t -> triggerService.queue(t)) //
8690
.map(TriggerEntity::getKey) //
@@ -149,4 +153,19 @@ public List<TriggerKey> executeTriggersAndWait() {
149153

150154
return result;
151155
}
156+
157+
public List<TriggerData> findTriggerByCorrelationId(String correlationId) {
158+
var running = triggerService.findTriggerByCorrelationId(correlationId)
159+
.stream().map(TriggerEntity::getData)
160+
.toList();
161+
162+
var done = historyService.findTriggerByCorrelationId(correlationId)
163+
.stream().map(TriggerHistoryLastStateEntity::getData)
164+
.toList();
165+
166+
var result = new ArrayList<TriggerData>(running.size() + done.size());
167+
result.addAll(running);
168+
result.addAll(done);
169+
return result;
170+
}
152171
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/AddTriggerRequest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.io.Serializable;
44
import java.time.OffsetDateTime;
5+
import java.util.Collection;
6+
import java.util.Collections;
57

68
/**
79
/**
@@ -12,12 +14,17 @@ public record AddTriggerRequest<T extends Serializable>(
1214
TriggerKey key,
1315
T state,
1416
OffsetDateTime runtAt,
15-
int priority) {
17+
int priority,
18+
String correlationId) {
1619

1720
@SuppressWarnings("unchecked")
1821
public TaskId<T> taskId() {
1922
return (TaskId<T>)key.toTaskId();
2023
}
2124

25+
public Collection<AddTriggerRequest<T>> toList() {
26+
return Collections.singleton(this);
27+
}
28+
2229
public static final int DEFAULT_PRIORITY = 4;
2330
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/TaskId.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public static TaskId<Serializable> of(String taskId) {
3333
public static class TriggerBuilder<T extends Serializable> {
3434
private final TaskId<T> taskId;
3535
private String id;
36+
private String correlationId;
3637
private T state;
3738
private OffsetDateTime when = OffsetDateTime.now();
3839
private int priority = AddTriggerRequest.DEFAULT_PRIORITY;
@@ -45,16 +46,34 @@ public static <T extends Serializable> TriggerBuilder<T> newTrigger(String name,
4546
}
4647
public AddTriggerRequest<T> build() {
4748
var key = TriggerKey.of(id, taskId);
48-
return new AddTriggerRequest<>(key, state, when, priority);
49+
return new AddTriggerRequest<>(key, state, when, priority, correlationId);
4950
}
51+
/**
52+
* The ID of this task, same queued ids are replaced.
53+
*/
5054
public TriggerBuilder<T> id(String id) {
5155
this.id = id;
5256
return this;
5357
}
58+
/**
59+
* An unique ID which is taken over to a chain/set of tasks.
60+
* If task is triggered it in a task, this ID is taken over.
61+
*/
62+
public TriggerBuilder<T> correlationId(String correlationId) {
63+
this.correlationId = correlationId;
64+
return this;
65+
}
5466
public TriggerBuilder<T> state(T state) {
5567
this.state = state;
5668
return this;
5769
}
70+
/**
71+
* The higher the {@link #priority} the earlier this task is picked.
72+
* Same as JMS priority. Default is also 4, like in JMS.
73+
*
74+
* @param priority custom priority e.g. 0-9, also higher numbers are supported
75+
* @return this {@link TriggerBuilder}
76+
*/
5877
public TriggerBuilder<T> priority(int priority) {
5978
this.priority = priority;
6079
return this;

core/src/main/java/org/sterl/spring/persistent_tasks/api/Trigger.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public class Trigger {
1515
/** the business key which is unique it is combination for triggers but not the history! */
1616
private TriggerKey key;
1717

18+
private String correlationId;
19+
1820
private String runningOn;
1921

2022
private OffsetDateTime createdTime = OffsetDateTime.now();

core/src/main/java/org/sterl/spring/persistent_tasks/api/event/TriggerTaskCommand.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
*/
1414
public record TriggerTaskCommand<T extends Serializable>(
1515
Collection<AddTriggerRequest<T>> triggers) implements PersistentTasksEvent {
16+
17+
public int size() {
18+
return triggers == null ? 0 : triggers.size();
19+
}
1620

1721
public static <T extends Serializable> TriggerTaskCommand<T> of(String name, T state) {
1822
return new TriggerTaskCommand<>(Collections.singleton(TriggerBuilder
@@ -21,6 +25,10 @@ public static <T extends Serializable> TriggerTaskCommand<T> of(String name, T s
2125
.build()));
2226
}
2327

28+
public static <T extends Serializable> TriggerTaskCommand<T> of(Collection<AddTriggerRequest<T>> triggers) {
29+
return new TriggerTaskCommand<>(triggers);
30+
}
31+
2432
public static <T extends Serializable> TriggerTaskCommand<T> of(AddTriggerRequest<T> trigger) {
2533
return new TriggerTaskCommand<>(Collections.singleton(trigger));
2634
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/task/ComplexPersistentTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.sterl.spring.persistent_tasks.api.task;
22

33
import java.io.Serializable;
4+
import java.util.Collection;
45

56
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
67
import org.sterl.spring.persistent_tasks.api.Trigger;
@@ -19,9 +20,10 @@ public interface ComplexPersistentTask<T extends Serializable, R extends Seriali
1920

2021
/**
2122
* Default execution method of a trigger, which also allows to queue the next trigger as needed.
23+
*
2224
* @param <R> the state type of the next trigger
2325
* @param data the data of the current trigger
2426
* @return optional next trigger to queue, <code>null</code> means done.
2527
*/
26-
AddTriggerRequest<R> accept(RunningTrigger<T> data);
28+
Collection<AddTriggerRequest<R>> accept(RunningTrigger<T> data);
2729
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/task/RunningTrigger.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
@Data
1010
public class RunningTrigger<T extends Serializable> {
1111
private final TriggerKey key;
12+
private final String correlationId;
1213
private final int executionCount;
1314
private final T data;
1415
}

core/src/main/java/org/sterl/spring/persistent_tasks/history/HistoryService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,8 @@ private Pageable applyDefaultSortIfNeeded(Pageable page) {
118118
public List<TaskStatusHistoryOverview> taskStatusHistory() {
119119
return triggerHistoryLastStateRepository.listTriggerStatus();
120120
}
121+
122+
public List<TriggerHistoryLastStateEntity> findTriggerByCorrelationId(String correlationId) {
123+
return triggerHistoryLastStateRepository.findByCorrelationId(correlationId);
124+
}
121125
}

core/src/main/java/org/sterl/spring/persistent_tasks/history/model/TriggerHistoryDetailEntity.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
@Index(name = "idx_pt_triggers_history_trigger_id", columnList = "trigger_id"),
3232
@Index(name = "idx_pt_triggers_history_status", columnList = "status"),
3333
@Index(name = "idx_pt_triggers_history_created_time", columnList = "created_time"),
34+
@Index(name = "idx_pt_trigger_history_details_correlation_id", columnList = "correlation_id"),
3435
})
3536
@Data
3637
@NoArgsConstructor

core/src/main/java/org/sterl/spring/persistent_tasks/history/model/TriggerHistoryLastStateEntity.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
@Index(name = "idx_pt_trigger_history_last_states_trigger_id", columnList = "trigger_id"),
2626
@Index(name = "idx_pt_trigger_history_last_states_status", columnList = "status"),
2727
@Index(name = "idx_pt_trigger_history_last_states_created_time", columnList = "created_time"),
28+
@Index(name = "idx_pt_trigger_history_last_states_correlation_id", columnList = "correlation_id"),
2829
})
2930
@Data
3031
@NoArgsConstructor

0 commit comments

Comments
 (0)