Skip to content

Commit 2e63c8c

Browse files
committed
Fix restart aggregation for completed remote partitions
Signed-off-by: MJY <jmoon0227@gmail.com>
1 parent b24a85e commit 2e63c8c

File tree

5 files changed: +120 -3 lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/AbstractPartitionHandler.java

Lines changed: 20 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,11 @@
1818
import java.util.Collection;
1919
import java.util.Set;
2020

21+
import org.springframework.batch.core.BatchStatus;
2122
import org.springframework.batch.core.step.StepExecution;
2223
import org.springframework.batch.core.partition.PartitionHandler;
2324
import org.springframework.batch.core.partition.StepExecutionSplitter;
25+
import org.springframework.util.CollectionUtils;
2426

2527
/**
2628
* Base {@link PartitionHandler} implementation providing common base features. Subclasses
@@ -55,8 +57,25 @@ protected abstract Set<StepExecution> doHandle(StepExecution managerStepExecutio
5557
public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
5658
final StepExecution managerStepExecution) throws Exception {
5759
final Set<StepExecution> stepExecutions = stepSplitter.split(managerStepExecution, gridSize);
60+
final Set<StepExecution> alreadyCompleted = CollectionUtils.newHashSet(stepExecutions.size());
61+
final Set<StepExecution> toBeExecuted = CollectionUtils.newHashSet(stepExecutions.size());
5862

59-
return doHandle(managerStepExecution, stepExecutions);
63+
for (StepExecution stepExecution : stepExecutions) {
64+
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
65+
alreadyCompleted.add(stepExecution);
66+
}
67+
else {
68+
toBeExecuted.add(stepExecution);
69+
}
70+
}
71+
72+
final Set<StepExecution> updatedExecutions = CollectionUtils.newHashSet(stepExecutions.size());
73+
final Set<StepExecution> handledExecutions = doHandle(managerStepExecution, toBeExecuted);
74+
if (handledExecutions != null) {
75+
updatedExecutions.addAll(handledExecutions);
76+
}
77+
updatedExecutions.addAll(alreadyCompleted);
78+
return updatedExecutions;
6079
}
6180

6281
/**

spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -143,8 +143,13 @@ public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throw
143143
set.add(currentStepExecution);
144144
}
145145
else { // restart
146-
if (lastStepExecution.getStatus() != BatchStatus.COMPLETED
147-
&& shouldStart(allowStartIfComplete, stepExecution, lastStepExecution)) {
146+
if (lastStepExecution.getStatus() == BatchStatus.COMPLETED) {
147+
// Keep completed partitions for aggregation on restart.
148+
if (!isSameJobExecution(stepExecution, lastStepExecution)) {
149+
set.add(lastStepExecution);
150+
}
151+
}
152+
else if (shouldStart(allowStartIfComplete, stepExecution, lastStepExecution)) {
148153
StepExecution currentStepExecution = jobRepository.createStepExecution(stepName, jobExecution);
149154
currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
150155
jobRepository.updateExecutionContext(currentStepExecution);

spring-batch-core/src/test/java/org/springframework/batch/core/partition/PartitionStepTests.java

Lines changed: 52 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.Set;
2424
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
2526

2627
import org.junit.jupiter.api.BeforeEach;
2728
import org.junit.jupiter.api.Test;
@@ -245,4 +246,55 @@ public void aggregate(StepExecution result, Collection<StepExecution> executions
245246
assertEquals(true, stepExecution.getExecutionContext().get("aggregated"));
246247
}
247248

249+
@Test
250+
void testRestartWhenAllPartitionsCompletedShouldAggregateCompletedPartitions() throws Exception {
251+
AtomicInteger aggregateSizeOnRestart = new AtomicInteger(-1);
252+
AtomicBoolean firstAggregation = new AtomicBoolean(true);
253+
step.setStepExecutionAggregator((result, executions) -> {
254+
if (firstAggregation.getAndSet(false)) {
255+
return;
256+
}
257+
aggregateSizeOnRestart.set(executions.size());
258+
});
259+
260+
SimpleStepExecutionSplitter stepExecutionSplitter = new SimpleStepExecutionSplitter(jobRepository,
261+
step.getName(), new SimplePartitioner());
262+
step.setStepExecutionSplitter(stepExecutionSplitter);
263+
step.setPartitionHandler((stepSplitter, managerExecution) -> {
264+
Set<StepExecution> executions = stepSplitter.split(managerExecution, 2);
265+
for (StepExecution execution : executions) {
266+
if (execution.getStatus() != BatchStatus.COMPLETED) {
267+
execution.setStatus(BatchStatus.COMPLETED);
268+
execution.setExitStatus(ExitStatus.COMPLETED);
269+
jobRepository.update(execution);
270+
}
271+
}
272+
return executions;
273+
});
274+
275+
step.afterPropertiesSet();
276+
JobParameters jobParameters = new JobParameters();
277+
ExecutionContext executionContext = new ExecutionContext();
278+
JobInstance jobInstance = jobRepository.createJobInstance("remotePartitionRestartJob", jobParameters);
279+
JobExecution firstJobExecution = jobRepository.createJobExecution(jobInstance, jobParameters, executionContext);
280+
StepExecution firstManagerStepExecution = jobRepository.createStepExecution("manager", firstJobExecution);
281+
step.execute(firstManagerStepExecution);
282+
283+
firstManagerStepExecution.setStatus(BatchStatus.FAILED);
284+
firstManagerStepExecution.setExitStatus(ExitStatus.FAILED);
285+
jobRepository.update(firstManagerStepExecution);
286+
firstJobExecution.setStatus(BatchStatus.FAILED);
287+
firstJobExecution.setEndTime(LocalDateTime.now());
288+
jobRepository.update(firstJobExecution);
289+
290+
JobExecution restartedJobExecution = jobRepository.createJobExecution(jobInstance, jobParameters,
291+
executionContext);
292+
StepExecution restartedManagerStepExecution = jobRepository.createStepExecution("manager",
293+
restartedJobExecution);
294+
step.execute(restartedManagerStepExecution);
295+
296+
assertEquals(BatchStatus.COMPLETED, restartedManagerStepExecution.getStatus());
297+
assertEquals(2, aggregateSizeOnRestart.get());
298+
}
299+
248300
}

spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitterTests.java

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -119,6 +119,19 @@ void testRememberGridSize() throws Exception {
119119
assertEquals(2, provider.split(stepExecution, 3).size());
120120
}
121121

122+
@Test
123+
void testCompletedPartitionsAreReturnedForAggregationOnRestart() throws Exception {
124+
SimpleStepExecutionSplitter provider = new SimpleStepExecutionSplitter(jobRepository, step.getName(),
125+
new SimplePartitioner());
126+
Set<StepExecution> split = provider.split(stepExecution, 2);
127+
assertEquals(2, split.size());
128+
stepExecution = update(split, stepExecution, BatchStatus.COMPLETED, false);
129+
130+
Set<StepExecution> restartSplit = provider.split(stepExecution, 2);
131+
assertEquals(2, restartSplit.size());
132+
assertTrue(restartSplit.stream().allMatch(execution -> execution.getStatus() == BatchStatus.COMPLETED));
133+
}
134+
122135
@Test
123136
void testRememberPartitionNames() throws Exception {
124137
class CustomPartitioner implements Partitioner, PartitionNameProvider {

spring-batch-core/src/test/java/org/springframework/batch/core/partition/support/TaskExecutorPartitionHandlerTests.java

Lines changed: 28 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626

2727
import org.junit.jupiter.api.BeforeEach;
2828
import org.junit.jupiter.api.Test;
29+
import org.springframework.batch.core.BatchStatus;
2930
import org.springframework.batch.core.ExitStatus;
3031
import org.springframework.batch.core.job.JobExecution;
3132
import org.springframework.batch.core.job.JobExecutionException;
@@ -128,4 +129,31 @@ void testTaskExecutorFailure() throws Exception {
128129
assertEquals(ExitStatus.FAILED.getExitCode(), stepExecution.getExitStatus().getExitCode());
129130
}
130131

132+
@Test
133+
void testCompletedStepExecutionsAreNotReExecutedButReturned() throws Exception {
134+
StepExecutionSplitter splitter = new StepExecutionSplitter() {
135+
@Override
136+
public String getStepName() {
137+
return stepExecution.getStepName();
138+
}
139+
140+
@Override
141+
public Set<StepExecution> split(StepExecution managerStepExecution, int gridSize) {
142+
Set<StepExecution> split = new HashSet<>();
143+
StepExecution completed = new StepExecution(10L, "completed", managerStepExecution.getJobExecution());
144+
completed.setStatus(BatchStatus.COMPLETED);
145+
completed.setExitStatus(ExitStatus.COMPLETED);
146+
split.add(completed);
147+
split.add(new StepExecution(11L, "started", managerStepExecution.getJobExecution()));
148+
return split;
149+
}
150+
};
151+
152+
Collection<StepExecution> executions = handler.handle(splitter, stepExecution);
153+
154+
assertEquals(1, count);
155+
assertEquals("[started]", stepExecutions.toString());
156+
assertEquals(2, executions.size());
157+
}
158+
131159
}

0 commit comments

Comments
 (0)