Skip to content

Commit 3e091e2

Browse files
committed
cleanup and better test utils and fixed spelling bugs
1 parent 1943719 commit 3e091e2

File tree

16 files changed

+214
-126
lines changed

16 files changed

+214
-126
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/PersistentTaskService.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
import java.util.Optional;
99

1010
import org.springframework.context.event.EventListener;
11+
import org.springframework.data.domain.PageRequest;
1112
import org.springframework.data.domain.Pageable;
13+
import org.springframework.data.domain.Sort;
14+
import org.springframework.data.domain.Sort.Direction;
1215
import org.springframework.lang.NonNull;
1316
import org.springframework.stereotype.Service;
1417
import org.springframework.transaction.annotation.Transactional;
@@ -122,24 +125,46 @@ public <T extends Serializable> TriggerKey runOrQueue(
122125

123126
/**
124127
* Returns all triggers for a correlationId sorted by the creation time.
128+
* Data is limited to overall 300 elements.
129+
*
125130
* @param correlationId the id to search for
126131
* @return the found {@link TriggerData} sorted by create time ASC
127132
*/
128133
@Transactional(readOnly = true, timeout = 5)
129134
public List<TriggerData> findAllTriggerByCorrelationId(String correlationId) {
130135

131-
var running = triggerService.findTriggerByCorrelationId(correlationId)
136+
var running = triggerService.findTriggerByCorrelationId(correlationId, Pageable.ofSize(100))
132137
.stream().map(TriggerEntity::getData)
133138
.toList();
134139

135-
var done = historyService.findTriggerByCorrelationId(correlationId)
140+
var done = historyService.findTriggerByCorrelationId(correlationId, Pageable.ofSize(200))
136141
.stream().map(TriggerHistoryLastStateEntity::getData)
137142
.toList();
138143

139-
140144
var result = new ArrayList<TriggerData>(running.size() + done.size());
141145
result.addAll(done);
142146
result.addAll(running);
143147
return result;
144148
}
149+
150+
/**
151+
* Returns the first info to a trigger based on the correlationId.
152+
*
153+
* @param correlationId the id to search for
154+
* @return the found {@link TriggerData}
155+
*/
156+
@Transactional(readOnly = true, timeout = 5)
157+
public Optional<TriggerData> findLastTriggerByCorrelationId(String correlationId) {
158+
final var page = PageRequest.of(0, 1, Sort.by(Direction.DESC, "data.createdTime"));
159+
var result = triggerService.findTriggerByCorrelationId(correlationId, page)
160+
.stream().map(TriggerEntity::getData)
161+
.toList();
162+
163+
if (result.isEmpty()) {
164+
result = historyService.findTriggerByCorrelationId(correlationId, page)
165+
.stream().map(TriggerHistoryLastStateEntity::getData)
166+
.toList();
167+
}
168+
return result.isEmpty() ? Optional.empty() : Optional.of(result.getFirst());
169+
}
145170
}

core/src/main/java/org/sterl/spring/persistent_tasks/api/RetryStrategy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ public boolean shouldRetry(int executionCount, @Nullable Exception error) {
107107
}
108108
@Override
109109
public OffsetDateTime retryAt(int executionCount, @Nullable Exception error) {
110-
return OffsetDateTime.now().plus(scalingFactor * executionCount, unit);
110+
var next = OffsetDateTime.now();
111+
if (scalingFactor > 0 && executionCount > 0) return OffsetDateTime.now().plus(scalingFactor * executionCount, unit);
112+
return next;
111113
}
112114
}
113115

core/src/main/java/org/sterl/spring/persistent_tasks/history/HistoryService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public List<TaskStatusHistoryOverview> taskStatusHistory() {
121121
return triggerHistoryLastStateRepository.listTriggerStatus();
122122
}
123123

124-
public List<TriggerHistoryLastStateEntity> findTriggerByCorrelationId(String correlationId) {
125-
return triggerHistoryLastStateRepository.findByCorrelationId(correlationId);
124+
public List<TriggerHistoryLastStateEntity> findTriggerByCorrelationId(String correlationId, Pageable page) {
125+
return triggerHistoryLastStateRepository.findByCorrelationId(correlationId, page);
126126
}
127127
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.Serializable;
44
import java.time.OffsetDateTime;
5+
import java.util.Collection;
56
import java.util.Collections;
67
import java.util.List;
78
import java.util.Optional;
@@ -102,9 +103,9 @@ public List<Future<TriggerKey>> triggerNextTasks() {
102103
public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
103104
if (taskExecutor.getFreeThreads() > 0) {
104105
final var result = trx.execute(t -> {
105-
var status = editSchedulerStatus.checkinToRegistry(name, taskExecutor.getRunningTasks(), taskExecutor.getMaxThreads());
106106
var triggers = triggerService.lockNextTrigger(name, taskExecutor.getFreeThreads(), timeDue);
107-
status.addRunning(triggers.size());
107+
editSchedulerStatus.checkinToRegistry(name,
108+
taskExecutor.countRunning() + triggers.size(), taskExecutor.getMaxThreads());
108109
return triggers;
109110
});
110111

@@ -114,7 +115,7 @@ public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
114115
taskExecutor.getFreeThreads(),
115116
taskExecutor.getMaxThreads(),
116117
timeDue);
117-
editSchedulerStatus.checkinToRegistry(name, taskExecutor.getRunningTasks(), taskExecutor.getMaxThreads());
118+
editSchedulerStatus.checkinToRegistry(name, taskExecutor.countRunning(), taskExecutor.getMaxThreads());
118119
return Collections.emptyList();
119120
}
120121
}
@@ -134,7 +135,7 @@ public <T extends Serializable> TriggerKey runOrQueue(AddTriggerRequest<T> trigg
134135
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
135136
public void checkIfTrigerShouldRun(TriggerAddedEvent addedTrigger) {
136137
if (runOrQueue.checkIfTrigerShouldRun(addedTrigger.id())) {
137-
editSchedulerStatus.checkinToRegistry(name, taskExecutor.getRunningTasks(), taskExecutor.getMaxThreads());
138+
editSchedulerStatus.checkinToRegistry(name, taskExecutor.countRunning(), taskExecutor.getMaxThreads());
138139
}
139140
}
140141

@@ -158,8 +159,11 @@ public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
158159
public List<SchedulerEntity> listAll() {
159160
return editSchedulerStatus.listAll();
160161
}
161-
162+
public Collection<Future<TriggerKey>> getRunning() {
163+
return taskExecutor.getRunningTasks();
164+
}
162165
public boolean hasRunningTriggers() {
163-
return !taskExecutor.isStopped() && taskExecutor.getRunningTriggers().size() > 0;
166+
return !taskExecutor.isStopped()
167+
&& (taskExecutor.countRunning() > 0 || runOrQueue.hasWaitingTriggers());
164168
}
165169
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/EditSchedulerStatusComponent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public SchedulerEntity checkinToRegistry(String name, int runningTasks, int maxT
3131
result.setSystemLoadAverage(os.getSystemLoadAverage());
3232
result.setMaxHeap(memory.getHeapMemoryUsage().getMax());
3333
result.setUsedHeap(memory.getHeapMemoryUsage().getUsed());
34-
result.setRunnungTasks(runningTasks);
34+
result.setRunningTasks(runningTasks);
3535
result.setTasksSlotCount(maxTasks);
3636

3737
result.setLastPing(OffsetDateTime.now());

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/RunOrQueueComponent.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,15 @@ public boolean checkIfTrigerShouldRun(long triggerId) {
6565
clearNotCreatedTriggers();
6666
return toRun != null;
6767
}
68+
69+
public boolean hasWaitingTriggers() {
70+
clearNotCreatedTriggers();
71+
return shouldRun.size() > 0;
72+
}
6873

6974
private void clearNotCreatedTriggers() {
7075
if (shouldRun.size() > 0) {
71-
final var timeout = OffsetDateTime.now().minusSeconds(5);
76+
final var timeout = OffsetDateTime.now().plusSeconds(15);
7277
final var triggers = shouldRun.entrySet().iterator();
7378
while (triggers.hasNext()) {
7479
var entry = triggers.next();

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.Closeable;
44
import java.time.Duration;
55
import java.util.ArrayList;
6+
import java.util.Collection;
67
import java.util.Collections;
78
import java.util.List;
89
import java.util.concurrent.CompletableFuture;
@@ -131,7 +132,8 @@ public void shutdownNow() {
131132
if (executor != null) {
132133
synchronized (executor) {
133134
executor.shutdownNow();
134-
log.info("Force stop {} with {} running tasks", schedulerName, getRunningTasks());
135+
log.info("Force stop {} with {} running tasks", schedulerName, runningTasks.size());
136+
runningTasks.clear();
135137
executor = null;
136138
}
137139
}
@@ -144,11 +146,15 @@ public int getFreeThreads() {
144146
}
145147
return Math.max(maxThreads.get() - runningTasks.size(), 0);
146148
}
147-
148-
public int getRunningTasks() {
149+
150+
public int countRunning() {
149151
return runningTasks.size();
150152
}
151153

154+
public Collection<Future<TriggerKey>> getRunningTasks() {
155+
return runningTasks.values();
156+
}
157+
152158
public boolean isStopped() {
153159
return stopped.get() || maxThreads.get() <= 0;
154160
}
@@ -161,7 +167,7 @@ public void setMaxThreads(int value) {
161167
this.maxThreads.set(value);
162168
}
163169
public int getMaxThreads() {
164-
return this.maxThreads.get();
170+
return isStopped() ? 0 : this.maxThreads.get();
165171
}
166172

167173
public boolean isRunning(TriggerEntity trigger) {

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/entity/SchedulerEntity.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
@Entity
1616
@Table(name = "pt_scheduler", indexes = @Index(name = "idx_task_scheduler_status", columnList = "last_ping"))
1717
@Data
18-
@ToString(of = { "id", "lastPing", "runnungTasks", "tasksSlotCount" })
18+
@ToString(of = { "id", "lastPing", "runningTasks", "tasksSlotCount" })
1919
@EqualsAndHashCode(of = "id")
2020
@NoArgsConstructor
2121
public class SchedulerEntity {
@@ -29,7 +29,7 @@ public class SchedulerEntity {
2929

3030
private int tasksSlotCount;
3131

32-
private int runnungTasks;
32+
private int runningTasks;
3333

3434
private double systemLoadAverage = 0.0d;
3535

@@ -45,7 +45,7 @@ public SchedulerEntity(String name) {
4545
}
4646

4747
public SchedulerEntity addRunning(int count) {
48-
this.runnungTasks += count;
48+
this.runningTasks += count;
4949
return this;
5050
}
5151
}

core/src/main/java/org/sterl/spring/persistent_tasks/shared/repository/TriggerDataRepository.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import java.util.Set;
66

77
import org.springframework.data.domain.Page;
8+
import org.springframework.data.domain.PageRequest;
89
import org.springframework.data.domain.Pageable;
10+
import org.springframework.data.domain.Sort;
11+
import org.springframework.data.domain.Sort.Direction;
912
import org.springframework.data.jpa.repository.JpaRepository;
1013
import org.springframework.data.jpa.repository.Modifying;
1114
import org.springframework.data.jpa.repository.Query;
@@ -17,6 +20,16 @@
1720

1821
@NoRepositoryBean
1922
public interface TriggerDataRepository<T extends HasTriggerData> extends JpaRepository<T, Long> {
23+
Sort DEFAULT_SORT = Sort.by(Direction.ASC, "data.createdTime");
24+
25+
default Pageable applyDefaultSortIfNeeded(Pageable page) {
26+
var result = page;
27+
if (page.getSort() == Sort.unsorted()) {
28+
result = PageRequest.of(page.getPageNumber(), page.getPageSize(), DEFAULT_SORT);
29+
}
30+
return result;
31+
}
32+
2033
@Query("""
2134
SELECT e FROM #{#entityName} e
2235
WHERE ((:id IS NULL OR e.data.key.id LIKE :id)
@@ -67,11 +80,10 @@ WHERE e.data.status IN ( :status )
6780
""")
6881
@Modifying
6982
long deleteOlderThan(@Param("age") OffsetDateTime age);
70-
83+
7184
@Query("""
72-
SELECT e FROM #{#entityName} e
73-
WHERE e.data.correlationId = :correlationId
74-
ORDER BY e.data.createdTime ASC
85+
SELECT e FROM #{#entityName} e
86+
WHERE e.data.correlationId = :correlationId
7587
""")
76-
List<T> findByCorrelationId(@Param("correlationId") String correlationId);
88+
List<T> findByCorrelationId(@Param("correlationId") String correlationId, Pageable page);
7789
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,8 @@ public Optional<TriggerEntity> updateRunAt(TriggerKey key, OffsetDateTime time)
193193
});
194194
}
195195

196-
public List<TriggerEntity> findTriggerByCorrelationId(String correlationId) {
197-
return readTrigger.findTriggerByCorrelationId(correlationId);
196+
public List<TriggerEntity> findTriggerByCorrelationId(String correlationId, Pageable page) {
197+
return readTrigger.findTriggerByCorrelationId(correlationId, page);
198198

199199
}
200200
}

0 commit comments

Comments
 (0)