Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,51 @@ public class BuildVehicleTask implements SpringBeanTask<Vehicle> {

private final VehicleRepository vehicleRepository;

@Transactional(timeout = 5)
@Override
public void accept(Vehicle vehicle) {
// do stuff
// save
vehicleRepository.save(vehicle);
}
// OPTIONAL
@Override
public RetryStrategy retryStrategy() {
// run 5 times, multiply the execution count with 4, add the result in HOURS to now.
return new MultiplicativeRetryStrategy(5, ChronoUnit.HOURS, 4)
}
// OPTIONAL
// if the task in accept requires a DB transaction, join them together with the framework
// if true the TransactionTemplate is used. Set here any timeouts.
@Override
public boolean isTransactional() {
return true;
}
}
```

Consider setting a timeout to the `TransactionTemplate`:

```java
@Bean
TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
TransactionTemplate template = new TransactionTemplate(transactionManager);
template.setTimeout(10);
return template;
}
```

### As a closure

Note: this example has no aspects as above the spring _@Transactional_
Simple task will use defaults:

- Not a transactional task, e.g. HTTP calls
- 4 executions, one regular and 3 retries, linear
- using minutes with an offset of 1 which is added to now

```java
@Bean
SpringBeanTask<Vehicle> task1(VehicleRepository vehicleRepository) {
return v -> vehicleRepository.save(v);
SpringBeanTask<Vehicle> task1(VehicleHttpConnector vehicleHttpConnector) {
return v -> vehicleHttpConnector.send(v);
}
```

Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,114 @@
package org.sterl.spring.persistent_tasks.api;

import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;

import org.springframework.lang.Nullable;

import lombok.RequiredArgsConstructor;

@FunctionalInterface
public interface RetryStrategy {
RetryStrategy NO_RETRY = (c, e) -> false;
/**
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
*/
RetryStrategy THREE_RETRIES = (c, e) -> c < 4;
RetryStrategy THREE_RETRIES = new LinearRetryStrategy(4, ChronoUnit.MINUTES, 1);
/**
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
*/
RetryStrategy THREE_RETRIES_IMMEDIATELY = new RetryStrategy() {
RetryStrategy THREE_RETRIES_IMMEDIATELY = new MultiplicativeRetryStrategy(4, ChronoUnit.MILLIS, 0);


/**
* Determines whether a retry should be attempted based on the current
* execution count and the provided exception.
*
* @param executionCount The number of attempts already made.
* @param error The exception that triggered the retry.
* @return {@code true} if the current execution count is less than
* the maximum execution count; {@code false} otherwise.
*/
boolean shouldRetry(int executionCount, @Nullable Exception error);

/**
* Calculates the time of the next retry attempt based on the current
* execution count and the provided exception.
*
* @param executionCount The number of attempts already made.
* @param exception The exception that triggered the retry.
* @return The {@link OffsetDateTime} representing the time of the next retry attempt.
*/
default OffsetDateTime retryAt(int executionCount, @Nullable Exception exception) {
return OffsetDateTime.now().plusMinutes(executionCount);
}


// Default implementations
/**
* A retry strategy that determines the next retry time by adding a fixed
* offset and the execution count to the current time in the specified temporal unit.
*
* <p>This strategy can be used to create retry intervals that increase linearly
* with the number of attempts, providing a predictable delay pattern.</p>
*
* <p>Example:
* If {@code offset = 5}, {@code unit = ChronoUnit.SECONDS}, and
* {@code executionCount = 3}, the next retry will be scheduled after
* {@code 5 + 3 = 8 seconds} from the current time.</p>
*
* <p>Note: The retry attempts will stop once the maximum execution count
* ({@code maxExecutionCount}) is reached.</p>
*
* @author Your Name
*/
@RequiredArgsConstructor
class LinearRetryStrategy implements RetryStrategy {
private final int maxExecutionCount;
private final TemporalUnit unit;
private final int offset;

@Override
public boolean shouldRetry(int executionCount, Exception error) {
return executionCount < 4;
return maxExecutionCount > executionCount;
}
@Override
public OffsetDateTime retryAt(int executionCount, Exception error) {
return OffsetDateTime.now();
return OffsetDateTime.now().plus(offset + executionCount, unit);
}
};
}

/**
* Check if a retry should be done.
* A retry strategy that determines the next retry time by multiplying
* the execution count by a scaling factor and adding the result to the
* current time in the specified temporal unit.
*
* <p>This strategy can be used to create retry intervals that increase
* multiplicatively with the number of attempts, providing a way to progressively
* delay retries.</p>
*
* @param executionCount 0 based counter how often the execution was tried
* @param error the exception, <code>null</code> on a timeout
*/
boolean shouldRetry(int executionCount, @Nullable Exception error);

/**
* By default a linear retry strategy, adding one minute for each failed try.
* <p>Example:
* If {@code scalingFactor = 2}, {@code unit = ChronoUnit.SECONDS}, and
* {@code executionCount = 3}, the next retry will be scheduled after
* {@code 2 * 3 = 6 seconds} from the current time.</p>
*
* @param executionCount 0 based counter how often the execution was tried
* @param error the exception, <code>null</code> on a timeout
* <p>Note: The retry attempts will stop once the maximum execution count
* ({@code maxExecutionCount}) is reached.</p>
*/
default OffsetDateTime retryAt(int executionCount, @Nullable Exception error) {
return OffsetDateTime.now().plusMinutes(1 + executionCount);
@RequiredArgsConstructor
class MultiplicativeRetryStrategy implements RetryStrategy {
private final int maxExecutionCount;
private final TemporalUnit unit;
private final int scalingFactor;

@Override
public boolean shouldRetry(int executionCount, Exception error) {
return maxExecutionCount > executionCount;
}
@Override
public OffsetDateTime retryAt(int executionCount, Exception error) {
return OffsetDateTime.now().plus(scalingFactor * executionCount, unit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,19 @@ public interface SpringBeanTask<T extends Serializable> extends Consumer<T> {
default RetryStrategy retryStrategy() {
return RetryStrategy.THREE_RETRIES;
}

/**
* Whether the task is transaction or not. If <code>true</code> the execution
* is wrapped into the default transaction template together with the state update
* and the following events:
* <ol>
* <li>org.sterl.spring.persistent_tasks.trigger.event.TriggerRunningEvent</li>
* <li>org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent</li>
* <li>org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent</li>
* </ol>
* @return {@code true} if the task is transactional; {@code false} otherwise.
*/
default boolean isTransactional() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,19 @@ public List<Future<TriggerKey>> triggerNextTasks() {
*/
@NonNull
public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
List<TriggerEntity> triggers;
if (taskExecutor.getFreeThreads() > 0) {
triggers = triggerService.lockNextTrigger(
name, taskExecutor.getFreeThreads(), timeDue);
final var result = trx.execute(t -> {
var triggers = triggerService.lockNextTrigger(name,
taskExecutor.getFreeThreads(), timeDue);
pingRegistry().addRunning(triggers.size());
return triggers;
});

return taskExecutor.submit(result);
} else {
triggers = Collections.emptyList();
pingRegistry();
return Collections.emptyList();
}
var result = taskExecutor.submit(triggers);
pingRegistry();
return result;
}

/**
Expand All @@ -122,23 +125,25 @@ public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
*/
public <T extends Serializable> Future<TriggerKey> runOrQueue(
AddTriggerRequest<T> triggerRequest) {
var runningTrigger = trx.execute(t -> {
final var runningTrigger = trx.execute(t -> {
var trigger = triggerService.queue(triggerRequest);
// exit now if this trigger is for the future ...
if (trigger.shouldRunInFuture()) return trigger;

if (taskExecutor.getFreeThreads() > 0) {
trigger = triggerService.markTriggersAsRunning(trigger, name);
pingRegistry().addRunning(1);
} else {
log.debug("Currently not enough free thread available {} of {} in use. Task {} queued.",
taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), trigger.getKey());
}
return trigger;
});
Future<TriggerKey> result = CompletableFuture.completedFuture(runningTrigger.getKey());
Future<TriggerKey> result;
if (runningTrigger.isRunning()) {
result = taskExecutor.submit(runningTrigger);
pingRegistry();
} else {
result = CompletableFuture.completedFuture(runningTrigger.getKey());
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public TaskExecutorComponent(TriggerService triggerService, int maxThreads) {

@NonNull
public List<Future<TriggerKey>> submit(List<TriggerEntity> trigger) {
if (trigger == null || trigger.isEmpty()) return Collections.emptyList();

final List<Future<TriggerKey>> result = new ArrayList<>(trigger.size());
for (TriggerEntity triggerEntity : trigger) {
result.add(submit(triggerEntity));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ public SchedulerEntity(String name) {
super();
this.id = name;
}

public SchedulerEntity addRunning(int count) {
this.runnungTasks += count;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ public void accept(T state) {
public RetryStrategy retryStrategy() {
return this.fun.retryStrategy();
}
@Override
public boolean isTransactional() {
return this.fun.isTransactional();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
import org.sterl.spring.persistent_tasks.trigger.event.TriggerAddedEvent;
import org.sterl.spring.persistent_tasks.trigger.event.TriggerCanceledEvent;
import org.sterl.spring.persistent_tasks.trigger.event.TriggerCompleteEvent;
import org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent;
import org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent;
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
import org.sterl.spring.persistent_tasks.trigger.repository.TriggerRepository;
Expand Down Expand Up @@ -55,8 +55,8 @@ public Optional<TriggerEntity> completeTaskWithStatus(TriggerKey key, Exception
result.ifPresent(t -> {
t.complete(e);

if (t.getData().getStatus() != TriggerStatus.FAILED) {
publisher.publishEvent(new TriggerCompleteEvent(t));
if (t.getData().getStatus() == TriggerStatus.SUCCESS) {
publisher.publishEvent(new TriggerSuccessEvent(t));
log.debug("Setting {} to status={} {}", key, t.getData().getStatus(),
e == null ? "" : "error=" + e.getClass().getSimpleName());
} else {
Expand Down
Loading
Loading