Skip to content

Commit 6671b79

Browse files
committed
- timers can now be disabled
- Resume added
1 parent 2338211 commit 6671b79

File tree

18 files changed

+243
-53
lines changed

18 files changed

+243
-53
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/history/HistoryTimer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
import org.springframework.beans.factory.annotation.Value;
88
import org.springframework.scheduling.annotation.Scheduled;
99
import org.springframework.stereotype.Service;
10-
import org.sterl.spring.persistent_tasks.scheduler.config.ConditionalSchedulerServiceByProperty;
10+
import org.sterl.spring.persistent_tasks.shared.TimersEnabled;
1111

1212
import lombok.RequiredArgsConstructor;
1313
import lombok.Setter;
1414
import lombok.extern.slf4j.Slf4j;
1515

16-
@ConditionalSchedulerServiceByProperty
16+
@TimersEnabled
1717
@Service
1818
@RequiredArgsConstructor
1919
@Slf4j

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public SchedulerEntity getStatus() {
166166
}
167167

168168
@Transactional
169-
public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
169+
public List<TriggerEntity> rescheduleAbandonedTriggers(OffsetDateTime timeout) {
170170
var schedulers = editSchedulerStatus.findOnlineSchedulers(timeout);
171171

172172
final List<TriggerKey> runningKeys = this.taskExecutor.getRunningTriggers().stream().map(TriggerEntity::getKey)
@@ -175,7 +175,7 @@ public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
175175
int running = triggerService.markTriggersAsRunning(runningKeys, name);
176176
log.atLevel(running > 0 ? Level.INFO : Level.DEBUG).log("({}) - {} trigger(s) are running on {} schedulers",
177177
running, runningKeys, schedulers);
178-
return triggerService.rescheduleAbandonedTasks(timeout);
178+
return triggerService.rescheduleAbandoned(timeout);
179179
}
180180

181181
public List<SchedulerEntity> listAll() {

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerTimer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,23 @@
99
import org.springframework.scheduling.annotation.Scheduled;
1010
import org.springframework.stereotype.Service;
1111
import org.sterl.spring.persistent_tasks.scheduler.config.ConditionalSchedulerServiceByProperty;
12+
import org.sterl.spring.persistent_tasks.shared.TimersEnabled;
1213

1314
import lombok.RequiredArgsConstructor;
1415
import lombok.extern.slf4j.Slf4j;
1516

17+
@TimersEnabled
1618
@ConditionalSchedulerServiceByProperty
1719
@Service
1820
@RequiredArgsConstructor
1921
@Slf4j
2022
class SchedulerTimer {
2123

22-
@Value("${spring.persistent-tasks.task-timeout:PT5M}")
24+
@Value("${spring.persistent-tasks.trigger-timeout:PT5M}")
2325
private Duration taskTimeout = Duration.ofMinutes(5);
2426
private final Collection<SchedulerService> schedulerServices;
2527

26-
@Scheduled(fixedDelayString = "${spring.persistent-tasks.poll-rate:30}", timeUnit = TimeUnit.SECONDS)
28+
@Scheduled(fixedDelayString = "${spring.persistent-tasks.poll-rate:60}", timeUnit = TimeUnit.SECONDS)
2729
void triggerNextTasks() {
2830
for (SchedulerService s : schedulerServices) {
2931
try {
@@ -35,12 +37,12 @@ void triggerNextTasks() {
3537
}
3638
}
3739

38-
@Scheduled(fixedDelayString = "${spring.persistent-tasks.poll-persistentTask-timeout:300}", timeUnit = TimeUnit.SECONDS)
40+
@Scheduled(fixedDelayString = "${spring.persistent-tasks.poll-abandoned-triggers:300}", timeUnit = TimeUnit.SECONDS)
3941
void rescheduleAbandonedTasks() {
4042
var timeout = OffsetDateTime.now().minus(taskTimeout);
4143
for (SchedulerService s : schedulerServices) {
4244
try {
43-
final var count = s.rescheduleAbandonedTasks(timeout);
45+
final var count = s.rescheduleAbandonedTriggers(timeout);
4446
log.debug("Found {} abandoned tasks for {}. Timeout={}",
4547
count.size(), s.getName(), timeout);
4648
} catch (Exception e) {
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.sterl.spring.persistent_tasks.shared;
2+
3+
import java.lang.annotation.Documented;
4+
import java.lang.annotation.ElementType;
5+
import java.lang.annotation.Retention;
6+
import java.lang.annotation.RetentionPolicy;
7+
import java.lang.annotation.Target;
8+
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10+
11+
@ConditionalOnProperty(name = "spring.persistent-tasks.timers-enabled", havingValue = "true", matchIfMissing = true)
12+
@Retention(RetentionPolicy.RUNTIME)
13+
@Target({ ElementType.TYPE, ElementType.METHOD })
14+
@Documented
15+
public @interface TimersEnabled {
16+
17+
}

core/src/main/java/org/sterl/spring/persistent_tasks/shared/repository/TriggerDataRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.sterl.spring.persistent_tasks.shared.repository;
22

33
import java.time.OffsetDateTime;
4+
import java.util.Optional;
45
import java.util.Set;
56

67
import org.springframework.data.domain.Page;
@@ -19,6 +20,7 @@
1920
import org.sterl.spring.persistent_tasks.shared.StringHelper;
2021
import org.sterl.spring.persistent_tasks.shared.model.HasTriggerData;
2122
import org.sterl.spring.persistent_tasks.shared.model.QTriggerData;
23+
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
2224

2325
import com.querydsl.core.BooleanBuilder;
2426
import com.querydsl.core.types.ExpressionUtils;

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/TriggerService.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
import org.springframework.lang.Nullable;
1212
import org.springframework.transaction.annotation.Propagation;
1313
import org.springframework.transaction.annotation.Transactional;
14-
import org.sterl.spring.persistent_tasks.api.TriggerRequest;
1514
import org.sterl.spring.persistent_tasks.api.TaskId;
1615
import org.sterl.spring.persistent_tasks.api.TriggerKey;
16+
import org.sterl.spring.persistent_tasks.api.TriggerRequest;
1717
import org.sterl.spring.persistent_tasks.api.TriggerSearch;
1818
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
1919
import org.sterl.spring.persistent_tasks.shared.stereotype.TransactionalService;
@@ -185,7 +185,7 @@ public long countTriggers(@Nullable TriggerStatus status) {
185185
*
186186
* Retry will be triggered based on the set strategy.
187187
*/
188-
public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
188+
public List<TriggerEntity> rescheduleAbandoned(OffsetDateTime timeout) {
189189
final List<TriggerEntity> result = readTrigger.findTriggersLastPingAfter(
190190
timeout);
191191
final var e = new IllegalStateException("Trigger abandoned - timeout: " + timeout);
@@ -197,14 +197,21 @@ public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
197197
log.debug("rescheduled {} triggers", result.size());
198198
return result;
199199
}
200+
201+
public List<TriggerEntity> expireTimeoutTriggers() {
202+
return readTrigger.findTriggersTimeoutOut(20)
203+
.stream()
204+
.map(editTrigger::expireTrigger)
205+
.toList();
206+
}
200207

201208
public long countTriggers() {
202209
return readTrigger.countByStatus(null);
203210
}
204211

205212
public Optional<TriggerEntity> updateRunAt(TriggerKey key, OffsetDateTime time) {
206213
return readTrigger.get(key).map(t -> {
207-
if (t.getData().getStatus() != TriggerStatus.WAITING) {
214+
if (t.getData().getStatus() == TriggerStatus.RUNNING) {
208215
throw new IllegalStateException("Cannot update status of " + key
209216
+ " as the current status is: " + t.getData().getStatus());
210217
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.sterl.spring.persistent_tasks.trigger;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
import org.springframework.scheduling.annotation.Scheduled;
6+
import org.springframework.stereotype.Component;
7+
import org.sterl.spring.persistent_tasks.shared.TimersEnabled;
8+
9+
import lombok.RequiredArgsConstructor;
10+
import lombok.extern.slf4j.Slf4j;
11+
12+
@TimersEnabled
13+
@Component
14+
@RequiredArgsConstructor
15+
@Slf4j
16+
public class TriggerTimer {
17+
18+
private final TriggerService triggerService;
19+
20+
@Scheduled(fixedDelayString = "${spring.persistent-tasks.poll-awaiting-trigger-timeout:300}", timeUnit = TimeUnit.SECONDS)
21+
void checkAwaitingTriggersForTimeout() {
22+
int expired = triggerService.expireTimeoutTriggers().size();
23+
if (expired > 0) log.info("{} triggers have not received the signal, wait expired!", expired);
24+
else log.debug("No expired triggers found.");
25+
}
26+
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.sterl.spring.persistent_tasks.api.TriggerStatus;
2020
import org.sterl.spring.persistent_tasks.trigger.event.TriggerAddedEvent;
2121
import org.sterl.spring.persistent_tasks.trigger.event.TriggerCanceledEvent;
22+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerExpiredEvent;
2223
import org.sterl.spring.persistent_tasks.trigger.event.TriggerFailedEvent;
24+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerResumedEvent;
2325
import org.sterl.spring.persistent_tasks.trigger.event.TriggerRunningEvent;
2426
import org.sterl.spring.persistent_tasks.trigger.event.TriggerSuccessEvent;
2527
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
@@ -88,14 +90,17 @@ public Optional<TriggerEntity> failTrigger(
8890
public Optional<TriggerEntity> cancelTask(TriggerKey id, Exception e) {
8991
return triggerRepository //
9092
.findByKey(id) //
91-
.map(t -> cancelTask(t, e));
93+
.map(t -> cancelTrigger(t, e));
9294
}
9395

94-
private TriggerEntity cancelTask(TriggerEntity t, Exception e) {
96+
private TriggerEntity cancelTrigger(TriggerEntity t, Exception e) {
9597
t.cancel(e);
98+
9699
publisher.publishEvent(new TriggerCanceledEvent(
97-
t.getId(), t.copyData(),
100+
t.getId(),
101+
t.copyData(),
98102
stateSerializer.deserializeOrNull(t.getData().getState())));
103+
99104
triggerRepository.delete(t);
100105
return t;
101106
}
@@ -121,16 +126,18 @@ public <T extends Serializable> TriggerEntity addTrigger(TriggerRequest<T> tigge
121126
result = triggerRepository.save(result);
122127
log.debug("Added trigger={}", result);
123128
}
129+
124130
publisher.publishEvent(new TriggerAddedEvent(
125131
result.getId(), result.copyData(), tigger.state()));
132+
126133
return result;
127134
}
128135

129136
public Page<TriggerEntity> resume(TriggerRequest<?> trigger) {
130137
var search = TriggerSearch.forTriggerRequest(trigger);
131138
search.setStatus(TriggerStatus.AWAITING_SIGNAL);
132139

133-
var foundTriggers = readTrigger.searchTriggers(TriggerSearch.byCorrelationId(trigger.correlationId()), Pageable.ofSize(100));
140+
var foundTriggers = readTrigger.searchTriggers(search, Pageable.ofSize(100));
134141

135142
log.debug("Resuming {} triggers for given data {}", foundTriggers.getSize(), trigger);
136143
foundTriggers.forEach(t -> {
@@ -140,9 +147,23 @@ public Page<TriggerEntity> resume(TriggerRequest<?> trigger) {
140147

141148
t.setData(newData.getData());
142149
t.runAt(trigger.runtAt());
150+
151+
publisher.publishEvent(new TriggerResumedEvent(t.getId(), t.copyData(), trigger.state()));
143152
});
144153
return foundTriggers;
145154
}
155+
156+
public TriggerEntity expireTrigger(TriggerEntity t) {
157+
t.getData().setStatus(TriggerStatus.EXPIRED_SIGNAL);
158+
t.getData().setStart(null);
159+
t.getData().setEnd(null);
160+
t.getData().updateRunningDuration();
161+
162+
publisher.publishEvent(new TriggerExpiredEvent(
163+
t.getId(), t.copyData(),
164+
stateSerializer.deserializeOrNull(t.getData().getState())));
165+
return t;
166+
}
146167

147168
@NonNull
148169
public <T extends Serializable> List<TriggerEntity> addTriggers(Collection<TriggerRequest<T>> newTriggers) {

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/ReadTriggerComponent.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,11 @@ public Page<TriggerEntity> listTriggers(TaskId<? extends Serializable> task, Pag
6767
if (task == null) return triggerRepository.findAll(page);
6868
return triggerRepository.findAll(task.name(), page);
6969
}
70+
71+
public List<TriggerEntity> findTriggersTimeoutOut(int max) {
72+
return triggerRepository.findByStatusAndRunAtAfter(
73+
TriggerStatus.AWAITING_SIGNAL,
74+
OffsetDateTime.now(),
75+
Pageable.ofSize(max));
76+
}
7077
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.sterl.spring.persistent_tasks.trigger.event;
2+
3+
import java.io.Serializable;
4+
5+
import org.sterl.spring.persistent_tasks.shared.model.TriggerData;
6+
7+
/**
8+
* Fired if a new trigger is added.
9+
* <p>
10+
* Inside a transaction, it is save to join or listen for the <code>AFTER_COMMIT</code>
11+
* </p>
12+
*/
13+
public record TriggerExpiredEvent(long id, TriggerData data, Serializable state) implements TriggerLifeCycleEvent {
14+
15+
@Override
16+
public boolean isDone() {
17+
return false;
18+
}
19+
}

0 commit comments

Comments
 (0)