Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import java.util.Collection;
import java.util.Set;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.util.CollectionUtils;

/**
* Base {@link PartitionHandler} implementation providing common base features. Subclasses
Expand Down Expand Up @@ -55,8 +57,25 @@ protected abstract Set<StepExecution> doHandle(StepExecution managerStepExecutio
public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
final StepExecution managerStepExecution) throws Exception {
final Set<StepExecution> stepExecutions = stepSplitter.split(managerStepExecution, gridSize);
final Set<StepExecution> alreadyCompleted = CollectionUtils.newHashSet(stepExecutions.size());
final Set<StepExecution> toBeExecuted = CollectionUtils.newHashSet(stepExecutions.size());

return doHandle(managerStepExecution, stepExecutions);
for (StepExecution stepExecution : stepExecutions) {
if (stepExecution.getStatus() == BatchStatus.COMPLETED) {
alreadyCompleted.add(stepExecution);
}
else {
toBeExecuted.add(stepExecution);
}
}

final Set<StepExecution> updatedExecutions = CollectionUtils.newHashSet(stepExecutions.size());
final Set<StepExecution> handledExecutions = doHandle(managerStepExecution, toBeExecuted);
if (handledExecutions != null) {
updatedExecutions.addAll(handledExecutions);
}
updatedExecutions.addAll(alreadyCompleted);
return updatedExecutions;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,13 @@ public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throw
set.add(currentStepExecution);
}
else { // restart
if (lastStepExecution.getStatus() != BatchStatus.COMPLETED
&& shouldStart(allowStartIfComplete, stepExecution, lastStepExecution)) {
if (lastStepExecution.getStatus() == BatchStatus.COMPLETED) {
// Keep completed partitions for aggregation on restart.
if (!isSameJobExecution(stepExecution, lastStepExecution)) {
set.add(lastStepExecution);
}
}
else if (shouldStart(allowStartIfComplete, stepExecution, lastStepExecution)) {
StepExecution currentStepExecution = jobRepository.createStepExecution(stepName, jobExecution);
currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
jobRepository.updateExecutionContext(currentStepExecution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -245,4 +246,55 @@ public void aggregate(StepExecution result, Collection<StepExecution> executions
assertEquals(true, stepExecution.getExecutionContext().get("aggregated"));
}

@Test
void testRestartWhenAllPartitionsCompletedShouldAggregateCompletedPartitions() throws Exception {
AtomicInteger aggregateSizeOnRestart = new AtomicInteger(-1);
AtomicBoolean firstAggregation = new AtomicBoolean(true);
step.setStepExecutionAggregator((result, executions) -> {
if (firstAggregation.getAndSet(false)) {
return;
}
aggregateSizeOnRestart.set(executions.size());
});

SimpleStepExecutionSplitter stepExecutionSplitter = new SimpleStepExecutionSplitter(jobRepository,
step.getName(), new SimplePartitioner());
step.setStepExecutionSplitter(stepExecutionSplitter);
step.setPartitionHandler((stepSplitter, managerExecution) -> {
Set<StepExecution> executions = stepSplitter.split(managerExecution, 2);
for (StepExecution execution : executions) {
if (execution.getStatus() != BatchStatus.COMPLETED) {
execution.setStatus(BatchStatus.COMPLETED);
execution.setExitStatus(ExitStatus.COMPLETED);
jobRepository.update(execution);
}
}
return executions;
});

step.afterPropertiesSet();
JobParameters jobParameters = new JobParameters();
ExecutionContext executionContext = new ExecutionContext();
JobInstance jobInstance = jobRepository.createJobInstance("remotePartitionRestartJob", jobParameters);
JobExecution firstJobExecution = jobRepository.createJobExecution(jobInstance, jobParameters, executionContext);
StepExecution firstManagerStepExecution = jobRepository.createStepExecution("manager", firstJobExecution);
step.execute(firstManagerStepExecution);

firstManagerStepExecution.setStatus(BatchStatus.FAILED);
firstManagerStepExecution.setExitStatus(ExitStatus.FAILED);
jobRepository.update(firstManagerStepExecution);
firstJobExecution.setStatus(BatchStatus.FAILED);
firstJobExecution.setEndTime(LocalDateTime.now());
jobRepository.update(firstJobExecution);

JobExecution restartedJobExecution = jobRepository.createJobExecution(jobInstance, jobParameters,
executionContext);
StepExecution restartedManagerStepExecution = jobRepository.createStepExecution("manager",
restartedJobExecution);
step.execute(restartedManagerStepExecution);

assertEquals(BatchStatus.COMPLETED, restartedManagerStepExecution.getStatus());
assertEquals(2, aggregateSizeOnRestart.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,19 @@ void testRememberGridSize() throws Exception {
assertEquals(2, provider.split(stepExecution, 3).size());
}

@Test
void testCompletedPartitionsAreReturnedForAggregationOnRestart() throws Exception {
SimpleStepExecutionSplitter provider = new SimpleStepExecutionSplitter(jobRepository, step.getName(),
new SimplePartitioner());
Set<StepExecution> split = provider.split(stepExecution, 2);
assertEquals(2, split.size());
stepExecution = update(split, stepExecution, BatchStatus.COMPLETED, false);

Set<StepExecution> restartSplit = provider.split(stepExecution, 2);
assertEquals(2, restartSplit.size());
assertTrue(restartSplit.stream().allMatch(execution -> execution.getStatus() == BatchStatus.COMPLETED));
}

@Test
void testRememberPartitionNames() throws Exception {
class CustomPartitioner implements Partitioner, PartitionNameProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.job.JobExecution;
import org.springframework.batch.core.job.JobExecutionException;
Expand Down Expand Up @@ -128,4 +129,31 @@ void testTaskExecutorFailure() throws Exception {
assertEquals(ExitStatus.FAILED.getExitCode(), stepExecution.getExitStatus().getExitCode());
}

@Test
void testCompletedStepExecutionsAreNotReExecutedButReturned() throws Exception {
StepExecutionSplitter splitter = new StepExecutionSplitter() {
@Override
public String getStepName() {
return stepExecution.getStepName();
}

@Override
public Set<StepExecution> split(StepExecution managerStepExecution, int gridSize) {
Set<StepExecution> split = new HashSet<>();
StepExecution completed = new StepExecution(10L, "completed", managerStepExecution.getJobExecution());
completed.setStatus(BatchStatus.COMPLETED);
completed.setExitStatus(ExitStatus.COMPLETED);
split.add(completed);
split.add(new StepExecution(11L, "started", managerStepExecution.getJobExecution()));
return split;
}
};

Collection<StepExecution> executions = handler.handle(splitter, stepExecution);

assertEquals(1, count);
assertEquals("[started]", stepExecutions.toString());
assertEquals(2, executions.size());
}

}