diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fcf72858..c56406989 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,12 @@ # Changelog -## v1.6 +## v1.6.1 +- simpler RetryStrategy - as function +- showing last ping +- showing execution time or still running triggers +- saver way to keep track of running triggers + +## v1.6.0 - (2025-03-11) - Running triggers can be canceled now - Running triggers can be failed now diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java b/core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java index bec72871a..5382048e7 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java @@ -8,7 +8,10 @@ import java.util.Optional; import org.springframework.context.event.EventListener; +import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Sort; +import org.springframework.data.domain.Sort.Direction; import org.springframework.lang.NonNull; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -122,24 +125,46 @@ public TriggerKey runOrQueue( /** * Returns all triggers for a correlationId sorted by the creation time. + * Data is limited to overall 300 elements. + * * @param correlationId the id to search for * @return the found {@link TriggerData} sorted by create time ASC */ @Transactional(readOnly = true, timeout = 5) public List findAllTriggerByCorrelationId(String correlationId) { - var running = triggerService.findTriggerByCorrelationId(correlationId) + var running = triggerService.findTriggerByCorrelationId(correlationId, Pageable.ofSize(100)) .stream().map(TriggerEntity::getData) .toList(); - var done = historyService.findTriggerByCorrelationId(correlationId) + var done = historyService.findTriggerByCorrelationId(correlationId, Pageable.ofSize(200)) .stream().map(TriggerHistoryLastStateEntity::getData) .toList(); - var result = new ArrayList(running.size() + done.size()); result.addAll(done); result.addAll(running); return result; } + + /** + * Returns the first info to a trigger based on the correlationId. + * + * @param correlationId the id to search for + * @return the found {@link TriggerData} + */ + @Transactional(readOnly = true, timeout = 5) + public Optional findLastTriggerByCorrelationId(String correlationId) { + final var page = PageRequest.of(0, 1, Sort.by(Direction.DESC, "data.createdTime")); + var result = triggerService.findTriggerByCorrelationId(correlationId, page) + .stream().map(TriggerEntity::getData) + .toList(); + + if (result.isEmpty()) { + result = historyService.findTriggerByCorrelationId(correlationId, page) + .stream().map(TriggerHistoryLastStateEntity::getData) + .toList(); + } + return result.isEmpty() ? Optional.empty() : Optional.of(result.getFirst()); + } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java b/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java index c3968d1b9..6d4cb5aa3 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java @@ -10,7 +10,7 @@ @FunctionalInterface public interface RetryStrategy { - RetryStrategy NO_RETRY = (c, e) -> false; + RetryStrategy NO_RETRY = (c, e) -> null; /** * One initial execution and after that we will try it 3 more times. Overall 4 executions. */ @@ -23,14 +23,16 @@ public interface RetryStrategy { /** * Determines whether a retry should be attempted based on the current - * execution count and the provided exception. + * execution count and the provided exception. (optional) * * @param executionCount The number of attempts already made. * @param error The exception that triggered the retry. * @return {@code true} if the current execution count is less than * the maximum execution count; {@code false} otherwise. */ - boolean shouldRetry(int executionCount, @Nullable Exception error); + default boolean shouldRetry(int executionCount, @Nullable Exception error) { + return true; + } /** * Calculates the time of the next retry attempt based on the current @@ -38,12 +40,9 @@ public interface RetryStrategy { * * @param executionCount The number of attempts already made. * @param exception The exception that triggered the retry. - * @return The {@link OffsetDateTime} representing the time of the next retry attempt. + * @return {@link OffsetDateTime} of the next execution, null for no retry. */ - default OffsetDateTime retryAt(int executionCount, @Nullable Exception exception) { - return OffsetDateTime.now().plusMinutes(executionCount); - } - + OffsetDateTime retryAt(int executionCount, @Nullable Exception exception); // Default implementations /** @@ -70,11 +69,11 @@ class LinearRetryStrategy implements RetryStrategy { private final int offset; @Override - public boolean shouldRetry(int executionCount, Exception error) { + public boolean shouldRetry(int executionCount, @Nullable Exception error) { return maxExecutionCount > executionCount; } @Override - public OffsetDateTime retryAt(int executionCount, Exception error) { + public OffsetDateTime retryAt(int executionCount, @Nullable Exception error) { return OffsetDateTime.now().plus(offset + executionCount, unit); } } @@ -103,12 +102,14 @@ class MultiplicativeRetryStrategy implements RetryStrategy { private final int scalingFactor; @Override - public boolean shouldRetry(int executionCount, Exception error) { + public boolean shouldRetry(int executionCount, @Nullable Exception error) { return maxExecutionCount > executionCount; } @Override - public OffsetDateTime retryAt(int executionCount, Exception error) { - return OffsetDateTime.now().plus(scalingFactor * executionCount, unit); + public OffsetDateTime retryAt(int executionCount, @Nullable Exception error) { + var next = OffsetDateTime.now(); + if (scalingFactor > 0 && executionCount > 0) return OffsetDateTime.now().plus(scalingFactor * executionCount, unit); + return next; } } @@ -133,11 +134,11 @@ class FixedIntervalRetryStrategy implements RetryStrategy { private final int interval; @Override - public boolean shouldRetry(int executionCount, Exception error) { + public boolean shouldRetry(int executionCount, @Nullable Exception error) { return maxExecutionCount > executionCount; } @Override - public OffsetDateTime retryAt(int executionCount, Exception error) { + public OffsetDateTime retryAt(int executionCount, @Nullable Exception error) { return OffsetDateTime.now().plus(interval, unit); } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/api/Trigger.java b/core/src/main/java/org/sterl/spring/persistent_tasks/api/Trigger.java index 9dbb32657..e235762e7 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/api/Trigger.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/api/Trigger.java @@ -23,6 +23,8 @@ public class Trigger { private OffsetDateTime runAt = OffsetDateTime.now(); + private OffsetDateTime lastPing; + private OffsetDateTime start; private OffsetDateTime end; diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/api/task/RunningTriggerContextHolder.java b/core/src/main/java/org/sterl/spring/persistent_tasks/api/task/RunningTriggerContextHolder.java index 1496d007f..da69dfdd9 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/api/task/RunningTriggerContextHolder.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/api/task/RunningTriggerContextHolder.java @@ -2,7 +2,6 @@ import java.io.Serializable; import java.util.Objects; -import java.util.UUID; /** * The {@link RunningTrigger} state will be provided by this context holder to any thread. @@ -38,7 +37,11 @@ public static String getCorrelationId() { public static String buildOrGetCorrelationId(String newCorrelationId) { var correlationId = getCorrelationId(); if (correlationId == null) correlationId = newCorrelationId; - if (correlationId == null) correlationId = UUID.randomUUID().toString(); + // take over any key from the trigger before ... + if (correlationId == null) { + var c = getContext(); + if (c != null) correlationId = c.getKey().getId(); + } return correlationId; } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/history/HistoryService.java b/core/src/main/java/org/sterl/spring/persistent_tasks/history/HistoryService.java index 55d8c46ac..f2863a794 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/history/HistoryService.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/history/HistoryService.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.Optional; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; @@ -13,14 +14,13 @@ import org.sterl.spring.persistent_tasks.api.TaskStatusHistoryOverview; import org.sterl.spring.persistent_tasks.api.TriggerKey; import org.sterl.spring.persistent_tasks.api.TriggerStatus; +import org.sterl.spring.persistent_tasks.api.event.TriggerTaskCommand; import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryDetailEntity; import org.sterl.spring.persistent_tasks.history.model.TriggerHistoryLastStateEntity; import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryDetailRepository; import org.sterl.spring.persistent_tasks.history.repository.TriggerHistoryLastStateRepository; import org.sterl.spring.persistent_tasks.shared.StringHelper; import org.sterl.spring.persistent_tasks.shared.stereotype.TransactionalService; -import org.sterl.spring.persistent_tasks.trigger.TriggerService; -import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity; import lombok.RequiredArgsConstructor; @@ -29,7 +29,7 @@ public class HistoryService { private final TriggerHistoryLastStateRepository triggerHistoryLastStateRepository; private final TriggerHistoryDetailRepository triggerHistoryDetailRepository; - private final TriggerService triggerService; + private final ApplicationEventPublisher applicationEventPublisher; public Optional findStatus(long triggerId) { return triggerHistoryLastStateRepository.findById(triggerId); @@ -70,7 +70,7 @@ public Page findAllDetailsForKey(TriggerKey key, Pag return triggerHistoryDetailRepository.listKnownStatusFor(key, page); } - public Optional reQueue(Long id, OffsetDateTime runAt) { + public Optional reQueue(Long id, OffsetDateTime runAt) { final var lastState = triggerHistoryLastStateRepository.findById(id); if (lastState.isEmpty()) return Optional.empty(); @@ -81,8 +81,9 @@ public Optional reQueue(Long id, OffsetDateTime runAt) { .priority(data.getPriority()) .id(data.getKey().getId()) .build(); - - return Optional.of(triggerService.queue(trigger)); + + applicationEventPublisher.publishEvent(TriggerTaskCommand.of(trigger)); + return Optional.of(trigger.key()); } public long countTriggers(TriggerKey key) { @@ -120,7 +121,7 @@ public List taskStatusHistory() { return triggerHistoryLastStateRepository.listTriggerStatus(); } - public List findTriggerByCorrelationId(String correlationId) { - return triggerHistoryLastStateRepository.findByCorrelationId(correlationId); + public List findTriggerByCorrelationId(String correlationId, Pageable page) { + return triggerHistoryLastStateRepository.findByCorrelationId(correlationId, page); } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/history/api/TriggerHistoryResource.java b/core/src/main/java/org/sterl/spring/persistent_tasks/history/api/TriggerHistoryResource.java index 66f260b7d..551b7c05a 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/history/api/TriggerHistoryResource.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/history/api/TriggerHistoryResource.java @@ -21,7 +21,6 @@ import org.sterl.spring.persistent_tasks.history.HistoryService; import org.sterl.spring.persistent_tasks.history.api.HistoryConverter.FromLastTriggerStateEntity; import org.sterl.spring.persistent_tasks.history.api.HistoryConverter.FromTriggerStateDetailEntity; -import org.sterl.spring.persistent_tasks.trigger.api.TriggerConverter.FromTriggerEntity; import lombok.RequiredArgsConstructor; @@ -55,8 +54,8 @@ public PagedModel list( } @PostMapping("history/{id}/re-run") - public ResponseEntity reRunTrigger(@PathVariable(name = "id", required = true) Long id) { + public ResponseEntity reRunTrigger(@PathVariable(name = "id", required = true) Long id) { var newTrigger = historyService.reQueue(id, OffsetDateTime.now()); - return ResponseEntity.of(FromTriggerEntity.INSTANCE.convert(newTrigger)); + return ResponseEntity.of(newTrigger); } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java index 50bbce3ae..7c6c6c43c 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java @@ -2,13 +2,13 @@ import java.io.Serializable; import java.time.OffsetDateTime; +import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import org.slf4j.event.Level; import org.springframework.lang.NonNull; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -18,6 +18,7 @@ import org.sterl.spring.persistent_tasks.api.AddTriggerRequest; import org.sterl.spring.persistent_tasks.api.TriggerKey; import org.sterl.spring.persistent_tasks.scheduler.component.EditSchedulerStatusComponent; +import org.sterl.spring.persistent_tasks.scheduler.component.RunOrQueueComponent; import org.sterl.spring.persistent_tasks.scheduler.component.TaskExecutorComponent; import org.sterl.spring.persistent_tasks.scheduler.entity.SchedulerEntity; import org.sterl.spring.persistent_tasks.trigger.TriggerService; @@ -43,16 +44,18 @@ public class SchedulerService { @Getter private final String name; private final TriggerService triggerService; + private final TaskExecutorComponent taskExecutor; private final EditSchedulerStatusComponent editSchedulerStatus; + + private final RunOrQueueComponent runOrQueue; + private final TransactionTemplate trx; - private final Map shouldRun = new ConcurrentHashMap<>(); @PostConstruct public void start() { taskExecutor.start(); - final var s = editSchedulerStatus.checkinToRegistry(name); - log.info("Started {} on {} threads.", s, taskExecutor.getMaxThreads()); + editSchedulerStatus.checkinToRegistry(name, 0, taskExecutor.getMaxThreads()); } public void setMaxThreads(int value) { @@ -63,7 +66,6 @@ public void setMaxThreads(int value) { public void stop() { taskExecutor.close(); editSchedulerStatus.offline(name); - log.info("Stopped {}", name); } public void shutdownNow() { @@ -73,22 +75,15 @@ public void shutdownNow() { editSchedulerStatus.offline(name); } - @Transactional - public SchedulerEntity pingRegistry() { - var result = editSchedulerStatus.checkinToRegistry(name); - result.setRunnungTasks(taskExecutor.getRunningTasks()); - result.setTasksSlotCount(taskExecutor.getMaxThreads()); - log.debug("Ping {}", result); - return result; - } - public SchedulerEntity getScheduler() { var result = editSchedulerStatus.get(name); return result; } public Optional findStatus(String name) { - return editSchedulerStatus.find(name); + if (name == null) return Optional.empty(); + else if (name.equals(this.name)) return Optional.of(getScheduler()); + else return editSchedulerStatus.find(name); } /** @@ -112,17 +107,18 @@ public List> triggerNextTasks(OffsetDateTime timeDue) { if (taskExecutor.getFreeThreads() > 0) { final var result = trx.execute(t -> { var triggers = triggerService.lockNextTrigger(name, taskExecutor.getFreeThreads(), timeDue); - pingRegistry().addRunning(triggers.size()); + editSchedulerStatus.checkinToRegistry(name, + taskExecutor.countRunning() + triggers.size(), taskExecutor.getMaxThreads()); return triggers; }); return taskExecutor.submit(result); } else { - log.debug("No free threads {}/{} right now to run jobs due for: {}", + log.info("No free threads {}/{} right now to run jobs due for: {}", taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), timeDue); - pingRegistry(); + editSchedulerStatus.checkinToRegistry(name, taskExecutor.countRunning(), taskExecutor.getMaxThreads()); return Collections.emptyList(); } } @@ -136,31 +132,14 @@ public List> triggerNextTasks(OffsetDateTime timeDue) { */ @Transactional(timeout = 10) public TriggerKey runOrQueue(AddTriggerRequest triggerRequest) { - var trigger = triggerService.queue(triggerRequest); - - if (!trigger.shouldRunInFuture()) { - if (taskExecutor.getFreeThreads() > 0) { - trigger = triggerService.markTriggersAsRunning(trigger, name); - pingRegistry().addRunning(1); - shouldRun.put(trigger.getId(), trigger); - log.debug("{} added for immediate execution, waitng for commit on={}", trigger.getKey(), name); - } else { - log.debug("Currently not enough free thread available {} of {} in use. PersistentTask {} queued.", - taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), trigger.getKey()); - } - } - // we will listen for the commit event to execute this trigger ... - return trigger.getKey(); + return runOrQueue.execute(triggerRequest); } @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) - void checkIfTrigerIsRunning(TriggerAddedEvent addedTrigger) { - final var toRun = shouldRun.remove(addedTrigger.id()); - if (toRun != null) { - taskExecutor.submit(toRun); - log.debug("{} immediately started on={}.", addedTrigger.key(), name); + public void checkIfTrigerShouldRun(TriggerAddedEvent addedTrigger) { + if (runOrQueue.checkIfTrigerShouldRun(addedTrigger.id())) { + editSchedulerStatus.checkinToRegistry(name, taskExecutor.countRunning(), taskExecutor.getMaxThreads()); } - // TODO implement a cleanup for old pending triggers which may never been triggered! } public SchedulerEntity getStatus() { @@ -175,7 +154,7 @@ public List rescheduleAbandonedTasks(OffsetDateTime timeout) { .toList(); int running = triggerService.markTriggersAsRunning(runningKeys, name); - log.debug("({}) - {} trigger(s) are running on {} schedulers", + log.atLevel(running > 0 ? Level.INFO : Level.DEBUG).log("({}) - {} trigger(s) are running on {} schedulers", running, runningKeys, schedulers); return triggerService.rescheduleAbandonedTasks(timeout); } @@ -183,8 +162,11 @@ public List rescheduleAbandonedTasks(OffsetDateTime timeout) { public List listAll() { return editSchedulerStatus.listAll(); } - + public Collection> getRunning() { + return taskExecutor.getRunningTasks(); + } public boolean hasRunningTriggers() { - return !this.taskExecutor.getRunningTriggers().isEmpty(); + return !taskExecutor.isStopped() + && (taskExecutor.countRunning() > 0 || runOrQueue.hasWaitingTriggers()); } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/EditSchedulerStatusComponent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/EditSchedulerStatusComponent.java index cc41ebc99..d56e52298 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/EditSchedulerStatusComponent.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/EditSchedulerStatusComponent.java @@ -14,23 +14,28 @@ import org.sterl.spring.persistent_tasks.scheduler.repository.TaskSchedulerRepository; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; @Component @Transactional(timeout = 10) @RequiredArgsConstructor +@Slf4j public class EditSchedulerStatusComponent { private final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); private final MemoryMXBean memory = ManagementFactory.getMemoryMXBean(); private final TaskSchedulerRepository schedulerRepository; - public SchedulerEntity checkinToRegistry(String name) { + public SchedulerEntity checkinToRegistry(String name, int runningTasks, int maxTasks) { var result = get(name); result.setSystemLoadAverage(os.getSystemLoadAverage()); result.setMaxHeap(memory.getHeapMemoryUsage().getMax()); result.setUsedHeap(memory.getHeapMemoryUsage().getUsed()); + result.setRunningTasks(runningTasks); + result.setTasksSlotCount(maxTasks); result.setLastPing(OffsetDateTime.now()); + log.debug("Ping {}", result); return schedulerRepository.save(result); } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/RunOrQueueComponent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/RunOrQueueComponent.java new file mode 100644 index 000000000..14f4694e2 --- /dev/null +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/RunOrQueueComponent.java @@ -0,0 +1,87 @@ +package org.sterl.spring.persistent_tasks.scheduler.component; + +import java.io.Serializable; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; + +import org.sterl.spring.persistent_tasks.api.AddTriggerRequest; +import org.sterl.spring.persistent_tasks.api.TriggerKey; +import org.sterl.spring.persistent_tasks.trigger.TriggerService; +import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + *

+ * Not a spring bean! + *

+ */ +@Slf4j +@RequiredArgsConstructor +public class RunOrQueueComponent { + private final String schedulerName; + private final TriggerService triggerService; + private final TaskExecutorComponent taskExecutor; + private final Map shouldRun = new ConcurrentHashMap<>(); + + /** + * Runs the given trigger if a free threads are available and the runAt time is + * not in the future. + * + * @return the reference to the {@link Future} with the key, if no threads are + * available it is resolved + */ + public TriggerKey execute(AddTriggerRequest triggerRequest) { + var trigger = triggerService.queue(triggerRequest); + + trigger = offerToRun(trigger); + // we will listen for the commit event to execute this trigger ... + return trigger.getKey(); + } + + private TriggerEntity offerToRun(TriggerEntity trigger) { + if (!trigger.shouldRunInFuture()) { + if (taskExecutor.getFreeThreads() > 0) { + trigger = triggerService.markTriggersAsRunning(trigger, schedulerName); + shouldRun.put(trigger.getId(), trigger); + log.debug("{} added for immediate execution, waiting for commit on={}", trigger.getKey(), schedulerName); + } else { + log.debug("Currently not enough free thread available {} of {} in use. PersistentTask {} queued.", + taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), trigger.getKey()); + } + } + return trigger; + } + + public boolean checkIfTrigerShouldRun(long triggerId) { + final var toRun = shouldRun.remove(triggerId); + if (toRun != null) { + taskExecutor.submit(toRun); + log.debug("{} immediately started on={}.", toRun.key(), schedulerName); + } + clearNotCreatedTriggers(); + return toRun != null; + } + + public boolean hasWaitingTriggers() { + clearNotCreatedTriggers(); + return shouldRun.size() > 0; + } + + private void clearNotCreatedTriggers() { + if (shouldRun.size() > 0) { + final var timeout = OffsetDateTime.now().plusSeconds(15); + final var triggers = shouldRun.entrySet().iterator(); + while (triggers.hasNext()) { + var entry = triggers.next(); + if (entry.getValue().getData().getCreatedTime().isAfter(timeout)) { + log.info("Removed {} which was not committed!", entry.getValue().key()); + triggers.remove(); + } + } + } + } +} diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java index f99aae70b..9211e6700 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java @@ -3,6 +3,7 @@ import java.io.Closeable; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -20,15 +21,20 @@ import org.sterl.spring.persistent_tasks.trigger.TriggerService; import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +/** + * The executor of a scheduler + *

+ * Not a spring bean! + *

+ */ @Slf4j public class TaskExecutorComponent implements Closeable { + private final String schedulerName; private final TriggerService triggerService; private final AtomicInteger maxThreads = new AtomicInteger(0); @Getter @@ -36,18 +42,21 @@ public class TaskExecutorComponent implements Closeable { private Duration maxShutdownWaitTime = Duration.ofSeconds(10); @Nullable private ExecutorService executor; + // also the LOCK object ... private final ConcurrentHashMap> runningTasks = new ConcurrentHashMap<>(); private final AtomicBoolean stopped = new AtomicBoolean(true); - - public TaskExecutorComponent(TriggerService triggerService, int maxThreads) { + + public TaskExecutorComponent(String schedulerName, TriggerService triggerService, int maxThreads) { super(); + this.schedulerName = schedulerName; this.triggerService = triggerService; this.maxThreads.set(maxThreads); } @NonNull public List> submit(List trigger) { - if (trigger == null || trigger.isEmpty()) return Collections.emptyList(); + if (trigger == null || trigger.isEmpty()) + return Collections.emptyList(); final List> result = new ArrayList<>(trigger.size()); for (TriggerEntity triggerEntity : trigger) { @@ -61,10 +70,16 @@ public Future submit(@Nullable TriggerEntity trigger) { if (trigger == null) { return CompletableFuture.completedFuture(null); } - if (stopped.get()) throw new IllegalStateException("Executor is already stopped"); + if (stopped.get() || executor == null) { + throw new IllegalStateException("Executor of " + schedulerName + " is already stopped"); + } + + Future result; + synchronized (runningTasks) { + result = executor.submit(() -> runTrigger(trigger)); + runningTasks.put(trigger, result); + } - final var result = executor.submit(() -> runTrigger(trigger)); - runningTasks.put(trigger, result); return result; } @@ -77,22 +92,20 @@ private TriggerKey runTrigger(TriggerEntity trigger) { } } - @SuppressWarnings("resource") - @PostConstruct public void start() { if (stopped.compareAndExchange(true, false)) { - synchronized (stopped) { + synchronized (runningTasks) { runningTasks.clear(); executor = Executors.newFixedThreadPool(maxThreads.get()); + log.info("Started {} with {} threads.", schedulerName, maxThreads.get()); } } } @Override - @PreDestroy public void close() { if (stopped.compareAndExchange(false, true)) { - synchronized (stopped) { + synchronized (runningTasks) { doShutdown(); } } @@ -102,8 +115,8 @@ private void doShutdown() { if (executor != null) { executor.shutdown(); if (runningTasks.size() > 0) { - log.info("Shutdown executor with {} running tasks, waiting for {}.", - runningTasks.size(), maxShutdownWaitTime); + log.info("Shutdown {} with {} running tasks, waiting for {}.", schedulerName, runningTasks.size(), + maxShutdownWaitTime); try { executor.awaitTermination(maxShutdownWaitTime.getSeconds(), TimeUnit.SECONDS); @@ -121,9 +134,16 @@ private void doShutdown() { } public void shutdownNow() { - stopped.set(true); - if (executor != null) executor.shutdownNow(); - executor = null; + if (stopped.compareAndExchange(false, true)) { + synchronized (runningTasks) { + if (executor != null) { + executor.shutdownNow(); + log.info("Force stop {} with {} running tasks", schedulerName, runningTasks.size()); + runningTasks.clear(); + executor = null; + } + } + } } public int getFreeThreads() { @@ -133,14 +153,18 @@ public int getFreeThreads() { return Math.max(maxThreads.get() - runningTasks.size(), 0); } - public int getRunningTasks() { + public int countRunning() { return runningTasks.size(); } + public Collection> getRunningTasks() { + return runningTasks.values(); + } + public boolean isStopped() { return stopped.get() || maxThreads.get() <= 0; } - + public List getRunningTriggers() { return Collections.list(this.runningTasks.keys()); } @@ -148,8 +172,9 @@ public List getRunningTriggers() { public void setMaxThreads(int value) { this.maxThreads.set(value); } + public int getMaxThreads() { - return this.maxThreads.get(); + return isStopped() ? 0 : this.maxThreads.get(); } public boolean isRunning(TriggerEntity trigger) { diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/config/SchedulerConfig.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/config/SchedulerConfig.java index b76ebf877..ca70cc75b 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/config/SchedulerConfig.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/config/SchedulerConfig.java @@ -2,6 +2,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.Duration; import java.util.Optional; import org.springframework.beans.factory.annotation.Value; @@ -12,6 +13,7 @@ import org.springframework.transaction.support.TransactionTemplate; import org.sterl.spring.persistent_tasks.scheduler.SchedulerService; import org.sterl.spring.persistent_tasks.scheduler.component.EditSchedulerStatusComponent; +import org.sterl.spring.persistent_tasks.scheduler.component.RunOrQueueComponent; import org.sterl.spring.persistent_tasks.scheduler.component.TaskExecutorComponent; import org.sterl.spring.persistent_tasks.trigger.TriggerService; @@ -52,9 +54,26 @@ SchedulerService schedulerService( TransactionTemplate trx) throws UnknownHostException { customizer = customizer.isEmpty() ? Optional.of(new SchedulerCustomizer() {}) : customizer; + final var name = customizer.get().name(); + final var maxShutdownWaitTime = Duration.ofSeconds(10); - return new SchedulerService(customizer.get().name(), triggerService, - new TaskExecutorComponent(triggerService, maxThreads), - editSchedulerStatus, trx); + return newSchedulerService(name, triggerService, editSchedulerStatus, maxThreads, maxShutdownWaitTime, trx); + } + + public static SchedulerService newSchedulerService(final String name, TriggerService triggerService, + EditSchedulerStatusComponent editSchedulerStatus, int maxThreads, final Duration maxShutdownWaitTime, + TransactionTemplate trx) { + + final var taskExecutor = new TaskExecutorComponent(name, triggerService, maxThreads); + if (maxShutdownWaitTime != null) taskExecutor.setMaxShutdownWaitTime(maxShutdownWaitTime); + + final var runOrQueue = new RunOrQueueComponent(name, triggerService, taskExecutor); + + return new SchedulerService(name, + triggerService, + taskExecutor, + editSchedulerStatus, + runOrQueue, + trx); } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java index 00f517d7b..8265e00b9 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java @@ -15,7 +15,7 @@ @Entity @Table(name = "pt_scheduler", indexes = @Index(name = "idx_task_scheduler_status", columnList = "last_ping")) @Data -@ToString(of = { "id", "lastPing", "runnungTasks", "tasksSlotCount" }) +@ToString(of = { "id", "lastPing", "runningTasks", "tasksSlotCount" }) @EqualsAndHashCode(of = "id") @NoArgsConstructor public class SchedulerEntity { @@ -29,7 +29,7 @@ public class SchedulerEntity { private int tasksSlotCount; - private int runnungTasks; + private int runningTasks; private double systemLoadAverage = 0.0d; @@ -45,7 +45,7 @@ public SchedulerEntity(String name) { } public SchedulerEntity addRunning(int count) { - this.runnungTasks += count; + this.runningTasks += count; return this; } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/shared/model/HasTriggerData.java b/core/src/main/java/org/sterl/spring/persistent_tasks/shared/model/HasTriggerData.java index 76d57e836..440c5af26 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/shared/model/HasTriggerData.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/shared/model/HasTriggerData.java @@ -29,6 +29,7 @@ default TaskId newTaskId() { } default boolean shouldRunInFuture() { - return getData().getRunAt().isAfter(OffsetDateTime.now()); + if (getData().getRunAt() == null) return true; + return getData().getRunAt().toEpochSecond() > OffsetDateTime.now().toEpochSecond(); } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/shared/repository/TriggerDataRepository.java b/core/src/main/java/org/sterl/spring/persistent_tasks/shared/repository/TriggerDataRepository.java index 68c053b0f..34f20a58d 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/shared/repository/TriggerDataRepository.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/shared/repository/TriggerDataRepository.java @@ -5,7 +5,10 @@ import java.util.Set; import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Sort; +import org.springframework.data.domain.Sort.Direction; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; @@ -17,6 +20,16 @@ @NoRepositoryBean public interface TriggerDataRepository extends JpaRepository { + Sort DEFAULT_SORT = Sort.by(Direction.ASC, "data.createdTime"); + + default Pageable applyDefaultSortIfNeeded(Pageable page) { + var result = page; + if (page.getSort() == Sort.unsorted()) { + result = PageRequest.of(page.getPageNumber(), page.getPageSize(), DEFAULT_SORT); + } + return result; + } + @Query(""" SELECT e FROM #{#entityName} e WHERE ((:id IS NULL OR e.data.key.id LIKE :id) @@ -67,11 +80,10 @@ WHERE e.data.status IN ( :status ) """) @Modifying long deleteOlderThan(@Param("age") OffsetDateTime age); - + @Query(""" - SELECT e FROM #{#entityName} e - WHERE e.data.correlationId = :correlationId - ORDER BY e.data.createdTime ASC + SELECT e FROM #{#entityName} e + WHERE e.data.correlationId = :correlationId """) - List findByCorrelationId(@Param("correlationId") String correlationId); + List findByCorrelationId(@Param("correlationId") String correlationId, Pageable page); } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java index acf761955..c4f4ec883 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java @@ -193,8 +193,8 @@ public Optional updateRunAt(TriggerKey key, OffsetDateTime time) }); } - public List findTriggerByCorrelationId(String correlationId) { - return readTrigger.findTriggerByCorrelationId(correlationId); + public List findTriggerByCorrelationId(String correlationId, Pageable page) { + return readTrigger.findTriggerByCorrelationId(correlationId, page); } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/api/TriggerConverter.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/api/TriggerConverter.java index 1b19065d7..9c60a2751 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/api/TriggerConverter.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/api/TriggerConverter.java @@ -16,6 +16,7 @@ public Trigger convert(TriggerEntity source) { result.setId(source.getId()); result.setInstanceId(source.getId()); result.setRunningOn(source.getRunningOn()); + result.setLastPing(source.getLastPing()); return result; } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/ReadTriggerComponent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/ReadTriggerComponent.java index 65d1da214..d3e948d9d 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/ReadTriggerComponent.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/ReadTriggerComponent.java @@ -64,7 +64,7 @@ public Page listTriggers(TaskId task, Pag return triggerRepository.findAll(task.name(), page); } - public List findTriggerByCorrelationId(String correlationId) { - return triggerRepository.findByCorrelationId(correlationId); + public List findTriggerByCorrelationId(String correlationId, Pageable page) { + return triggerRepository.findByCorrelationId(correlationId, triggerRepository.applyDefaultSortIfNeeded(page)); } } diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java index 9b2ce05e5..2dc58d155 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/RunTriggerComponent.java @@ -81,7 +81,11 @@ private Optional failTaskAndState(RunTaskWithStateCommand runTask result = editTrigger.failTrigger(trigger.getKey(), runTaskWithStateCommand.state(), e, null); } else { final OffsetDateTime retryAt = task.retryStrategy().retryAt(trigger.getData().getExecutionCount(), e); - log.warn("Failed trigger={} with retryAt={}", trigger.getKey(), retryAt, e); + if (retryAt == null) { + log.error("Failed trigger={}, no further retries!", trigger.getKey(), e); + } else { + log.warn("Failed trigger={} with retryAt={}", trigger.getKey(), retryAt, e); + } result = editTrigger.failTrigger(trigger.getKey(), runTaskWithStateCommand.state(), e, retryAt); } return result; diff --git a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerFailedEvent.java b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerFailedEvent.java index 758408d68..25ee4c09d 100644 --- a/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerFailedEvent.java +++ b/core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerFailedEvent.java @@ -12,7 +12,8 @@ */ public record TriggerFailedEvent(long id, TriggerData data, Serializable state, - Exception exception, OffsetDateTime retryAt) implements TriggerLifeCycleEvent { + Exception exception, + OffsetDateTime retryAt) implements TriggerLifeCycleEvent { @Override public boolean isDone() { diff --git a/core/src/test/java/.gitignore b/core/src/test/java/.gitignore new file mode 100644 index 000000000..2b01424a0 --- /dev/null +++ b/core/src/test/java/.gitignore @@ -0,0 +1 @@ +/FooTest.java diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java index 138a1d394..727c395e4 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java @@ -24,7 +24,7 @@ import org.sterl.spring.persistent_tasks.history.HistoryService; import org.sterl.spring.persistent_tasks.scheduler.SchedulerService; import org.sterl.spring.persistent_tasks.scheduler.component.EditSchedulerStatusComponent; -import org.sterl.spring.persistent_tasks.scheduler.component.TaskExecutorComponent; +import org.sterl.spring.persistent_tasks.scheduler.config.SchedulerConfig; import org.sterl.spring.persistent_tasks.task.TaskService; import org.sterl.spring.persistent_tasks.test.AsyncAsserts; import org.sterl.spring.persistent_tasks.test.PersistentTaskTestService; @@ -36,7 +36,7 @@ import jakarta.persistence.EntityManager; import lombok.RequiredArgsConstructor; -// @ActiveProfiles("mssql") // postgres mssql mariadb mysql +//@ActiveProfiles("mssql") // postgres mssql mariadb mysql @SpringBootTest(classes = SampleApp.class, webEnvironment = WebEnvironment.RANDOM_PORT) @RecordApplicationEvents public class AbstractSpringTest { @@ -94,9 +94,8 @@ HibernateAsserts hibernateAsserts(EntityManager entityManager) { SchedulerService schedulerA(TriggerService triggerService, EditSchedulerStatusComponent editSchedulerStatus, TransactionTemplate trx) throws UnknownHostException { - final var taskExecutor = new TaskExecutorComponent(triggerService, 10); - taskExecutor.setMaxShutdownWaitTime(Duration.ofSeconds(0)); - return new SchedulerService("schedulerA", triggerService, taskExecutor, editSchedulerStatus, trx); + final var name = "schedulerA"; + return SchedulerConfig.newSchedulerService(name, triggerService, editSchedulerStatus, 10, Duration.ZERO, trx); } @Bean @@ -104,9 +103,8 @@ SchedulerService schedulerA(TriggerService triggerService, EditSchedulerStatusCo SchedulerService schedulerB(TriggerService triggerService, EditSchedulerStatusComponent editSchedulerStatus, TransactionTemplate trx) throws UnknownHostException { - final var taskExecutor = new TaskExecutorComponent(triggerService, 20); - taskExecutor.setMaxShutdownWaitTime(Duration.ofSeconds(0)); - return new SchedulerService("schedulerB", triggerService, taskExecutor, editSchedulerStatus, trx); + final var name = "schedulerB"; + return SchedulerConfig.newSchedulerService(name, triggerService, editSchedulerStatus, 20, Duration.ZERO, trx); } /** diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/PersistentTaskServiceTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/PersistentTaskServiceTest.java index 57e883a49..75220b533 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/PersistentTaskServiceTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/PersistentTaskServiceTest.java @@ -41,10 +41,11 @@ void testChainedTasks() throws Exception { final var correlationId = UUID.randomUUID().toString(); // WHEN + System.err.println("-----"); subject.runOrQueue(task1.newTrigger(234).correlationId(correlationId).build()); // THEN - asserts.awaitOrdered("234::chainTask1", "chainTask1::234::chainTask1"); + asserts.awaitOrdered(persistentTaskTestService::awaitRunningTriggers, "234::chainTask1", "chainTask1::234::chainTask1"); assertThat(correlationId).isEqualTo(correlationFound.get()); // AND var trigger= subject.findAllTriggerByCorrelationId(correlationId); diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/history/HistoryServiceTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/history/HistoryServiceTest.java index b486e28f5..28f5bfce3 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/history/HistoryServiceTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/history/HistoryServiceTest.java @@ -13,8 +13,8 @@ import org.sterl.spring.persistent_tasks.AbstractSpringTest.TaskConfig.Task3; import org.sterl.spring.persistent_tasks.PersistentTaskService; import org.sterl.spring.persistent_tasks.api.AddTriggerRequest; +import org.sterl.spring.persistent_tasks.api.TriggerKey; import org.sterl.spring.persistent_tasks.api.TriggerStatus; -import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity; class HistoryServiceTest extends AbstractSpringTest { @@ -32,13 +32,13 @@ void testReQueueTrigger() { // WHEN asserts.clear(); - final Optional t = subject.reQueue(trigger.getId(), OffsetDateTime.now()); + final Optional newKey = subject.reQueue(trigger.getId(), OffsetDateTime.now()); // THEN - assertThat(t).isPresent(); + assertThat(newKey).isPresent(); + assertThat(newKey.get()).isEqualTo(trigger.key()); // AND - persistentTaskTestService.runNextTrigger(); - asserts.assertValue(Task3.NAME + "::Hallo"); + asserts.awaitValue(persistentTaskTestService::awaitRunningTriggers, Task3.NAME + "::Hallo"); // AND assertThat(subject.countTriggers(trigger.getKey())).isEqualTo(2); } diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTest.java index 2da190bb1..e5cbbe9b8 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTest.java @@ -5,7 +5,6 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,7 +62,7 @@ void testWillTriggerOnlyFreeThreadSize() throws Exception { // AND final SchedulerEntity scheduler = subject.getScheduler(); - assertThat(scheduler.getRunnungTasks()).isEqualTo(10); + assertThat(scheduler.getRunningTasks()).isEqualTo(10); } @Test @@ -101,17 +100,13 @@ void testRunOrQueue() throws Exception { var ref = subject.runOrQueue(triggerRequest); // THEN - assertThat(subject.getScheduler().getRunnungTasks()).isOne(); - // AND - persistentTaskTestService.scheduleNextTriggersAndWait(Duration.ofSeconds(3)); - + asserts.awaitValue(Task3.NAME + "::Hallo"); assertThat(persistentTaskService.getLastTriggerData(ref).get().getStatus()) .isEqualTo(TriggerStatus.SUCCESS); - asserts.assertValue(Task3.NAME + "::Hallo"); } @Test - void testQueuedInFuture() throws TimeoutException, InterruptedException { + void testQueuedInFuture() { // GIVEN final AddTriggerRequest triggerRequest = Task3.ID .newTrigger("Hallo") @@ -120,7 +115,7 @@ void testQueuedInFuture() throws TimeoutException, InterruptedException { subject.runOrQueue(triggerRequest); // WHEN - persistentTaskTestService.scheduleNextTriggersAndWait(Duration.ofSeconds(3)); + persistentTaskTestService.scheduleNextTriggersAndWait(); // THEN asserts.assertMissing(Task3.NAME + "::Hallo"); diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/test/AsyncAsserts.java b/core/src/test/java/org/sterl/spring/persistent_tasks/test/AsyncAsserts.java index 05269d6bf..c295dfcf2 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/test/AsyncAsserts.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/test/AsyncAsserts.java @@ -1,7 +1,6 @@ package org.sterl.spring.persistent_tasks.test; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; import java.time.Duration; import java.util.ArrayList; @@ -72,10 +71,11 @@ public void awaitValue(String value) { */ public void awaitValue(Runnable fn, String value) { final var start = System.currentTimeMillis(); + if (fn != null) fn.run(); // ensure we call the given function at least once while (!values.contains(value) && (System.currentTimeMillis() - start <= defaultTimeout.toMillis())) { try { - Thread.sleep(50); + Thread.sleep(100); if (fn != null) fn.run(); } catch (InterruptedException e) { if (Thread.interrupted()) break; @@ -129,10 +129,6 @@ public void assertMissing(String value, String... inValues) { public void awaitValueOnce(String value) { awaitValue(null, value); - assertThat(values).contains(value); - var occurrences = values.stream().filter(e -> value.equals(e)).count(); - if (occurrences > 1) { - fail("Expected " + value + " to be present once but was present " + occurrences + " times."); - } + assertThat(values).containsOnlyOnce(value); } } diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java b/core/src/test/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java index a08e05d47..c71595c6e 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java @@ -5,10 +5,14 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.springframework.stereotype.Service; import org.sterl.spring.persistent_tasks.api.TriggerKey; @@ -17,7 +21,9 @@ import org.sterl.spring.persistent_tasks.trigger.TriggerService; import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity; +import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.SneakyThrows; @Service @@ -27,6 +33,9 @@ public class PersistentTaskTestService { private final List schedulers; private final TriggerService triggerService; + @Getter @Setter + private Duration defaultTimeout = Duration.ofSeconds(5); + /** * Runs just the next trigger, if it is due to run. * @@ -40,12 +49,12 @@ public Optional runNextTrigger() { * Runs all triggers which are due until the given time. One by one, so new triggers are picked up. * * @param dueUntil date to also check for trigger in the future - * @return the triggeres executed, to directly check if they have been successful + * @return the triggers executed, to directly check if they have been successful */ public List runAllDueTrigger(OffsetDateTime dueUntil) { var result = new ArrayList(); List trigger; - while( (trigger = triggerService.lockNextTrigger("test", 1, dueUntil)).size() > 0) { + while ( (trigger = triggerService.lockNextTrigger("test", 1, dueUntil)).size() > 0 ) { var key = triggerService.run(trigger.getFirst()); if (key.isPresent()) result.add(key.get()); } @@ -53,61 +62,99 @@ public List runAllDueTrigger(OffsetDateTime dueUntil) { } /** - * Triggers the execution of all pending triggers. + * Triggers the execution of all pending triggers. We also add one second + * to ensure we select all right now created triggers too. * * @return the reference to the {@link TriggerKey} of the running tasks */ public List> scheduleNextTriggers() { var result = new ArrayList>(); - if (schedulers.isEmpty()) throw new IllegalStateException("No schedulers found, cannot run any triggers!"); + assertHasScheduler(); for (SchedulerService s : schedulers) { - result.addAll(s.triggerNextTasks()); + result.addAll(s.triggerNextTasks(OffsetDateTime.now().plusSeconds(1))); } return result; } + public void assertHasScheduler() { + assertThat(schedulers).describedAs("No schedulers found, cannot run any triggers!").isNotEmpty(); + } + /** * Triggers the execution of all pending triggers and wait for the result. */ @SneakyThrows - public List scheduleNextTriggersAndWait(Duration maxWaitTime) { - final var result = new ArrayList(); + public Set scheduleNextTriggersAndWait() { + return scheduleNextTriggersAndWait(defaultTimeout); + } + + /** + * Triggers the execution of all pending triggers and wait for the result. + */ + @SneakyThrows + public Set scheduleNextTriggersAndWait(Duration maxWaitTime) { + final var result = new LinkedHashSet(); final var timeOut = System.currentTimeMillis() + maxWaitTime.toMillis(); - List> triggers; - var isSomethingRunning = false; + result.addAll(awaitRunningTriggers(maxWaitTime)); + List> newTriggers; do { - triggers = scheduleNextTriggers(); - for (Future future : triggers) { - try { - result.add(future.get()); - } catch (InterruptedException | ExecutionException e) { - final Throwable cause = e.getCause(); - throw cause == null ? e : cause; - } - } - isSomethingRunning = hasRunningTriggers(); - if (isSomethingRunning) { - Thread.sleep(Duration.ofMillis(100)); - } - if (System.currentTimeMillis() > timeOut) { throw new RuntimeException("Timeout waiting for triggers after " + maxWaitTime); } - } while (!triggers.isEmpty() || isSomethingRunning); + newTriggers = scheduleNextTriggers(); + result.addAll(awaitTriggers(maxWaitTime, newTriggers)); + } while (newTriggers.size() > 0); + return result; + } + + /** + * Just waits for the current running triggers + * @return return the keys of the currently scheduled triggers + */ + public List awaitRunningTriggers() { + return awaitRunningTriggers(defaultTimeout); + } + + /** + * Just waits for the current running triggers + * + * @param duration how long to wait + * @return return the keys of the currently scheduled triggers + */ + @SneakyThrows + public List awaitRunningTriggers(Duration duration) { + assertHasScheduler(); + List> running = this.schedulers.stream() + .flatMap(s -> s.getRunning().stream()) + .toList(); + + return awaitTriggers(duration, running); + } + public ArrayList awaitTriggers(Duration duration, List> running) throws Throwable { + final var result = new ArrayList(); + final var totalWaitUntil = System.currentTimeMillis() + duration.toMillis(); + for (Future t : running) { + try { + result.add(t.get(totalWaitUntil - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + final Throwable cause = e.getCause(); + throw cause == null ? e : cause; + } + } return result; } - public boolean hasRunningTriggers() { - var running = this.schedulers.stream() - .map(s -> s.hasRunningTriggers()) - .filter(r -> r) - .findAny(); + public int countRunningTriggers() { + return schedulers.stream().mapToInt(s -> s.getRunning().size()).sum(); + } - return running.isPresent() && running.get() == true; + public boolean hasRunningTriggers() { + assertHasScheduler(); + return countRunningTriggers() > 0 ; } public void assertNoMoreTriggers() { diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java index 2f3f7ed66..133056dac 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/TriggerServiceTest.java @@ -340,9 +340,11 @@ void testLockTrigger() throws Exception { for (int i = 1; i <= 100; ++i) { lockInvocations.add(() -> triggerService.run(triggerService.lockNextTrigger("test"))); } - + Thread.sleep(500); executor.invokeAll(lockInvocations); - // start any others - as MSSQL and mySQL has now row lock, only table locks implemented + executor.invokeAll(lockInvocations); + executor.invokeAll(lockInvocations); + // start any others - as MSSQL and mySQL has no row lock, only table locks implemented var s = persistentTaskTestService.scheduleNextTriggersAndWait(Duration.ofSeconds(2)); System.err.println("---------> " + s.size()); diff --git a/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/api/TriggerResourceTest.java b/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/api/TriggerResourceTest.java index a7bc61fd6..f5e79c348 100644 --- a/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/api/TriggerResourceTest.java +++ b/core/src/test/java/org/sterl/spring/persistent_tasks/trigger/api/TriggerResourceTest.java @@ -100,9 +100,9 @@ void testSearchById() { @Test void testSearchByCorrelationId() { // GIVEN - var t1 = triggerService.queue(TriggerBuilder.newTrigger("task1").build()); - var t2 = triggerService.queue(TriggerBuilder.newTrigger("task1").build()); - var t3 = triggerService.queue(TriggerBuilder.newTrigger("task2").build()); + var t1 = triggerService.queue(TriggerBuilder.newTrigger("task1").correlationId(UUID.randomUUID().toString()).build()); + var t2 = triggerService.queue(TriggerBuilder.newTrigger("task1").build()); // null + var t3 = triggerService.queue(TriggerBuilder.newTrigger("task2").correlationId(UUID.randomUUID().toString()).build()); // WHEN var response = template.exchange( @@ -112,8 +112,8 @@ void testSearchByCorrelationId() { String.class); // THEN assertThat(response.getBody()).contains(t3.getData().getCorrelationId()); - assertThat(response.getBody()).doesNotContain(t2.getData().getCorrelationId()); assertThat(response.getBody()).doesNotContain(t1.getData().getCorrelationId()); + assertThat(response.getBody()).doesNotContain(t2.getData().getKey().getId()); } @Test diff --git a/db/src/main/resources/spring-persistent-tasks/db/pt-changelog-v3.xml b/db/src/main/resources/spring-persistent-tasks/db/pt-changelog-v3.xml new file mode 100644 index 000000000..8a2f35fd5 --- /dev/null +++ b/db/src/main/resources/spring-persistent-tasks/db/pt-changelog-v3.xml @@ -0,0 +1,16 @@ + + + + + + + \ No newline at end of file diff --git a/example/pom.xml b/example/pom.xml index e9a8e662d..6a0810d73 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -17,7 +17,7 @@ 1.5.6 --> - 1.6.0-SNAPSHOT + 1.6.1-SNAPSHOT diff --git a/example/src/main/java/org/sterl/spring/example_app/ExampleApplication.java b/example/src/main/java/org/sterl/spring/example_app/ExampleApplication.java index ffbed95ff..720b907b4 100644 --- a/example/src/main/java/org/sterl/spring/example_app/ExampleApplication.java +++ b/example/src/main/java/org/sterl/spring/example_app/ExampleApplication.java @@ -1,6 +1,7 @@ package org.sterl.spring.example_app; import java.net.UnknownHostException; +import java.time.Duration; import org.springdoc.core.models.GroupedOpenApi; import org.springframework.boot.SpringApplication; @@ -22,6 +23,7 @@ import org.sterl.spring.persistent_tasks.scheduler.SchedulerService; import org.sterl.spring.persistent_tasks.scheduler.component.EditSchedulerStatusComponent; import org.sterl.spring.persistent_tasks.scheduler.component.TaskExecutorComponent; +import org.sterl.spring.persistent_tasks.scheduler.config.SchedulerConfig; import org.sterl.spring.persistent_tasks.scheduler.config.SchedulerConfig.SchedulerCustomizer; import org.sterl.spring.persistent_tasks.trigger.TriggerService; import org.sterl.spring.persistent_tasks_ui.EnableSpringPersistentTasksUI; @@ -65,14 +67,14 @@ GroupedOpenApi springPersistentTasksApi() { // just one more for demonstration @Bean(name = "schedulerB", initMethod = "start", destroyMethod = "stop") - @SuppressWarnings("resource") SchedulerService schedulerB( TriggerService triggerService, EditSchedulerStatusComponent editSchedulerStatus, TransactionTemplate trx) throws UnknownHostException { - return new SchedulerService("schedulerB", triggerService, - new TaskExecutorComponent(triggerService, 7), editSchedulerStatus, trx); + return SchedulerConfig.newSchedulerService("schedulerB", + triggerService, + editSchedulerStatus, 7, Duration.ofSeconds(1), trx); } @Bean diff --git a/test/src/main/java/org/sterl/spring/persistent_tasks/test/AsyncAsserts.java b/test/src/main/java/org/sterl/spring/persistent_tasks/test/AsyncAsserts.java index 05269d6bf..c295dfcf2 100644 --- a/test/src/main/java/org/sterl/spring/persistent_tasks/test/AsyncAsserts.java +++ b/test/src/main/java/org/sterl/spring/persistent_tasks/test/AsyncAsserts.java @@ -1,7 +1,6 @@ package org.sterl.spring.persistent_tasks.test; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; import java.time.Duration; import java.util.ArrayList; @@ -72,10 +71,11 @@ public void awaitValue(String value) { */ public void awaitValue(Runnable fn, String value) { final var start = System.currentTimeMillis(); + if (fn != null) fn.run(); // ensure we call the given function at least once while (!values.contains(value) && (System.currentTimeMillis() - start <= defaultTimeout.toMillis())) { try { - Thread.sleep(50); + Thread.sleep(100); if (fn != null) fn.run(); } catch (InterruptedException e) { if (Thread.interrupted()) break; @@ -129,10 +129,6 @@ public void assertMissing(String value, String... inValues) { public void awaitValueOnce(String value) { awaitValue(null, value); - assertThat(values).contains(value); - var occurrences = values.stream().filter(e -> value.equals(e)).count(); - if (occurrences > 1) { - fail("Expected " + value + " to be present once but was present " + occurrences + " times."); - } + assertThat(values).containsOnlyOnce(value); } } diff --git a/test/src/main/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java b/test/src/main/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java index a08e05d47..c71595c6e 100644 --- a/test/src/main/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java +++ b/test/src/main/java/org/sterl/spring/persistent_tasks/test/PersistentTaskTestService.java @@ -5,10 +5,14 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.springframework.stereotype.Service; import org.sterl.spring.persistent_tasks.api.TriggerKey; @@ -17,7 +21,9 @@ import org.sterl.spring.persistent_tasks.trigger.TriggerService; import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity; +import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.SneakyThrows; @Service @@ -27,6 +33,9 @@ public class PersistentTaskTestService { private final List schedulers; private final TriggerService triggerService; + @Getter @Setter + private Duration defaultTimeout = Duration.ofSeconds(5); + /** * Runs just the next trigger, if it is due to run. * @@ -40,12 +49,12 @@ public Optional runNextTrigger() { * Runs all triggers which are due until the given time. One by one, so new triggers are picked up. * * @param dueUntil date to also check for trigger in the future - * @return the triggeres executed, to directly check if they have been successful + * @return the triggers executed, to directly check if they have been successful */ public List runAllDueTrigger(OffsetDateTime dueUntil) { var result = new ArrayList(); List trigger; - while( (trigger = triggerService.lockNextTrigger("test", 1, dueUntil)).size() > 0) { + while ( (trigger = triggerService.lockNextTrigger("test", 1, dueUntil)).size() > 0 ) { var key = triggerService.run(trigger.getFirst()); if (key.isPresent()) result.add(key.get()); } @@ -53,61 +62,99 @@ public List runAllDueTrigger(OffsetDateTime dueUntil) { } /** - * Triggers the execution of all pending triggers. + * Triggers the execution of all pending triggers. We also add one second + * to ensure we select all right now created triggers too. * * @return the reference to the {@link TriggerKey} of the running tasks */ public List> scheduleNextTriggers() { var result = new ArrayList>(); - if (schedulers.isEmpty()) throw new IllegalStateException("No schedulers found, cannot run any triggers!"); + assertHasScheduler(); for (SchedulerService s : schedulers) { - result.addAll(s.triggerNextTasks()); + result.addAll(s.triggerNextTasks(OffsetDateTime.now().plusSeconds(1))); } return result; } + public void assertHasScheduler() { + assertThat(schedulers).describedAs("No schedulers found, cannot run any triggers!").isNotEmpty(); + } + /** * Triggers the execution of all pending triggers and wait for the result. */ @SneakyThrows - public List scheduleNextTriggersAndWait(Duration maxWaitTime) { - final var result = new ArrayList(); + public Set scheduleNextTriggersAndWait() { + return scheduleNextTriggersAndWait(defaultTimeout); + } + + /** + * Triggers the execution of all pending triggers and wait for the result. + */ + @SneakyThrows + public Set scheduleNextTriggersAndWait(Duration maxWaitTime) { + final var result = new LinkedHashSet(); final var timeOut = System.currentTimeMillis() + maxWaitTime.toMillis(); - List> triggers; - var isSomethingRunning = false; + result.addAll(awaitRunningTriggers(maxWaitTime)); + List> newTriggers; do { - triggers = scheduleNextTriggers(); - for (Future future : triggers) { - try { - result.add(future.get()); - } catch (InterruptedException | ExecutionException e) { - final Throwable cause = e.getCause(); - throw cause == null ? e : cause; - } - } - isSomethingRunning = hasRunningTriggers(); - if (isSomethingRunning) { - Thread.sleep(Duration.ofMillis(100)); - } - if (System.currentTimeMillis() > timeOut) { throw new RuntimeException("Timeout waiting for triggers after " + maxWaitTime); } - } while (!triggers.isEmpty() || isSomethingRunning); + newTriggers = scheduleNextTriggers(); + result.addAll(awaitTriggers(maxWaitTime, newTriggers)); + } while (newTriggers.size() > 0); + return result; + } + + /** + * Just waits for the current running triggers + * @return return the keys of the currently scheduled triggers + */ + public List awaitRunningTriggers() { + return awaitRunningTriggers(defaultTimeout); + } + + /** + * Just waits for the current running triggers + * + * @param duration how long to wait + * @return return the keys of the currently scheduled triggers + */ + @SneakyThrows + public List awaitRunningTriggers(Duration duration) { + assertHasScheduler(); + List> running = this.schedulers.stream() + .flatMap(s -> s.getRunning().stream()) + .toList(); + + return awaitTriggers(duration, running); + } + public ArrayList awaitTriggers(Duration duration, List> running) throws Throwable { + final var result = new ArrayList(); + final var totalWaitUntil = System.currentTimeMillis() + duration.toMillis(); + for (Future t : running) { + try { + result.add(t.get(totalWaitUntil - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + final Throwable cause = e.getCause(); + throw cause == null ? e : cause; + } + } return result; } - public boolean hasRunningTriggers() { - var running = this.schedulers.stream() - .map(s -> s.hasRunningTriggers()) - .filter(r -> r) - .findAny(); + public int countRunningTriggers() { + return schedulers.stream().mapToInt(s -> s.getRunning().size()).sum(); + } - return running.isPresent() && running.get() == true; + public boolean hasRunningTriggers() { + assertHasScheduler(); + return countRunningTriggers() > 0 ; } public void assertNoMoreTriggers() { diff --git a/ui/src/scheduler/views/scheduler.view.tsx b/ui/src/scheduler/views/scheduler.view.tsx index 5fd782a97..47e83a2c8 100644 --- a/ui/src/scheduler/views/scheduler.view.tsx +++ b/ui/src/scheduler/views/scheduler.view.tsx @@ -20,26 +20,28 @@ const SchedulerStatusView = ({ scheduler }: Props) => { Last Ping: {durationSince(new Date(scheduler.lastPing))} - + {"Running " + - scheduler.runnungTasks + + scheduler.runningTasks + " of " + scheduler.tasksSlotCount} - CPU + + CPU + { > - + Memory{" "} {formatMemory(scheduler.usedHeap) + " of " + formatMemory(scheduler.maxHeap)} { export interface SchedulerEntity { id: string; tasksSlotCount: number; - runnungTasks: number; + runningTasks: number; systemLoadAverage: number; maxHeap: number; usedHeap: number; @@ -63,6 +63,7 @@ export interface Trigger { runningOn: string; createdTime: string; runAt: string; + lastPing: string; start: string; end: string; executionCount: number; diff --git a/ui/src/shared/view/trigger-list-item.view.tsx b/ui/src/shared/view/trigger-list-item.view.tsx index 0da497e95..8869d94c7 100644 --- a/ui/src/shared/view/trigger-list-item.view.tsx +++ b/ui/src/shared/view/trigger-list-item.view.tsx @@ -117,6 +117,12 @@ const TriggerItemView = ({ export default TriggerItemView; +function runningSince(value?: string) { + if (!value) return ""; + const msRuntime = new Date().getTime() - new Date(value).getTime(); + return `since ${formatMs(msRuntime)}`; +} + const TriggerCompactView = ({ trigger }: { trigger: Trigger }) => ( @@ -138,7 +144,9 @@ const TriggerCompactView = ({ trigger }: { trigger: Trigger }) => ( {trigger.runningOn ? ( ) : ( + {trigger.lastPing ? ( + + + + + + ) : undefined}