Skip to content

Commit 57f0399

Browse files
committed
scheduler service leaves current transaction before executing task
1 parent 3afd730 commit 57f0399

File tree

7 files changed

+74
-38
lines changed

7 files changed

+74
-38
lines changed

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerService.java

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,20 @@
55
import java.util.Collections;
66
import java.util.List;
77
import java.util.Optional;
8-
import java.util.concurrent.CompletableFuture;
98
import java.util.concurrent.Future;
109

1110
import org.springframework.lang.NonNull;
1211
import org.springframework.transaction.annotation.Transactional;
12+
import org.springframework.transaction.event.TransactionPhase;
13+
import org.springframework.transaction.event.TransactionalEventListener;
1314
import org.springframework.transaction.support.TransactionTemplate;
1415
import org.sterl.spring.persistent_tasks.api.AddTriggerRequest;
1516
import org.sterl.spring.persistent_tasks.api.TriggerKey;
1617
import org.sterl.spring.persistent_tasks.scheduler.component.EditSchedulerStatusComponent;
1718
import org.sterl.spring.persistent_tasks.scheduler.component.TaskExecutorComponent;
1819
import org.sterl.spring.persistent_tasks.scheduler.entity.SchedulerEntity;
19-
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
2020
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
21+
import org.sterl.spring.persistent_tasks.trigger.event.TriggerAddedEvent;
2122
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
2223

2324
import jakarta.annotation.PostConstruct;
@@ -100,6 +101,9 @@ public List<Future<TriggerKey>> triggerNextTasks() {
100101
/**
101102
* Like {@link #triggerNextTasks()} but allows to set the time e.g. to the future to trigger
102103
* tasks which wouldn't be triggered now.
104+
* <p>
105+
* This method should not be called in a transaction!
106+
* </p>
103107
*/
104108
@NonNull
105109
public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
@@ -123,29 +127,30 @@ public List<Future<TriggerKey>> triggerNextTasks(OffsetDateTime timeDue) {
123127
* and the runAt time is not in the future.
124128
* @return the reference to the {@link Future} with the key, if no threads are available it is resolved
125129
*/
126-
public <T extends Serializable> Future<TriggerKey> runOrQueue(
127-
AddTriggerRequest<T> triggerRequest) {
128-
final var runningTrigger = trx.execute(t -> {
129-
var trigger = triggerService.queue(triggerRequest);
130-
// exit now if this trigger is for the future ...
131-
if (trigger.shouldRunInFuture()) return trigger;
132-
130+
@Transactional(timeout = 10)
131+
public <T extends Serializable> TriggerKey runOrQueue(
132+
AddTriggerRequest<T> triggerRequest) {
133+
var trigger = triggerService.queue(triggerRequest);
134+
135+
if (!trigger.shouldRunInFuture()) {
133136
if (taskExecutor.getFreeThreads() > 0) {
134137
trigger = triggerService.markTriggersAsRunning(trigger, name);
135138
pingRegistry().addRunning(1);
136139
} else {
137140
log.debug("Currently not enough free thread available {} of {} in use. PersistentTask {} queued.",
138141
taskExecutor.getFreeThreads(), taskExecutor.getMaxThreads(), trigger.getKey());
139142
}
140-
return trigger;
141-
});
142-
Future<TriggerKey> result;
143-
if (runningTrigger.isRunning()) {
144-
result = taskExecutor.submit(runningTrigger);
145-
} else {
146-
result = CompletableFuture.completedFuture(runningTrigger.getKey());
147143
}
148-
return result;
144+
// we will listen for the commit event to execute this trigger ...
145+
return trigger.getKey();
146+
}
147+
148+
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
149+
void checkIfTrigerIsRunning(TriggerAddedEvent addedTrigger) {
150+
if (addedTrigger.isRunningOn(name) && !taskExecutor.isRunning(addedTrigger.trigger())) {
151+
log.debug("New triger added for imidiate execution {}", addedTrigger.key());
152+
taskExecutor.submit(addedTrigger.trigger());
153+
}
149154
}
150155

151156
public SchedulerEntity getStatus() {
@@ -166,16 +171,4 @@ public List<TriggerEntity> rescheduleAbandonedTasks(OffsetDateTime timeout) {
166171
running, runningKeys, schedulers);
167172
return triggerService.rescheduleAbandonedTasks(timeout);
168173
}
169-
170-
/**
171-
* Adds or updates an existing trigger based on its {@link TriggerKey}
172-
*
173-
* @param <T> the state type
174-
* @param trigger the {@link AddTriggerRequest} to save
175-
* @return the saved {@link TriggerEntity}
176-
* @throws IllegalStateException if the trigger already exists and is {@link TriggerStatus#RUNNING}
177-
*/
178-
public <T extends Serializable> TriggerEntity queue(AddTriggerRequest<T> trigger) {
179-
return triggerService.queue(trigger);
180-
}
181174
}

core/src/main/java/org/sterl/spring/persistent_tasks/scheduler/component/TaskExecutorComponent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,8 @@ public void setMaxThreads(int value) {
151151
public int getMaxThreads() {
152152
return this.maxThreads.get();
153153
}
154+
155+
public boolean isRunning(TriggerEntity trigger) {
156+
return runningTasks.contains(trigger);
157+
}
154158
}

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/component/EditTriggerComponent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ public <T extends Serializable> TriggerEntity addTrigger(AddTriggerRequest<T> ti
9393
} else {
9494
result = triggerRepository.save(result);
9595
log.debug("Added trigger={}", result);
96-
publisher.publishEvent(new TriggerAddedEvent(result, tigger.state()));
9796
}
97+
publisher.publishEvent(new TriggerAddedEvent(result, tigger.state()));
9898
return result;
9999
}
100100

core/src/main/java/org/sterl/spring/persistent_tasks/trigger/event/TriggerAddedEvent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,8 @@
66

77
public record TriggerAddedEvent(TriggerEntity trigger, Serializable state) implements TriggerLifeCycleEvent {
88

9+
public boolean isRunningOn(String name) {
10+
return trigger.isRunning() && name != null && name.equals(trigger.getRunningOn());
11+
}
12+
913
}

core/src/test/java/org/sterl/spring/persistent_tasks/AbstractSpringTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.net.UnknownHostException;
44
import java.time.Duration;
55
import java.util.Optional;
6+
import java.util.concurrent.TimeoutException;
67

78
import org.junit.jupiter.api.AfterEach;
89
import org.junit.jupiter.api.BeforeEach;
@@ -24,6 +25,7 @@
2425
import org.sterl.spring.persistent_tasks.scheduler.SchedulerService;
2526
import org.sterl.spring.persistent_tasks.scheduler.component.EditSchedulerStatusComponent;
2627
import org.sterl.spring.persistent_tasks.scheduler.component.TaskExecutorComponent;
28+
import org.sterl.spring.persistent_tasks.shared.model.TriggerStatus;
2729
import org.sterl.spring.persistent_tasks.task.TaskService;
2830
import org.sterl.spring.persistent_tasks.trigger.TriggerService;
2931
import org.sterl.spring.persistent_tasks.trigger.model.TriggerEntity;
@@ -160,6 +162,16 @@ PersistentTask<Long> slowTask(AsyncAsserts asserts) {
160162
protected Optional<TriggerEntity> runNextTrigger() {
161163
return triggerService.run(triggerService.lockNextTrigger("test"));
162164
}
165+
166+
protected void awaitRunningTasks() throws TimeoutException, InterruptedException {
167+
final long start = System.currentTimeMillis();
168+
if (triggerService.countTriggers(TriggerStatus.RUNNING) > 0) {
169+
if (System.currentTimeMillis() - start > 2000) {
170+
throw new TimeoutException("Still running after 2s");
171+
}
172+
Thread.sleep(50);
173+
}
174+
}
163175

164176
@BeforeEach
165177
public void beforeEach() throws Exception {

core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.time.Duration;
66
import java.time.OffsetDateTime;
77
import java.util.concurrent.Future;
8+
import java.util.concurrent.TimeoutException;
89

910
import org.junit.jupiter.api.BeforeEach;
1011
import org.junit.jupiter.api.Test;
@@ -94,16 +95,18 @@ void testRunOrQueue() throws Exception {
9495

9596
// WHEN
9697
var ref = subject.runOrQueue(triggerRequest);
97-
98+
9899
// THEN
99100
assertThat(subject.getScheduler().getRunnungTasks()).isOne();
100-
assertThat(persistentTaskService.getLastTriggerData(
101-
ref.get()).get().getStatus()).isEqualTo(TriggerStatus.SUCCESS);
101+
// AND
102+
awaitRunningTasks();
103+
assertThat(persistentTaskService.getLastTriggerData(ref).get().getStatus())
104+
.isEqualTo(TriggerStatus.SUCCESS);
102105
asserts.assertValue(Task3.NAME + "::Hallo");
103106
}
104107

105108
@Test
106-
void testQueuedInFuture() {
109+
void testQueuedInFuture() throws TimeoutException, InterruptedException {
107110
// GIVEN
108111
final AddTriggerRequest<String> triggerRequest = Task3.ID
109112
.newTrigger("Hallo")
@@ -113,6 +116,7 @@ void testQueuedInFuture() {
113116

114117
// WHEN
115118
persistentTaskService.executeTriggersAndWait();
119+
awaitRunningTasks();
116120

117121
// THEN
118122
asserts.assertMissing(Task3.NAME + "::Hallo");
@@ -124,7 +128,7 @@ void runSimpleTaskMultipleTimesTest() throws Exception {
124128
// GIVEN
125129
TaskId<String> taskId = taskService.replace("foo", c -> asserts.info(c));
126130
for (int i = 1; i < 21; ++i) {
127-
subject.queue(taskId.newTrigger(i + " state").build());
131+
triggerService.queue(taskId.newTrigger(i + " state").build());
128132
}
129133

130134
// WHEN

core/src/test/java/org/sterl/spring/persistent_tasks/scheduler/SchedulerServiceTransactionTest.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,24 @@ void testFailTrxCount() throws Exception {
122122
// third to write the history
123123
hibernateAsserts.assertTrxCount(3);
124124
}
125+
126+
@Test
127+
void testRunOrQueue() throws Exception {
128+
// GIVEN
129+
var k1 = subject.runOrQueue(TaskTriggerBuilder.newTrigger("savePersonInTrx").state("Paul").build());
130+
var k2 = subject.runOrQueue(TaskTriggerBuilder.newTrigger("savePersonInTrx").state("Paul").build());
131+
132+
// WHEN
133+
assertThat(persistentTaskService.getLastTriggerData(k1).get().getStatus())
134+
.isEqualTo(TriggerStatus.RUNNING);
135+
assertThat(persistentTaskService.getLastTriggerData(k2).get().getStatus())
136+
.isEqualTo(TriggerStatus.RUNNING);
137+
138+
139+
// THEN
140+
awaitRunningTasks();
141+
assertThat(personRepository.count()).isEqualTo(2);
142+
}
125143

126144
@Test
127145
void testRollbackAndRetry() throws Exception {
@@ -131,9 +149,10 @@ void testRollbackAndRetry() throws Exception {
131149

132150
// WHEN
133151
var key = subject.runOrQueue(triggerRequest);
152+
134153
// THEN
135-
key.get();
136-
assertThat(persistentTaskService.getLastTriggerData(key.get()).get().getStatus())
154+
awaitRunningTasks();
155+
assertThat(persistentTaskService.getLastTriggerData(key).get().getStatus())
137156
.isEqualTo(TriggerStatus.WAITING);
138157

139158
// WHEN
@@ -142,7 +161,7 @@ void testRollbackAndRetry() throws Exception {
142161

143162
// THEN
144163
assertThat(executed).hasSize(1);
145-
assertExecutionCount(key.get(), 2);
164+
assertExecutionCount(key, 2);
146165
assertThat(personRepository.count()).isOne();
147166
}
148167

0 commit comments

Comments
 (0)