Skip to content

Commit 30a4d43

Browse files
wklausfmbenhassine
authored andcommitted
BATCH-2480 Fix regression due to the fix of BATCH-2442
1 parent c4a7834 commit 30a4d43

File tree

2 files changed

+65
-3
lines changed

2 files changed

+65
-3
lines changed

spring-batch-core-tests/src/test/java/org/springframework/batch/core/test/step/FaultTolerantStepIntegrationTests.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/*
2+
* Copyright 2010-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package org.springframework.batch.core.test.step;
217

318
import java.util.ArrayList;
@@ -17,6 +32,7 @@
1732
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
1833
import org.springframework.batch.core.repository.JobRepository;
1934
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
35+
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
2036
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
2137
import org.springframework.batch.core.step.skip.SkipPolicy;
2238
import org.springframework.batch.item.ItemProcessor;
@@ -197,6 +213,53 @@ public void write(List<? extends Integer> items) throws Exception {
197213
assertEquals(1, stepExecution.getProcessSkipCount());
198214
}
199215

216+
@Test(timeout = 3000)
217+
public void testExceptionInProcessAndWriteDuringChunkScan() throws Exception {
218+
// Given
219+
ListItemReader<Integer> itemReader = new ListItemReader<>(Arrays.asList(1, 2, 3));
220+
221+
ItemProcessor<Integer, Integer> itemProcessor = new ItemProcessor<Integer, Integer>() {
222+
@Override
223+
public Integer process(Integer item) throws Exception {
224+
if (item.equals(2)) {
225+
throw new Exception("Error during process item " + item);
226+
}
227+
return item;
228+
}
229+
};
230+
231+
ItemWriter<Integer> itemWriter = new ItemWriter<Integer>() {
232+
@Override
233+
public void write(List<? extends Integer> items) throws Exception {
234+
if (items.contains(3)) {
235+
throw new Exception("Error during write");
236+
}
237+
}
238+
};
239+
240+
Step step = new StepBuilderFactory(jobRepository, transactionManager).get("step")
241+
.<Integer, Integer>chunk(5)
242+
.reader(itemReader)
243+
.processor(itemProcessor)
244+
.writer(itemWriter)
245+
.faultTolerant()
246+
.skipPolicy(new AlwaysSkipItemSkipPolicy())
247+
.build();
248+
249+
// When
250+
StepExecution stepExecution = execute(step);
251+
252+
// Then
253+
assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
254+
assertEquals(ExitStatus.COMPLETED, stepExecution.getExitStatus());
255+
assertEquals(3, stepExecution.getReadCount());
256+
assertEquals(1, stepExecution.getWriteCount());
257+
assertEquals(1, stepExecution.getWriteSkipCount());
258+
assertEquals(1, stepExecution.getProcessSkipCount());
259+
assertEquals(3, stepExecution.getRollbackCount());
260+
assertEquals(2, stepExecution.getCommitCount());
261+
}
262+
200263
private List<Integer> createItems() {
201264
List<Integer> items = new ArrayList<>(TOTAL_ITEMS);
202265
for (int i = 1; i <= TOTAL_ITEMS; i++) {

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2018 the original author or authors.
2+
* Copyright 2006-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -580,8 +580,7 @@ private void scan(final StepContribution contribution, final Chunk<I> inputs, fi
580580
Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
581581
Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
582582

583-
//BATCH-2442 : do not scan skipped items
584-
if (!inputs.getSkips().isEmpty()) {
583+
if (!inputs.getSkips().isEmpty() && inputs.getItems().size() != outputs.getItems().size()) {
585584
if (outputIterator.hasNext()) {
586585
outputIterator.remove();
587586
return;

0 commit comments

Comments
 (0)