Skip to content

Commit cb55ccc

Browse files
committed
Fix exhausted retry handling with non skippable exceptions
Before this commit, chunk scanning was unconditionally preformed when the retry policy is exhausted and non skippable exceptions are configured. This resulted in duplicate item writes which is not intended. This commit fixes that by checking the skip policy before scanning the chunk on an exhausted retry policy. Resolves #5091
1 parent 5b80510 commit cb55ccc

File tree

3 files changed

+58
-1
lines changed

3 files changed

+58
-1
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,8 @@ private void writeChunk(Chunk<O> chunk, StepContribution contribution) throws Ex
685685
chunkWriteEvent.chunkWriteStatus = BatchMetrics.STATUS_FAILURE;
686686
observation.lowCardinalityKeyValue(fullyQualifiedMetricName + ".status", BatchMetrics.STATUS_FAILURE);
687687
observation.error(exception);
688-
if (this.faultTolerant && exception instanceof RetryException retryException) {
688+
if (this.faultTolerant && exception instanceof RetryException retryException
689+
&& !this.skipPolicy.shouldSkip(retryException.getCause(), -1)) {
689690
logger.info("Retry exhausted while attempting to write items, scanning the chunk", retryException);
690691
ChunkScanEvent chunkScanEvent = new ChunkScanEvent(contribution.getStepExecution().getStepName(),
691692
contribution.getStepExecution().getId());
@@ -696,6 +697,7 @@ private void writeChunk(Chunk<O> chunk, StepContribution contribution) throws Ex
696697
logger.info("Chunk scan completed");
697698
}
698699
else {
700+
logger.error("Retry exhausted after last attempt in recovery path, but exception is not skippable");
699701
throw exception;
700702
}
701703
}

spring-batch-core/src/main/java/org/springframework/batch/core/step/skip/LimitCheckingExceptionHierarchySkipPolicy.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public LimitCheckingExceptionHierarchySkipPolicy(Set<Class<? extends Throwable>>
4949

5050
@Override
5151
public boolean shouldSkip(Throwable t, long skipCount) throws SkipLimitExceededException {
52+
if (skipCount < 0) {
53+
return !isSkippable(t);
54+
}
5255
if (!isSkippable(t)) {
5356
return false;
5457
}

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepFaultToleranceIntegrationTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
4646
import org.springframework.context.annotation.Bean;
4747
import org.springframework.context.annotation.Configuration;
48+
import org.springframework.core.retry.RetryException;
4849
import org.springframework.core.retry.RetryPolicy;
4950
import org.springframework.core.task.SimpleAsyncTaskExecutor;
5051
import org.springframework.dao.DataIntegrityViolationException;
@@ -240,6 +241,36 @@ void testSkipInReadInConcurrentMode() throws Exception {
240241
Assertions.assertEquals(0, stepExecution.getWriteSkipCount());
241242
}
242243

244+
// Issue https://github.com/spring-projects/spring-batch/issues/5091
245+
@Test
246+
void testExhaustedRetryWithNonSkippableException() throws Exception {
247+
// given
248+
ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class,
249+
ExhaustedRetryStepConfiguration.class);
250+
JobOperator jobOperator = context.getBean(JobOperator.class);
251+
Job job = context.getBean(Job.class);
252+
253+
// when
254+
JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
255+
JobExecution jobExecution = jobOperator.start(job, jobParameters);
256+
257+
// then
258+
Assertions.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
259+
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
260+
Throwable failureException = stepExecution.getFailureExceptions().iterator().next();
261+
Assertions.assertInstanceOf(FatalStepExecutionException.class, failureException);
262+
Assertions.assertInstanceOf(RetryException.class, failureException.getCause());
263+
// the test has a single chunk (3 items with chunk size = 3) that is rolled back
264+
// upon retry exhaustion
265+
Assertions.assertEquals(3, stepExecution.getReadCount());
266+
Assertions.assertEquals(0, stepExecution.getWriteCount());
267+
Assertions.assertEquals(0, stepExecution.getCommitCount());
268+
Assertions.assertEquals(1, stepExecution.getRollbackCount());
269+
Assertions.assertEquals(0, stepExecution.getReadSkipCount());
270+
// no scan is triggered as the exception is non skippable
271+
Assertions.assertEquals(0, stepExecution.getWriteSkipCount());
272+
}
273+
243274
@Configuration
244275
static class StepConfiguration {
245276

@@ -265,6 +296,27 @@ public Step step(JobRepository jobRepository, JdbcTransactionManager transaction
265296

266297
}
267298

299+
@Configuration
300+
static class ExhaustedRetryStepConfiguration {
301+
302+
@Bean
303+
public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
304+
List<String> items = List.of("one", "two", "three");
305+
return new ChunkOrientedStepBuilder<String, String>(jobRepository, 3).reader(new ListItemReader<>(items))
306+
.writer(chunk -> {
307+
for (String item : chunk) {
308+
if (item.equalsIgnoreCase("two")) {
309+
throw new RuntimeException("Simulated write error on item two");
310+
}
311+
}
312+
})
313+
.transactionManager(transactionManager)
314+
.faultTolerant()
315+
.build();
316+
}
317+
318+
}
319+
268320
@Configuration
269321
static class ConcurrentStepConfiguration {
270322

0 commit comments

Comments
 (0)