Skip to content
Merged

V1.6 #10

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## v1.6

- Running triggers can be canceled now
- Running triggers can be failed now
- https://github.com/sterlp/spring-persistent-tasks/wiki/Cancel-a-task-trigger
- Triggers have now correlationId to collect them
- Added Re-Queue / Re-Run trigger to history page
- Correlation Id is shown in the UI
- ID search includes also Correlation Id
- Moved helper classes to own test jar

## v1.5.6 - (2025-03-06)

- Better ID search
Expand Down
4 changes: 2 additions & 2 deletions RUN_AND_BUILD.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mvn versions:display-dependency-updates
mvn versions:set -DnewVersion=1.5.5 -DgenerateBackupPoms=false
mvn versions:set -DnewVersion=1.6.0 -DgenerateBackupPoms=false
git tag -a v1.5.3 -m "v1.5.3 release"
mvn versions:set -DnewVersion=1.5.4-SNAPSHOT -DgenerateBackupPoms=false
mvn versions:set -DnewVersion=1.6.0-SNAPSHOT -DgenerateBackupPoms=false

## postgres

Expand Down
17 changes: 11 additions & 6 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.sterl.spring</groupId>
<artifactId>spring-persistent-tasks-root</artifactId>
<version>1.5.7-SNAPSHOT</version>
<version>1.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand All @@ -33,6 +33,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.liquibase</groupId>
<artifactId>liquibase-core</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down Expand Up @@ -73,17 +77,18 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>uk.co.jemos.podam</groupId>
<artifactId>podam</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.sterl.test</groupId>
<artifactId>hibernate-asserts</artifactId>
<version>1.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.context.event.EventListener;
import org.springframework.data.domain.Pageable;
Expand All @@ -17,13 +16,13 @@
import org.sterl.spring.persistent_tasks.api.TriggerKey;
import org.sterl.spring.persistent_tasks.api.event.TriggerTaskCommand;
import org.sterl.spring.persistent_tasks.history.HistoryService;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
import org.sterl.spring.persistent_tasks.scheduler.SchedulerService;
import org.sterl.spring.persistent_tasks.shared.model.TriggerData;
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;

/**
* Abstraction to {@link SchedulerService} or {@link TriggerService}
Expand All @@ -34,37 +33,40 @@
public class PersistentTaskService {

private final Optional<SchedulerService> schedulerService;
private final List<SchedulerService> schedulers;
private final TriggerService triggerService;
private final HistoryService historyService;

/**
* Returns the last known {@link TriggerData} to a given key. First running triggers are checked.
* Maybe out of the history event from a retry execution of the very same id.
*
*
* @param key the {@link TriggerKey} to look for
* @return the {@link TriggerData} to the {@link TriggerKey}
*/
public Optional<TriggerData> getLastTriggerData(TriggerKey key) {
final Optional<TriggerEntity> trigger = triggerService.get(key);
if (trigger.isEmpty()) {
var history = historyService.findLastKnownStatus(key);
if (history.isPresent()) return Optional.ofNullable(history.get().getData());
if (history.isPresent()) {
return Optional.ofNullable(history.get().getData());
}
return Optional.empty();
} else {
return Optional.ofNullable(trigger.get().getData());
}
}

public Optional<TriggerData> getLastDetailData(TriggerKey key) {
var data = historyService.findAllDetailsForKey(key, Pageable.ofSize(1));
if (data.isEmpty()) return Optional.empty();
if (data.isEmpty()) {
return Optional.empty();
}
return Optional.of(data.getContent().get(0).getData());
}

@EventListener
void queue(TriggerTaskCommand<? extends Serializable> event) {
if (event.triggers().size() == 1) {
if (event.size() == 1) {
runOrQueue(event.triggers().iterator().next());
} else {
queue(event.triggers());
Expand All @@ -73,22 +75,26 @@ void queue(TriggerTaskCommand<? extends Serializable> event) {

/**
* Queues/updates the given triggers, if the {@link TriggerKey} is already present
*
*
* @param <T> the state type
* @param triggers the triggers to add
* @return the {@link TriggerKey}
*/
@Transactional(timeout = 10)
@NonNull
public <T extends Serializable> List<TriggerKey> queue(Collection<AddTriggerRequest<T>> triggers) {
if (triggers == null || triggers.isEmpty()) {
return Collections.emptyList();
}

return triggers.stream() //
.map(t -> triggerService.queue(t)) //
.map(TriggerEntity::getKey) //
.toList();
}
/**
* Queues/updates the given trigger, if the {@link TriggerKey} is already present.
*
*
* @param <T> the state type
* @param trigger the trigger to add
* @return the {@link TriggerKey}
Expand All @@ -102,7 +108,7 @@ public <T extends Serializable> TriggerKey queue(AddTriggerRequest<T> trigger) {
/**
* Runs the given trigger if a free threads are available
* and the runAt time is not in the future.
* @return the reference to the {@link TriggerKey}
* @return the reference to the {@link TriggerKey}
*/
public <T extends Serializable> TriggerKey runOrQueue(
AddTriggerRequest<T> triggerRequest) {
Expand All @@ -113,40 +119,27 @@ public <T extends Serializable> TriggerKey runOrQueue(
}
return triggerRequest.key();
}

/**
* Triggers the execution of all pending triggers.
*
* @return the reference to the {@link TriggerKey} of the running tasks
*/
public List<Future<TriggerKey>> executeTriggers() {
var result = new ArrayList<Future<TriggerKey>>();
for (SchedulerService s : schedulers) {
result.addAll(s.triggerNextTasks());
}
return result;
}


/**
* Triggers the execution of all pending triggers and wait for the result.
* Returns all triggers for a correlationId sorted by the creation time.
* @param correlationId the id to search for
* @return the found {@link TriggerData} sorted by create time ASC
*/
@SneakyThrows
public List<TriggerKey> executeTriggersAndWait() {
final var result = new ArrayList<TriggerKey>();

List<Future<TriggerKey>> triggers;
do {
triggers = executeTriggers();
for (Future<TriggerKey> future : triggers) {
try {
result.add(future.get());
} catch (InterruptedException | ExecutionException e) {
final Throwable cause = e.getCause();
throw cause == null ? e : cause;
}
}
} while (!triggers.isEmpty());
@Transactional(readOnly = true, timeout = 5)
public List<TriggerData> findAllTriggerByCorrelationId(String correlationId) {

var running = triggerService.findTriggerByCorrelationId(correlationId)
.stream().map(TriggerEntity::getData)
.toList();

var done = historyService.findTriggerByCorrelationId(correlationId)
.stream().map(TriggerHistoryLastStateEntity::getData)
.toList();


var result = new ArrayList<TriggerData>(running.size() + done.size());
result.addAll(done);
result.addAll(running);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.io.Serializable;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Collections;

/**
/**
Expand All @@ -12,12 +14,17 @@ public record AddTriggerRequest<T extends Serializable>(
TriggerKey key,
T state,
OffsetDateTime runtAt,
int priority) {
int priority,
String correlationId) {

@SuppressWarnings("unchecked")
public TaskId<T> taskId() {
return (TaskId<T>)key.toTaskId();
}

public Collection<AddTriggerRequest<T>> toList() {
return Collections.singleton(this);
}

public static final int DEFAULT_PRIORITY = 4;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
*/
public record TaskId<T extends Serializable>(String name) implements Serializable {

public TaskTriggerBuilder<T> newTrigger() {
return new TaskTriggerBuilder<>(this);
public TriggerBuilder<T> newTrigger() {
return new TriggerBuilder<>(this);
}

public TaskTriggerBuilder<T> newTrigger(T state) {
return new TaskTriggerBuilder<>(this).state(state);
public TriggerBuilder<T> newTrigger(T state) {
return new TriggerBuilder<>(this).state(state);
}

public AddTriggerRequest<T> newUniqueTrigger(T state) {
return new TaskTriggerBuilder<>(this).state(state).build();
return new TriggerBuilder<>(this).state(state).build();
}

public static TaskId<Serializable> of(String taskId) {
Expand All @@ -30,46 +30,65 @@ public static TaskId<Serializable> of(String taskId) {
}

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public static class TaskTriggerBuilder<T extends Serializable> {
public static class TriggerBuilder<T extends Serializable> {
private final TaskId<T> taskId;
private String id;
private String correlationId;
private T state;
private OffsetDateTime when = OffsetDateTime.now();
private int priority = AddTriggerRequest.DEFAULT_PRIORITY;

public static <T extends Serializable> TaskTriggerBuilder<T> newTrigger(String name) {
return new TaskTriggerBuilder<>(new TaskId<T>(name));
public static <T extends Serializable> TriggerBuilder<T> newTrigger(String name) {
return new TriggerBuilder<>(new TaskId<T>(name));
}
public static <T extends Serializable> TaskTriggerBuilder<T> newTrigger(String name, T state) {
return new TaskTriggerBuilder<>(new TaskId<T>(name)).state(state);
public static <T extends Serializable> TriggerBuilder<T> newTrigger(String name, T state) {
return new TriggerBuilder<>(new TaskId<T>(name)).state(state);
}
public AddTriggerRequest<T> build() {
var key = TriggerKey.of(id, taskId);
return new AddTriggerRequest<>(key, state, when, priority);
return new AddTriggerRequest<>(key, state, when, priority, correlationId);
}
public TaskTriggerBuilder<T> id(String id) {
/**
* The ID of this task, same queued ids are replaced.
*/
public TriggerBuilder<T> id(String id) {
this.id = id;
return this;
}
public TaskTriggerBuilder<T> state(T state) {
/**
* An unique ID which is taken over to a chain/set of tasks.
* If task is triggered it in a task, this ID is taken over.
*/
public TriggerBuilder<T> correlationId(String correlationId) {
this.correlationId = correlationId;
return this;
}
public TriggerBuilder<T> state(T state) {
this.state = state;
return this;
}
public TaskTriggerBuilder<T> priority(int priority) {
/**
* The higher the {@link #priority} the earlier this task is picked.
* Same as JMS priority. Default is also 4, like in JMS.
*
* @param priority custom priority e.g. 0-9, also higher numbers are supported
* @return this {@link TriggerBuilder}
*/
public TriggerBuilder<T> priority(int priority) {
this.priority = priority;
return this;
}
/**
* synonym for {@link #runAt(OffsetDateTime)}
*/
public TaskTriggerBuilder<T> when(OffsetDateTime when) {
public TriggerBuilder<T> when(OffsetDateTime when) {
return runAt(when);
}
public TaskTriggerBuilder<T> runAt(OffsetDateTime when) {
public TriggerBuilder<T> runAt(OffsetDateTime when) {
this.when = when;
return this;
}
public TaskTriggerBuilder<T> runAfter(Duration duration) {
public TriggerBuilder<T> runAfter(Duration duration) {
runAt(OffsetDateTime.now().plus(duration));
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public class Trigger {
/** the business key which is unique it is combination for triggers but not the history! */
private TriggerKey key;

private String correlationId;

private String runningOn;

private OffsetDateTime createdTime = OffsetDateTime.now();
Expand Down
Loading