Skip to content

Commit 946f788

Browse files
JunggiKimfmbenhassine
authored andcommitted
Fix silent data loss in ChunkOrientedStep when skip policy rejects
When retry is exhausted and skipPolicy.shouldSkip() returns false, ChunkOrientedStep should fail the job instead of silently discarding the item. This change adds proper exception handling to prevent silent data loss. - Add NonSkippableReadException in doSkipInRead() else block - Add NonSkippableProcessException in doSkipInProcess() else block - Add NonSkippableWriteException in scan() else block - Add test case for skip policy rejection scenario Fixes gh-5079 Signed-off-by: kjg <[email protected]>
1 parent 638c183 commit 946f788

File tree

2 files changed

+62
-0
lines changed

2 files changed

+62
-0
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@
5151
import org.springframework.batch.core.step.StepInterruptionPolicy;
5252
import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
5353
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
54+
import org.springframework.batch.core.step.skip.NonSkippableProcessException;
55+
import org.springframework.batch.core.step.skip.NonSkippableReadException;
56+
import org.springframework.batch.core.step.skip.NonSkippableWriteException;
5457
import org.springframework.batch.core.step.skip.SkipPolicy;
5558
import org.springframework.batch.infrastructure.item.Chunk;
5659
import org.springframework.batch.infrastructure.item.ExecutionContext;
@@ -557,6 +560,9 @@ private void doSkipInRead(RetryException retryException, StepContribution contri
557560
this.compositeSkipListener.onSkipInRead(cause);
558561
contribution.incrementReadSkipCount();
559562
}
563+
else {
564+
throw new NonSkippableReadException("Skip policy rejected skipping item", cause);
565+
}
560566
}
561567

562568
private Chunk<O> processChunk(Chunk<I> chunk, StepContribution contribution) throws Exception {
@@ -650,6 +656,9 @@ private void doSkipInProcess(I item, RetryException retryException, StepContribu
650656
this.compositeSkipListener.onSkipInProcess(item, retryException.getCause());
651657
contribution.incrementProcessSkipCount();
652658
}
659+
else {
660+
throw new NonSkippableProcessException("Skip policy rejected skipping item", cause);
661+
}
653662
}
654663

655664
private void writeChunk(Chunk<O> chunk, StepContribution contribution) throws Exception {
@@ -734,6 +743,7 @@ private void scan(Chunk<O> chunk, StepContribution contribution) {
734743
else {
735744
logger.error("Failed to write item: " + item, exception);
736745
this.compositeItemWriteListener.onWriteError(exception, singleItemChunk);
746+
throw new NonSkippableWriteException("Skip policy rejected skipping item", exception);
737747
}
738748
}
739749
}

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,29 @@
2121
import org.junit.jupiter.api.Assertions;
2222
import org.junit.jupiter.api.Test;
2323

24+
import org.springframework.batch.core.ExitStatus;
2425
import org.springframework.batch.core.job.JobExecution;
2526
import org.springframework.batch.core.job.JobInstance;
2627
import org.springframework.batch.core.job.parameters.JobParameters;
2728
import org.springframework.batch.core.repository.JobRepository;
2829
import org.springframework.batch.core.repository.support.ResourcelessJobRepository;
30+
import org.springframework.batch.core.step.FatalStepExecutionException;
2931
import org.springframework.batch.core.step.StepExecution;
3032
import org.springframework.batch.core.step.builder.ChunkOrientedStepBuilder;
3133
import org.springframework.batch.core.step.builder.StepBuilder;
34+
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
35+
import org.springframework.batch.core.step.skip.NonSkippableProcessException;
3236
import org.springframework.batch.infrastructure.item.ItemProcessor;
3337
import org.springframework.batch.infrastructure.item.ItemReader;
3438
import org.springframework.batch.infrastructure.item.ItemWriter;
3539
import org.springframework.batch.infrastructure.item.support.ListItemReader;
3640
import org.springframework.batch.infrastructure.item.support.ListItemWriter;
3741
import org.springframework.batch.infrastructure.support.transaction.ResourcelessTransactionManager;
42+
import org.springframework.core.retry.RetryPolicy;
3843
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3944

4045
import static org.junit.jupiter.api.Assertions.assertEquals;
46+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
4147
import static org.mockito.Mockito.mock;
4248
import static org.mockito.Mockito.times;
4349
import static org.mockito.Mockito.verify;
@@ -215,4 +221,50 @@ void testExplicitRetryConfigurationTakesPrecedence() throws Exception {
215221
"RuntimeException should not be retried when only IllegalStateException is configured");
216222
}
217223

224+
@Test
225+
void testDoSkipInProcessShouldThrowNonSkippableProcessExceptionWhenSkipPolicyReturnsFalse() throws Exception {
226+
// given - fault-tolerant step with NeverSkipItemSkipPolicy and retry limit
227+
ItemReader<String> reader = new ListItemReader<>(List.of("item1", "item2", "item3"));
228+
229+
ItemProcessor<String, String> processor = item -> {
230+
if ("item2".equals(item)) {
231+
throw new RuntimeException("Processing failed for item2");
232+
}
233+
return item.toUpperCase();
234+
};
235+
236+
ItemWriter<String> writer = chunk -> {
237+
};
238+
239+
JobRepository jobRepository = new ResourcelessJobRepository();
240+
ChunkOrientedStep<String, String> step = new ChunkOrientedStep<>("step", 3, reader, writer, jobRepository);
241+
step.setItemProcessor(processor);
242+
step.setFaultTolerant(true);
243+
step.setRetryPolicy(RetryPolicy.withMaxRetries(1)); // retry once (initial + 1
244+
// retry)
245+
step.setSkipPolicy(new NeverSkipItemSkipPolicy()); // never skip
246+
step.afterPropertiesSet();
247+
248+
JobInstance jobInstance = new JobInstance(1L, "job");
249+
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
250+
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);
251+
252+
// when - execute step
253+
step.execute(stepExecution);
254+
255+
// then - should fail with FatalStepExecutionException having
256+
// NonSkippableProcessException as cause
257+
ExitStatus stepExecutionExitStatus = stepExecution.getExitStatus();
258+
assertEquals(ExitStatus.FAILED.getExitCode(), stepExecutionExitStatus.getExitCode());
259+
Throwable throwable = stepExecution.getFailureExceptions().get(0);
260+
assertInstanceOf(FatalStepExecutionException.class, throwable,
261+
"Expected FatalStepExecutionException when skip policy rejects skipping");
262+
Throwable cause = throwable.getCause();
263+
assertInstanceOf(NonSkippableProcessException.class, cause,
264+
"Expected NonSkippableProcessException as cause when skip policy rejects skipping");
265+
assertEquals("Skip policy rejected skipping item", cause.getMessage());
266+
assertEquals(ExitStatus.FAILED.getExitCode(), stepExecution.getExitStatus().getExitCode());
267+
assertEquals(0, stepExecution.getProcessSkipCount(), "Process skip count should be 0");
268+
}
269+
218270
}

0 commit comments

Comments
 (0)