diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/AbstractPartitionHandler.java b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/AbstractPartitionHandler.java index fbaeda9cbe..cb08bcbb91 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/AbstractPartitionHandler.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/AbstractPartitionHandler.java @@ -18,9 +18,11 @@ import java.util.Collection; import java.util.Set; +import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.step.StepExecution; import org.springframework.batch.core.partition.PartitionHandler; import org.springframework.batch.core.partition.StepExecutionSplitter; +import org.springframework.util.CollectionUtils; /** * Base {@link PartitionHandler} implementation providing common base features. Subclasses @@ -55,8 +57,25 @@ protected abstract Set doHandle(StepExecution managerStepExecutio public Collection handle(StepExecutionSplitter stepSplitter, final StepExecution managerStepExecution) throws Exception { final Set stepExecutions = stepSplitter.split(managerStepExecution, gridSize); + final Set alreadyCompleted = CollectionUtils.newHashSet(stepExecutions.size()); + final Set toBeExecuted = CollectionUtils.newHashSet(stepExecutions.size()); - return doHandle(managerStepExecution, stepExecutions); + for (StepExecution stepExecution : stepExecutions) { + if (stepExecution.getStatus() == BatchStatus.COMPLETED) { + alreadyCompleted.add(stepExecution); + } + else { + toBeExecuted.add(stepExecution); + } + } + + final Set updatedExecutions = CollectionUtils.newHashSet(stepExecutions.size()); + final Set handledExecutions = doHandle(managerStepExecution, toBeExecuted); + if (handledExecutions != null) { + updatedExecutions.addAll(handledExecutions); + } + updatedExecutions.addAll(alreadyCompleted); + return updatedExecutions; } /** diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java index 09bb10c53a..636095ad68 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java @@ -143,8 +143,13 @@ public Set split(StepExecution stepExecution, int gridSize) throw set.add(currentStepExecution); } else { // restart - if (lastStepExecution.getStatus() != BatchStatus.COMPLETED - && shouldStart(allowStartIfComplete, stepExecution, lastStepExecution)) { + if (lastStepExecution.getStatus() == BatchStatus.COMPLETED) { + // Keep completed partitions for aggregation on restart. + if (!isSameJobExecution(stepExecution, lastStepExecution)) { + set.add(lastStepExecution); + } + } + else if (shouldStart(allowStartIfComplete, stepExecution, lastStepExecution)) { StepExecution currentStepExecution = jobRepository.createStepExecution(stepName, jobExecution); currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext()); jobRepository.updateExecutionContext(currentStepExecution); diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/partition/PartitionStepTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/partition/PartitionStepTests.java index 9a69a3d823..91b7298a47 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/partition/PartitionStepTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/partition/PartitionStepTests.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -245,4 +246,55 @@ public void aggregate(StepExecution result, Collection executions assertEquals(true, stepExecution.getExecutionContext().get("aggregated")); } + @Test + void testRestartWhenAllPartitionsCompletedShouldAggregateCompletedPartitions() throws Exception { + AtomicInteger aggregateSizeOnRestart = new AtomicInteger(-1); + AtomicBoolean firstAggregation = new AtomicBoolean(true); + step.setStepExecutionAggregator((result, executions) -> { + if (firstAggregation.getAndSet(false)) { + return; + } + aggregateSizeOnRestart.set(executions.size()); + }); + + SimpleStepExecutionSplitter stepExecutionSplitter = new SimpleStepExecutionSplitter(jobRepository, + step.getName(), new SimplePartitioner()); + step.setStepExecutionSplitter(stepExecutionSplitter); + step.setPartitionHandler((stepSplitter, managerExecution) -> { + Set executions = stepSplitter.split(managerExecution, 2); + for (StepExecution execution : executions) { + if (execution.getStatus() != BatchStatus.COMPLETED) { + execution.setStatus(BatchStatus.COMPLETED); + execution.setExitStatus(ExitStatus.COMPLETED); + jobRepository.update(execution); + } + } + return executions; + }); + + step.afterPropertiesSet(); + JobParameters jobParameters = new JobParameters(); + ExecutionContext executionContext = new ExecutionContext(); + JobInstance jobInstance = jobRepository.createJobInstance("remotePartitionRestartJob", jobParameters); + JobExecution firstJobExecution = jobRepository.createJobExecution(jobInstance, jobParameters, executionContext); + StepExecution firstManagerStepExecution = jobRepository.createStepExecution("manager", firstJobExecution); + step.execute(firstManagerStepExecution); + + firstManagerStepExecution.setStatus(BatchStatus.FAILED); + firstManagerStepExecution.setExitStatus(ExitStatus.FAILED); + jobRepository.update(firstManagerStepExecution); + firstJobExecution.setStatus(BatchStatus.FAILED); + firstJobExecution.setEndTime(LocalDateTime.now()); + jobRepository.update(firstJobExecution); + + JobExecution restartedJobExecution = jobRepository.createJobExecution(jobInstance, jobParameters, + executionContext); + StepExecution restartedManagerStepExecution = jobRepository.createStepExecution("manager", + restartedJobExecution); + step.execute(restartedManagerStepExecution); + + assertEquals(BatchStatus.COMPLETED, restartedManagerStepExecution.getStatus()); + assertEquals(2, aggregateSizeOnRestart.get()); + } + } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitterTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitterTests.java index 7a13062c52..c0f8c0a812 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitterTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitterTests.java @@ -119,6 +119,19 @@ void testRememberGridSize() throws Exception { assertEquals(2, provider.split(stepExecution, 3).size()); } + @Test + void testCompletedPartitionsAreReturnedForAggregationOnRestart() throws Exception { + SimpleStepExecutionSplitter provider = new SimpleStepExecutionSplitter(jobRepository, step.getName(), + new SimplePartitioner()); + Set split = provider.split(stepExecution, 2); + assertEquals(2, split.size()); + stepExecution = update(split, stepExecution, BatchStatus.COMPLETED, false); + + Set restartSplit = provider.split(stepExecution, 2); + assertEquals(2, restartSplit.size()); + assertTrue(restartSplit.stream().allMatch(execution -> execution.getStatus() == BatchStatus.COMPLETED)); + } + @Test void testRememberPartitionNames() throws Exception { class CustomPartitioner implements Partitioner, PartitionNameProvider { diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/TaskExecutorPartitionHandlerTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/TaskExecutorPartitionHandlerTests.java index 4786c1cd53..681b72b6f9 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/TaskExecutorPartitionHandlerTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/TaskExecutorPartitionHandlerTests.java @@ -26,6 +26,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.job.JobExecution; import org.springframework.batch.core.job.JobExecutionException; @@ -128,4 +129,31 @@ void testTaskExecutorFailure() throws Exception { assertEquals(ExitStatus.FAILED.getExitCode(), stepExecution.getExitStatus().getExitCode()); } + @Test + void testCompletedStepExecutionsAreNotReExecutedButReturned() throws Exception { + StepExecutionSplitter splitter = new StepExecutionSplitter() { + @Override + public String getStepName() { + return stepExecution.getStepName(); + } + + @Override + public Set split(StepExecution managerStepExecution, int gridSize) { + Set split = new HashSet<>(); + StepExecution completed = new StepExecution(10L, "completed", managerStepExecution.getJobExecution()); + completed.setStatus(BatchStatus.COMPLETED); + completed.setExitStatus(ExitStatus.COMPLETED); + split.add(completed); + split.add(new StepExecution(11L, "started", managerStepExecution.getJobExecution())); + return split; + } + }; + + Collection executions = handler.handle(splitter, stepExecution); + + assertEquals(1, count); + assertEquals("[started]", stepExecutions.toString()); + assertEquals(2, executions.size()); + } + }