|
19 | 19 | import org.apache.commons.logging.LogFactory; |
20 | 20 | import org.jspecify.annotations.Nullable; |
21 | 21 |
|
22 | | -import org.springframework.batch.core.BatchStatus; |
23 | | -import org.springframework.batch.core.ExitStatus; |
24 | 22 | import org.springframework.batch.core.job.JobInterruptedException; |
25 | 23 | import org.springframework.batch.core.listener.ChunkListener; |
26 | 24 | import org.springframework.batch.core.listener.CompositeChunkListener; |
@@ -309,18 +307,8 @@ protected void close(ExecutionContext executionContext) throws Exception { |
309 | 307 |
|
310 | 308 | @Override |
311 | 309 | protected void doExecute(StepExecution stepExecution) throws Exception { |
312 | | - stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName()); |
313 | | - while (this.chunkTracker.moreItems()) { |
314 | | - // check interruption policy before processing next chunk |
315 | | - try { |
316 | | - this.interruptionPolicy.checkInterrupted(stepExecution); |
317 | | - } |
318 | | - catch (JobInterruptedException exception) { |
319 | | - stepExecution.setTerminateOnly(); |
320 | | - stepExecution.setStatus(BatchStatus.STOPPED); |
321 | | - stepExecution.setExitStatus(ExitStatus.STOPPED); |
322 | | - return; |
323 | | - } |
| 310 | + stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName()); |
| 311 | + while (this.chunkTracker.moreItems() && !interrupted(stepExecution)) { |
324 | 312 | // process next chunk in its own transaction |
325 | 313 | this.transactionTemplate.execute(new TransactionCallbackWithoutResult() { |
326 | 314 | @Override |
@@ -355,6 +343,25 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { |
355 | 343 | } |
356 | 344 | } |
357 | 345 |
|
| 346 | + /* |
| 347 | + * Check if the step has been interrupted either internally via user defined policy or |
| 348 | + * externally via job operator. This will be checked at chunk boundaries. |
| 349 | + */ |
| 350 | + private boolean interrupted(StepExecution stepExecution) { |
| 351 | + // check internal interruption via user defined policy |
| 352 | + try { |
| 353 | + this.interruptionPolicy.checkInterrupted(stepExecution); |
| 354 | + } |
| 355 | + catch (JobInterruptedException exception) { |
| 356 | + return true; |
| 357 | + } |
| 358 | + // check external interruption via job operator |
| 359 | + if (stepExecution.isTerminateOnly()) { |
| 360 | + return true; |
| 361 | + } |
| 362 | + return false; |
| 363 | + } |
| 364 | + |
358 | 365 | private Chunk<I> read(StepContribution contribution) throws Exception { |
359 | 366 | Chunk<I> chunk = new Chunk<>(); |
360 | 367 | for (int i = 0; i < chunkSize; i++) { |
|
0 commit comments