Skip to content

Commit 4c9fe94

Browse files
committed
Fix chunk processing on skip in read
Before this commit, when a skip occurs on read, the step was prematurely stopping chunk reading (ie the chunk size was not honored). This commit fixes that by ensuring that the step continues reading next items even when a skip occurs during the read operation. Resolves #5084
1 parent 316a0e9 commit 4c9fe94

File tree

3 files changed

+129
-6
lines changed

3 files changed

+129
-6
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ private void processChunkConcurrently(TransactionStatus status, StepContribution
386386
List<Future<O>> itemProcessingTasks = new LinkedList<>();
387387
try {
388388
// read items and submit concurrent item processing tasks
389-
for (int i = 0; i < this.chunkSize; i++) {
389+
for (int i = 0; i < this.chunkSize && this.chunkTracker.moreItems(); i++) {
390390
I item = readItem(contribution);
391391
if (item != null) {
392392
Future<O> itemProcessingFuture = this.taskExecutor.submit(() -> processItem(item, contribution));
@@ -477,14 +477,11 @@ private boolean interrupted(StepExecution stepExecution) {
477477

478478
private Chunk<I> readChunk(StepContribution contribution) throws Exception {
479479
Chunk<I> chunk = new Chunk<>();
480-
for (int i = 0; i < chunkSize; i++) {
480+
for (int i = 0; i < chunkSize && this.chunkTracker.moreItems(); i++) {
481481
I item = readItem(contribution);
482482
if (item != null) {
483483
chunk.add(item);
484484
}
485-
else {
486-
break;
487-
}
488485
}
489486
return chunk;
490487
}

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepFaultToleranceIntegrationTests.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package org.springframework.batch.core.step.item;
1717

18+
import java.util.List;
1819
import java.util.Set;
1920

21+
import org.jspecify.annotations.Nullable;
2022
import org.junit.jupiter.api.Assertions;
2123
import org.junit.jupiter.api.Test;
2224

@@ -31,12 +33,14 @@
3133
import org.springframework.batch.core.step.Step;
3234
import org.springframework.batch.core.step.StepExecution;
3335
import org.springframework.batch.core.step.builder.ChunkOrientedStepBuilder;
36+
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
3437
import org.springframework.batch.core.step.skip.LimitCheckingExceptionHierarchySkipPolicy;
3538
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
3639
import org.springframework.batch.infrastructure.item.ItemProcessor;
3740
import org.springframework.batch.infrastructure.item.ItemReader;
3841
import org.springframework.batch.infrastructure.item.ItemWriter;
3942
import org.springframework.batch.infrastructure.item.file.FlatFileParseException;
43+
import org.springframework.batch.infrastructure.item.support.ListItemReader;
4044
import org.springframework.context.ApplicationContext;
4145
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
4246
import org.springframework.context.annotation.Bean;
@@ -188,6 +192,105 @@ void testConcurrentFaultTolerantChunkOrientedStepFailure() throws Exception {
188192
System.clearProperty("skipLimit");
189193
}
190194

195+
// Issue https://github.com/spring-projects/spring-batch/issues/5084
196+
@Test
197+
void testSkipInReadInSequentialMode() throws Exception {
198+
// given
199+
ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class,
200+
StepConfiguration.class);
201+
JobOperator jobOperator = context.getBean(JobOperator.class);
202+
Job job = context.getBean(Job.class);
203+
204+
// when
205+
JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
206+
JobExecution jobExecution = jobOperator.start(job, jobParameters);
207+
208+
// then
209+
Assertions.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
210+
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
211+
Assertions.assertEquals(8, stepExecution.getReadCount());
212+
Assertions.assertEquals(8, stepExecution.getWriteCount());
213+
Assertions.assertEquals(2, stepExecution.getCommitCount());
214+
Assertions.assertEquals(0, stepExecution.getRollbackCount());
215+
Assertions.assertEquals(2, stepExecution.getReadSkipCount());
216+
Assertions.assertEquals(0, stepExecution.getWriteSkipCount());
217+
}
218+
219+
// Issue https://github.com/spring-projects/spring-batch/issues/5084
220+
@Test
221+
void testSkipInReadInConcurrentMode() throws Exception {
222+
// given
223+
ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class,
224+
ConcurrentStepConfiguration.class);
225+
JobOperator jobOperator = context.getBean(JobOperator.class);
226+
Job job = context.getBean(Job.class);
227+
228+
// when
229+
JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
230+
JobExecution jobExecution = jobOperator.start(job, jobParameters);
231+
232+
// then
233+
Assertions.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
234+
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
235+
Assertions.assertEquals(8, stepExecution.getReadCount());
236+
Assertions.assertEquals(8, stepExecution.getWriteCount());
237+
Assertions.assertEquals(2, stepExecution.getCommitCount());
238+
Assertions.assertEquals(0, stepExecution.getRollbackCount());
239+
Assertions.assertEquals(2, stepExecution.getReadSkipCount());
240+
Assertions.assertEquals(0, stepExecution.getWriteSkipCount());
241+
}
242+
243+
@Configuration
244+
static class StepConfiguration {
245+
246+
@Bean
247+
public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
248+
List<String> items = List.of("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten");
249+
return new ChunkOrientedStepBuilder<String, String>(jobRepository, 5).reader(new ListItemReader<>(items) {
250+
@Override
251+
public @Nullable String read() {
252+
String item = super.read();
253+
if ("three".equals(item) || "seven".equals(item)) {
254+
throw new RuntimeException("Simulated read error on item: " + item);
255+
}
256+
return item;
257+
}
258+
}).writer(chunk -> {
259+
})
260+
.transactionManager(transactionManager)
261+
.faultTolerant()
262+
.skipPolicy(new AlwaysSkipItemSkipPolicy())
263+
.build();
264+
}
265+
266+
}
267+
268+
@Configuration
269+
static class ConcurrentStepConfiguration {
270+
271+
@Bean
272+
public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
273+
List<String> items = List.of("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten");
274+
return new ChunkOrientedStepBuilder<String, String>(jobRepository, 5).reader(new ListItemReader<>(items) {
275+
@Override
276+
public @Nullable String read() {
277+
String item = super.read();
278+
if ("three".equals(item) || "seven".equals(item)) {
279+
throw new RuntimeException("Simulated read error on item: " + item);
280+
}
281+
return item;
282+
}
283+
}).writer(chunk -> {
284+
})
285+
.transactionManager(transactionManager)
286+
.taskExecutor(new SimpleAsyncTaskExecutor())
287+
.faultTolerant()
288+
.skipPolicy(new AlwaysSkipItemSkipPolicy())
289+
.build();
290+
}
291+
292+
}
293+
191294
@Configuration
192295
static class FaultTolerantChunkOrientedStepConfiguration {
193296

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepTests.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.batch.infrastructure.item.ItemReader;
3131
import org.springframework.batch.infrastructure.item.ItemWriter;
3232
import org.springframework.batch.infrastructure.item.support.ListItemReader;
33+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3334

3435
import static org.mockito.Mockito.mock;
3536
import static org.mockito.Mockito.times;
@@ -64,7 +65,7 @@ void testFaultTolerantChunkOrientedStepSetupWithDefaultRetryLimit() {
6465
}
6566

6667
@Test
67-
void testReadNoMoreThanAvailableItems() throws Exception {
68+
void testReadNoMoreThanAvailableItemsInSequentialMode() throws Exception {
6869
// given
6970
ItemReader<String> reader = mock();
7071
ItemWriter<String> writer = chunk -> {
@@ -84,4 +85,26 @@ void testReadNoMoreThanAvailableItems() throws Exception {
8485
verify(reader, times(6)).read();
8586
}
8687

88+
@Test
89+
void testReadNoMoreThanAvailableItemsInConcurrentMode() throws Exception {
90+
// given
91+
ItemReader<String> reader = mock();
92+
ItemWriter<String> writer = chunk -> {
93+
};
94+
JobRepository jobRepository = new ResourcelessJobRepository();
95+
when(reader.read()).thenReturn("1", "2", "3", "4", "5", null);
96+
ChunkOrientedStep<String, String> step = new ChunkOrientedStep<>("step", 10, reader, writer, jobRepository);
97+
step.setTaskExecutor(new SimpleAsyncTaskExecutor());
98+
step.afterPropertiesSet();
99+
JobInstance jobInstance = new JobInstance(1L, "job");
100+
JobExecution jobExecution = new JobExecution(1L, jobInstance, new JobParameters());
101+
StepExecution stepExecution = new StepExecution(1L, "step", jobExecution);
102+
103+
// when
104+
step.execute(stepExecution);
105+
106+
// then
107+
verify(reader, times(6)).read();
108+
}
109+
87110
}

0 commit comments

Comments
 (0)