Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Changelog

## v1.6
## v1.6.1
- simpler RetryStrategy - as function
- showing last ping
- showing execution time or still running triggers
- saver way to keep track of running triggers

## v1.6.0 - (2025-03-11)

- Running triggers can be canceled now
- Running triggers can be failed now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import java.util.Optional;

import org.springframework.context.event.EventListener;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -122,24 +125,46 @@ public <T extends Serializable> TriggerKey runOrQueue(

/**
* Returns all triggers for a correlationId sorted by the creation time.
* Data is limited to overall 300 elements.
*
* @param correlationId the id to search for
* @return the found {@link TriggerData} sorted by create time ASC
*/
@Transactional(readOnly = true, timeout = 5)
public List<TriggerData> findAllTriggerByCorrelationId(String correlationId) {

var running = triggerService.findTriggerByCorrelationId(correlationId)
var running = triggerService.findTriggerByCorrelationId(correlationId, Pageable.ofSize(100))
.stream().map(TriggerEntity::getData)
.toList();

var done = historyService.findTriggerByCorrelationId(correlationId)
var done = historyService.findTriggerByCorrelationId(correlationId, Pageable.ofSize(200))
.stream().map(TriggerHistoryLastStateEntity::getData)
.toList();


var result = new ArrayList<TriggerData>(running.size() + done.size());
result.addAll(done);
result.addAll(running);
return result;
}

/**
* Returns the first info to a trigger based on the correlationId.
*
* @param correlationId the id to search for
* @return the found {@link TriggerData}
*/
@Transactional(readOnly = true, timeout = 5)
public Optional<TriggerData> findLastTriggerByCorrelationId(String correlationId) {
final var page = PageRequest.of(0, 1, Sort.by(Direction.DESC, "data.createdTime"));
var result = triggerService.findTriggerByCorrelationId(correlationId, page)
.stream().map(TriggerEntity::getData)
.toList();

if (result.isEmpty()) {
result = historyService.findTriggerByCorrelationId(correlationId, page)
.stream().map(TriggerHistoryLastStateEntity::getData)
.toList();
}
return result.isEmpty() ? Optional.empty() : Optional.of(result.getFirst());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

@FunctionalInterface
public interface RetryStrategy {
RetryStrategy NO_RETRY = (c, e) -> false;
RetryStrategy NO_RETRY = (c, e) -> null;
/**
* One initial execution and after that we will try it 3 more times. Overall 4 executions.
*/
Expand All @@ -23,27 +23,26 @@ public interface RetryStrategy {

/**
* Determines whether a retry should be attempted based on the current
* execution count and the provided exception.
* execution count and the provided exception. (optional)
*
* @param executionCount The number of attempts already made.
* @param error The exception that triggered the retry.
* @return {@code true} if the current execution count is less than
* the maximum execution count; {@code false} otherwise.
*/
boolean shouldRetry(int executionCount, @Nullable Exception error);
default boolean shouldRetry(int executionCount, @Nullable Exception error) {
return true;
}

/**
* Calculates the time of the next retry attempt based on the current
* execution count and the provided exception.
*
* @param executionCount The number of attempts already made.
* @param exception The exception that triggered the retry.
* @return The {@link OffsetDateTime} representing the time of the next retry attempt.
* @return {@link OffsetDateTime} of the next execution, <code>null</code> for no retry.
*/
default OffsetDateTime retryAt(int executionCount, @Nullable Exception exception) {
return OffsetDateTime.now().plusMinutes(executionCount);
}

OffsetDateTime retryAt(int executionCount, @Nullable Exception exception);

// Default implementations
/**
Expand All @@ -70,11 +69,11 @@ class LinearRetryStrategy implements RetryStrategy {
private final int offset;

@Override
public boolean shouldRetry(int executionCount, Exception error) {
public boolean shouldRetry(int executionCount, @Nullable Exception error) {
return maxExecutionCount > executionCount;
}
@Override
public OffsetDateTime retryAt(int executionCount, Exception error) {
public OffsetDateTime retryAt(int executionCount, @Nullable Exception error) {
return OffsetDateTime.now().plus(offset + executionCount, unit);
}
}
Expand Down Expand Up @@ -103,12 +102,14 @@ class MultiplicativeRetryStrategy implements RetryStrategy {
private final int scalingFactor;

@Override
public boolean shouldRetry(int executionCount, Exception error) {
public boolean shouldRetry(int executionCount, @Nullable Exception error) {
return maxExecutionCount > executionCount;
}
@Override
public OffsetDateTime retryAt(int executionCount, Exception error) {
return OffsetDateTime.now().plus(scalingFactor * executionCount, unit);
public OffsetDateTime retryAt(int executionCount, @Nullable Exception error) {
var next = OffsetDateTime.now();
if (scalingFactor > 0 && executionCount > 0) return OffsetDateTime.now().plus(scalingFactor * executionCount, unit);
return next;
}
}

Expand All @@ -133,11 +134,11 @@ class FixedIntervalRetryStrategy implements RetryStrategy {
private final int interval;

@Override
public boolean shouldRetry(int executionCount, Exception error) {
public boolean shouldRetry(int executionCount, @Nullable Exception error) {
return maxExecutionCount > executionCount;
}
@Override
public OffsetDateTime retryAt(int executionCount, Exception error) {
public OffsetDateTime retryAt(int executionCount, @Nullable Exception error) {
return OffsetDateTime.now().plus(interval, unit);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class Trigger {

private OffsetDateTime runAt = OffsetDateTime.now();

private OffsetDateTime lastPing;

private OffsetDateTime start;

private OffsetDateTime end;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;

/**
* The {@link RunningTrigger} state will be provided by this context holder to any thread.
Expand Down Expand Up @@ -38,7 +37,11 @@ public static String getCorrelationId() {
public static String buildOrGetCorrelationId(String newCorrelationId) {
var correlationId = getCorrelationId();
if (correlationId == null) correlationId = newCorrelationId;
if (correlationId == null) correlationId = UUID.randomUUID().toString();
// take over any key from the trigger before ...
if (correlationId == null) {
var c = getContext();
if (c != null) correlationId = c.getKey().getId();
}
return correlationId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.Optional;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
Expand All @@ -13,14 +14,13 @@
import org.sterl.spring.persistent_tasks.api.TaskStatusHistoryOverview;
import org.sterl.spring.persistent_tasks.api.TriggerKey;
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
import org.sterl.spring.persistent_tasks.api.event.TriggerTaskCommand;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity;
import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity;
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryDetailRepository;
import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryLastStateRepository;
import org.sterl.spring.persistent_tasks.shared.StringHelper;
import org.sterl.spring.persistent_tasks.shared.stereotype.TransactionalService;
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;

import lombok.RequiredArgsConstructor;

Expand All @@ -29,7 +29,7 @@
public class HistoryService {
private final TriggerHistoryLastStateRepository triggerHistoryLastStateRepository;
private final TriggerHistoryDetailRepository triggerHistoryDetailRepository;
private final TriggerService triggerService;
private final ApplicationEventPublisher applicationEventPublisher;

public Optional<TriggerHistoryLastStateEntity> findStatus(long triggerId) {
return triggerHistoryLastStateRepository.findById(triggerId);
Expand Down Expand Up @@ -70,7 +70,7 @@ public Page<TriggerHistoryDetailEntity> findAllDetailsForKey(TriggerKey key, Pag
return triggerHistoryDetailRepository.listKnownStatusFor(key, page);
}

public Optional<TriggerEntity> reQueue(Long id, OffsetDateTime runAt) {
public Optional<TriggerKey> reQueue(Long id, OffsetDateTime runAt) {
final var lastState = triggerHistoryLastStateRepository.findById(id);
if (lastState.isEmpty()) return Optional.empty();

Expand All @@ -81,8 +81,9 @@ public Optional<TriggerEntity> reQueue(Long id, OffsetDateTime runAt) {
.priority(data.getPriority())
.id(data.getKey().getId())
.build();

return Optional.of(triggerService.queue(trigger));

applicationEventPublisher.publishEvent(TriggerTaskCommand.of(trigger));
return Optional.of(trigger.key());
}

public long countTriggers(TriggerKey key) {
Expand Down Expand Up @@ -120,7 +121,7 @@ public List<TaskStatusHistoryOverview> taskStatusHistory() {
return triggerHistoryLastStateRepository.listTriggerStatus();
}

public List<TriggerHistoryLastStateEntity> findTriggerByCorrelationId(String correlationId) {
return triggerHistoryLastStateRepository.findByCorrelationId(correlationId);
public List<TriggerHistoryLastStateEntity> findTriggerByCorrelationId(String correlationId, Pageable page) {
return triggerHistoryLastStateRepository.findByCorrelationId(correlationId, page);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.sterl.spring.persistent_tasks.history.HistoryService;
import org.sterl.spring.persistent_tasks.history.api.HistoryConverter.FromLastTriggerStateEntity;
import org.sterl.spring.persistent_tasks.history.api.HistoryConverter.FromTriggerStateDetailEntity;
import org.sterl.spring.persistent_tasks.trigger.api.TriggerConverter.FromTriggerEntity;

import lombok.RequiredArgsConstructor;

Expand Down Expand Up @@ -55,8 +54,8 @@ public PagedModel<Trigger> list(
}

@PostMapping("history/{id}/re-run")
public ResponseEntity<Trigger> reRunTrigger(@PathVariable(name = "id", required = true) Long id) {
public ResponseEntity<TriggerKey> reRunTrigger(@PathVariable(name = "id", required = true) Long id) {
var newTrigger = historyService.reQueue(id, OffsetDateTime.now());
return ResponseEntity.of(FromTriggerEntity.INSTANCE.convert(newTrigger));
return ResponseEntity.of(newTrigger);
}
}
Loading
Loading