Skip to content

Commit 9906cd3

Browse files
committed
A non transactional exclusive job should be unlocked after the execution is done and in the same transaction as the deletion
1 parent 0d94dd4 commit 9906cd3

File tree

2 files changed

+68
-4
lines changed

2 files changed

+68
-4
lines changed

modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/NonTransactionalJobHandlerTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,24 @@
1414

1515
import static org.assertj.core.api.Assertions.assertThat;
1616

17+
import java.util.Date;
1718
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.concurrent.atomic.AtomicReference;
1820

1921
import org.flowable.common.engine.impl.cfg.TransactionContext;
2022
import org.flowable.common.engine.impl.interceptor.CommandContext;
2123
import org.flowable.engine.impl.context.Context;
24+
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
2225
import org.flowable.engine.impl.test.JobTestHelper;
2326
import org.flowable.engine.impl.test.PluggableFlowableTestCase;
2427
import org.flowable.engine.impl.util.CommandContextUtil;
28+
import org.flowable.engine.runtime.ProcessInstance;
29+
import org.flowable.engine.test.Deployment;
2530
import org.flowable.job.api.Job;
2631
import org.flowable.job.service.JobService;
2732
import org.flowable.job.service.impl.nontx.NonTransactionalJobHandler;
2833
import org.flowable.job.service.impl.persistence.entity.JobEntity;
34+
import org.flowable.task.api.Task;
2935
import org.junit.jupiter.api.AfterEach;
3036
import org.junit.jupiter.api.BeforeEach;
3137
import org.junit.jupiter.api.Test;
@@ -83,6 +89,54 @@ public void testJobExecutedWithoutTransaction() {
8389
assertThat(nonTransactionalTestJobHandler.nonTransactionalCounter).hasValue(1);
8490
}
8591

92+
@Test
93+
@Deployment(resources = "org/flowable/engine/test/api/oneTaskProcess.bpmn20.xml")
94+
public void testAsyncExclusiveJob() {
95+
ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder()
96+
.processDefinitionKey("oneTaskProcess")
97+
.start();
98+
Task task = taskService.createTaskQuery()
99+
.processInstanceId(processInstance.getId())
100+
.singleResult();
101+
managementService.executeCommand(commandContext -> {
102+
JobService jobService = CommandContextUtil.getJobService();
103+
JobEntity job = jobService.createJob();
104+
job.setProcessInstanceId(processInstance.getId());
105+
job.setExecutionId(task.getExecutionId());
106+
job.setJobHandlerType(nonTransactionalTestJobHandler.getType());
107+
job.setJobHandlerConfiguration("myTest");
108+
jobService.createAsyncJob(job, true);
109+
jobService.scheduleAsyncJob(job);
110+
return null;
111+
});
112+
113+
AtomicReference<String> processLockOwner = new AtomicReference<>();
114+
AtomicReference<Date> processLockTime = new AtomicReference<>();
115+
nonTransactionalTestJobHandler.nonTransactionalRunnable = () -> {
116+
ExecutionEntity executionEntity = (ExecutionEntity) runtimeService.createProcessInstanceQuery()
117+
.processInstanceId(processInstance.getId())
118+
.singleResult();
119+
processLockOwner.set(executionEntity.getLockOwner());
120+
processLockTime.set(executionEntity.getLockTime());
121+
};
122+
123+
JobTestHelper.waitForJobExecutorOnCondition(processEngineConfiguration, 10000L, 20L,
124+
() -> managementService.createJobQuery().count() == 0);
125+
126+
assertThat(processLockOwner).hasValue(processEngineConfiguration.getAsyncExecutor().getLockOwner());
127+
assertThat(processLockTime).doesNotHaveNullValue();
128+
129+
managementService.executeCommand(commandContext -> {
130+
ExecutionEntity executionEntity = (ExecutionEntity) runtimeService.createProcessInstanceQuery()
131+
.processInstanceId(processInstance.getId())
132+
.singleResult();
133+
134+
assertThat(executionEntity.getLockOwner()).isNull();
135+
assertThat(executionEntity.getLockTime()).isNull();
136+
return null;
137+
});
138+
}
139+
86140
@Test
87141
public void testJobExecutedWithoutTransactionThrowsException() {
88142
managementService.executeCommand(commandContext -> {
@@ -125,6 +179,8 @@ public static class NonTransactionalTestJobHandler implements NonTransactionalJo
125179
protected AtomicInteger withoutTransactionCounter = new AtomicInteger(0);
126180
protected AtomicInteger nonTransactionalCounter = new AtomicInteger(0);
127181

182+
protected Runnable nonTransactionalRunnable;
183+
128184
protected String jobConfiguration;
129185
protected String nonTransactionalOutput;
130186

@@ -154,6 +210,9 @@ public String executeNonTransactionally(JobEntity job, String configuration) {
154210
}
155211

156212
this.jobConfiguration = job.getJobHandlerConfiguration();
213+
if (nonTransactionalRunnable != null) {
214+
nonTransactionalRunnable.run();
215+
}
157216
return jobConfiguration;
158217

159218
}
@@ -189,6 +248,7 @@ public String getJobConfiguration() {
189248
}
190249

191250
public void reset() {
251+
this.nonTransactionalRunnable = null;
192252
this.counter.set(0);
193253
this.withCommandContext.set(0);
194254
this.withoutCommandContext.set(0);

modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/ExecuteAsyncRunnable.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,6 @@ protected boolean handleNontransactionalJob(NonTransactionalJobHandler<Object> j
182182

183183
JobProcessorUtil.callJobProcessors(jobServiceConfiguration, JobProcessorContext.Phase.BEFORE_EXECUTE, (JobEntity) job);
184184

185-
if (unlock) {
186-
jobServiceConfiguration.getCommandExecutor().execute(new UnlockExclusiveJobCmd((Job) job, jobServiceConfiguration));
187-
}
188-
189185
// If an exception is thrown during job handler exception, it goes up and will be caught in the general exception handling.
190186
// The delete at the end won't happen in that case.
191187

@@ -200,6 +196,14 @@ protected boolean handleNontransactionalJob(NonTransactionalJobHandler<Object> j
200196
// The delete still needs to happen in a new transaction
201197
jobServiceConfiguration.getCommandExecutor().execute(commandContext -> {
202198
jobHandler.afterExecute((JobEntity) job, job.getJobHandlerConfiguration(), nonTransactionalOutput, commandContext);
199+
200+
if (unlock) {
201+
// Part of the same transaction to avoid a race condition with the
202+
// potentially new jobs (wrt process instance locking) that are created
203+
// during the execution of the original job
204+
new UnlockExclusiveJobCmd((Job) job, jobServiceConfiguration).execute(commandContext);
205+
}
206+
203207
jobServiceConfiguration.getJobEntityManager().delete((JobEntity) job);
204208
return null;
205209
});

0 commit comments

Comments
 (0)