Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
distribution: 'temurin'
cache: maven
- name: Build with Maven
run: mvn -B package --file pom.xml
run: mvn -B install --file pom.xml
- name: PMD with Maven
run: mvn pmd:pmd --file pom.xml

Expand Down
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## [v1.0.1]
## v1.1.0 - (2024-12-30)

- Showing trigger history entries
- Added PersistentTaskService as a abstraction
- Added `PersistentTaskService` as a new abstraction
- Added cancel trigger button to the UI
- Retry is now 3 times as in the strategy name
102 changes: 101 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ SpringBeanTask<Vehicle> task1(VehicleRepository vehicleRepository) {

## Queue a task execution

### Direct usage of the TriggerService.
### Direct usage of the `TriggerService` or `PersistentTaskService`.

```java
private final TriggerService triggerService;
private final PersistentTaskService persistentTaskService;

public void buildVehicle() {
// Vehicle has to be Serializable
Expand All @@ -82,6 +83,9 @@ SpringBeanTask<Vehicle> task1(VehicleRepository vehicleRepository) {

// queue it
triggerService.queue(BuildVehicleTask.ID.newUniqueTrigger(v));
// will queue it and run it if possible.
// if the scheduler service is missing it is same as above
persistentTaskService.runOrQueue(BuildVehicleTask.ID.newUniqueTrigger(v));
}
```

Expand Down Expand Up @@ -115,6 +119,102 @@ SpringBeanTask<Vehicle> task1(VehicleRepository vehicleRepository) {
}
```

### Triggers and Tasks in JUnit Tests

The `SchedulerService` can be disabled for unit testing, which ensures that no trigger will be
executed automatically.

```yml
spring:
persistent-tasks:
scheduler-enabled: false
```

Now you can run any trigger manually using the `TriggerService`

```java
@Autowired
private TriggerService triggerService;

@Test
void testRunTriggerDirectly() {
// GIVEN
// setup your test and create any triggers needed

// WHEN run any pending triggers
triggerService.run(triggerService.queue(trigger));

// THEN
// any asserts you might need
}

@Test
void testRunUnknownTriggersCreated() {
// GIVEN
// setup your test call any method which might create triggers

// WHEN run any pending triggers
triggerService.run(triggerService.lockNextTrigger("test"));

// THEN
// any asserts you might need
}
```

It is also possible to define a test scheduler and use the async way to execute any triggers (without the spring scheduler which would trigger them automatically):

```java
@Configuration
public static class TestConfig {

@Primary
@SuppressWarnings("resource")
SchedulerService schedulerService(TriggerService triggerService, EditSchedulerStatusComponent editSchedulerStatus,
TransactionTemplate trx) throws UnknownHostException {

final var taskExecutor = new TaskExecutorComponent(triggerService, 10);
taskExecutor.setMaxShutdownWaitTime(Duration.ofSeconds(0));
return new SchedulerService("testScheduler", triggerService, taskExecutor, editSchedulerStatus, trx);
}
}
```

Now the `PersistentTaskService` has a method to trigger or to trigger and to wait for the result:

```java
@Autowired
private PersistentTaskService persistentTaskService;

@Test
void testFoo() {
// GIVEN
// setup your test and create any triggers needed

// WHEN run any pending triggers
persistentTaskService.executeTriggersAndWait();

// THEN
// any asserts you might need
}
```

During the setup and cleanup it is possible to cancel any pending stuff:

```java
@BeforeEach
public void beforeEach() throws Exception {
triggerService.deleteAll();
historyService.deleteAll();
schedulerA.setMaxThreads(10);
schedulerService.start();
}

@AfterEach
public void afterEach() throws Exception {
schedulerService.stop();
}
```

### Spring configuration options

| Property | Type | Description | Default Value |
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.sterl.spring</groupId>
<artifactId>spring-persistent-tasks-root</artifactId>
<version>1.0.1-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,133 @@
package org.sterl.spring.persistent_tasks;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
import org.sterl.spring.persistent_tasks.api.TriggerKey;
import org.sterl.spring.persistent_tasks.api.event.TriggerTaskCommand;
import org.sterl.spring.persistent_tasks.history.HistoryService;
import org.sterl.spring.persistent_tasks.scheduler.SchedulerService;
import org.sterl.spring.persistent_tasks.shared.model.TriggerData;
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;

/**
* Abstraction to {@link SchedulerService} or {@link TriggerService}
* depends on if the {@link SchedulerService} is available.
*/
@Service
@RequiredArgsConstructor
public class PersistentTaskService {

@Autowired(required = false)
private SchedulerService schedulerService;
@Autowired
private TriggerService triggerService;
private final Optional<SchedulerService> schedulerService;
private final List<SchedulerService> schedulers;
private final TriggerService triggerService;
private final HistoryService historyService;

/**
* Returns the last known {@link TriggerData} to a given key. First running triggers are checked.
* Maybe out of the history event from a retry execution of the very same id.
*
* @param key the {@link TriggerKey} to look for
* @return the {@link TriggerData} to the {@link TriggerKey}
*/
public Optional<TriggerData> getLastTriggerData(TriggerKey key) {
final Optional<TriggerEntity> trigger = triggerService.get(key);
if (trigger.isEmpty()) {
var history = historyService.findLastKnownStatus(key);
if (history.isPresent()) return Optional.ofNullable(history.get().getData());
return Optional.empty();
} else {
return Optional.ofNullable(trigger.get().getData());
}
}

@EventListener
void queue(TriggerTaskCommand<? extends Serializable> event) {
if (event.triggers().size() == 1) {
runOrQueue(event.triggers().iterator().next());
} else {
triggerService.queueAll(event.triggers());
queueAll(event.triggers());
}
}

/**
* Queues the given triggers.
*
* @param <T> the state type
* @param triggers the triggers to add
* @return the {@link TriggerKey}
*/
@Transactional(timeout = 10)
@NonNull
public <T extends Serializable> List<TriggerKey> queueAll(Collection<AddTriggerRequest<T>> triggers) {
return triggers.stream() //
.map(t -> triggerService.queue(t)) //
.map(TriggerEntity::getKey) //
.toList();
}

/**
* Runs the given trigger if a free threads are available
* and the runAt time is not in the future.
* @return the reference to the {@link TriggerKey}
*/
public <T extends Serializable> TriggerKey runOrQueue(
AddTriggerRequest<T> triggerRequest) {
if (schedulerService == null) {
schedulerService.runOrQueue(triggerRequest);
if (schedulerService.isPresent()) {
schedulerService.get().runOrQueue(triggerRequest);
} else {
triggerService.queue(triggerRequest);
}
return triggerRequest.key();
}

/**
* Triggers the execution of all pending triggers.
*
* @return the reference to the {@link TriggerKey} of the running tasks
*/
public List<Future<TriggerKey>> executeTriggers() {
var result = new ArrayList<Future<TriggerKey>>();
for (SchedulerService s : schedulers) {
result.addAll(s.triggerNextTasks());
}
return result;
}

/**
* Triggers the execution of all pending triggers and wait for the result.
*/
@SneakyThrows
public List<TriggerKey> executeTriggersAndWait() {
final var result = new ArrayList<TriggerKey>();

List<Future<TriggerKey>> triggers;
do {
triggers = executeTriggers();
for (Future<TriggerKey> future : triggers) {
try {
result.add(future.get());
} catch (InterruptedException | ExecutionException e) {
final Throwable cause = e.getCause();
throw cause == null ? e : cause;
}
}
} while (!triggers.isEmpty());

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@
@FunctionalInterface
public interface RetryStrategy {
RetryStrategy NO_RETRY = (c, e) -> false;
RetryStrategy THREE_RETRIES = (c, e) -> c < 3;
/**
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
*/
RetryStrategy THREE_RETRIES = (c, e) -> c < 4;
/**
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
*/
RetryStrategy THREE_RETRIES_IMMEDIATELY = new RetryStrategy() {
@Override
public boolean shouldRetry(int executionCount, Exception error) {
return executionCount < 3;
return executionCount < 4;
}
@Override
public OffsetDateTime retryAt(int executionCount, Exception error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public static class TaskTriggerBuilder<T extends Serializable> {
public static <T extends Serializable> TaskTriggerBuilder<T> newTrigger(String name) {
return new TaskTriggerBuilder<>(new TaskId<T>(name));
}
public static <T extends Serializable> TaskTriggerBuilder<T> newTrigger(String name, T state) {
return new TaskTriggerBuilder<>(new TaskId<T>(name)).state(state);
}
public AddTriggerRequest<T> build() {
var key = TriggerKey.of(id, taskId);
return new AddTriggerRequest<>(key, state, when, priority);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import java.util.Collection;
import java.util.Collections;

import org.sterl.spring.persistent_tasks.api.TaskId.TaskTriggerBuilder;
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
import org.sterl.spring.persistent_tasks.api.TaskId.TaskTriggerBuilder;

/**
* An event to trigger one or multiple task executions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import org.springframework.data.domain.Sort.Direction;
import org.sterl.spring.persistent_tasks.api.TaskId;
import org.sterl.spring.persistent_tasks.api.TriggerKey;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryDetailRepository;
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryLastStateRepository;
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.sterl.spring.persistent_tasks.history.api;

import org.springframework.lang.NonNull;
import org.sterl.spring.persistent_tasks.api.Trigger;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
import org.sterl.spring.persistent_tasks.shared.ExtendetConvert;
import org.sterl.spring.persistent_tasks.shared.converter.ToTrigger;

Expand All @@ -11,8 +12,9 @@ interface HistoryConverter {
enum FromLastTriggerStateEntity implements ExtendetConvert<TriggerHistoryLastStateEntity, Trigger> {
INSTANCE;

@NonNull
@Override
public Trigger convert(TriggerHistoryLastStateEntity source) {
public Trigger convert(@NonNull TriggerHistoryLastStateEntity source) {
var result = ToTrigger.INSTANCE.convert(source);
result.setId(source.getId());
result.setInstanceId(source.getId());
Expand All @@ -23,8 +25,9 @@ public Trigger convert(TriggerHistoryLastStateEntity source) {
enum FromTriggerStateDetailEntity implements ExtendetConvert<TriggerHistoryDetailEntity, Trigger> {
INSTANCE;

@NonNull
@Override
public Trigger convert(TriggerHistoryDetailEntity source) {
public Trigger convert(@NonNull TriggerHistoryDetailEntity source) {
var result = ToTrigger.INSTANCE.convert(source);
result.setId(source.getId());
result.setInstanceId(source.getInstanceId());
Expand Down
Loading
Loading