Skip to content

Commit e11a3b9

Browse files
committed
Add new implementation of chunk-oriented step
1 parent 35f9949 commit e11a3b9

File tree

4 files changed

+473
-0
lines changed

4 files changed

+473
-0
lines changed

README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The currently available experimental features are the following:
99

1010
* [MongoDB job repository](#mongodb-job-repository)
1111
* [Composite item reader](#composite-item-reader)
12+
* [New chunk-oriented step implementation](#new-chunk-oriented-step-implementation)
1213

1314
**Important note:** The versioning in this repository follows the [semantic versioning specification](https://semver.org/#spec-item-4).
1415
Public APIs as well as the implementations should not be considered stable and may change at any time :exclamation:
@@ -156,6 +157,30 @@ This snippet configures a `CompositeItemReader` with two delegates to read the s
156157

157158
You can find a complete example in the [CompositeItemReaderIntegrationTests](./src/test/java/org/springframework/batch/experimental/item/support/CompositeItemReaderIntegrationTests.java) file.
158159

160+
# New chunk-oriented step implementation
161+
162+
*Original issue:* https://github.com/spring-projects/spring-batch/issues/3950
163+
164+
This is not a new feature, but rather a new implementation of the chunk-oriented processing model. The goal is to address
165+
the problems with the current implementation as explained in [#3950](https://github.com/spring-projects/spring-batch/issues/3950).
166+
167+
The new implementation does **not** address fault-tolerance and concurrency features for the moment. Those will be addressed incrementally
168+
in future versions. Our main focus for now is correctness, i.e. simplifying the code with minimal to no behavioral changes.
169+
170+
The new implementation is in the `ChunkOrientedStep` class, which can be used as follows:
171+
172+
```java
173+
@Bean
174+
public Step chunkOrientedStep(JobRepository jobRepository, JdbcTransactionManager transactionManager,
175+
ItemReader<Person> itemReader, ItemProcessor<Person, Person> itemProcessor, ItemWriter<Person> itemWriter) {
176+
return new ChunkOrientedStep<>("step", 2, itemReader, itemProcessor, itemWriter, jobRepository, transactionManager);
177+
}
178+
```
179+
180+
The first two parameters are the step name and the chunk size. The other parameters are self-explanatory.
181+
Once defined, this step can then be added to a Spring Batch job flow like any other step type.
182+
You can find a complete example in the [ChunkOrientedStepIntegrationTests](./src/test/java/org/springframework/batch/experimental/core/step/item/ChunkOrientedStepIntegrationTests.java) file.
183+
159184
# Contribute
160185

161186
The best way to contribute to this project is by trying out the experimental features and sharing your feedback!
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.springframework.batch.experimental.core.step.item;
2+
3+
import org.springframework.batch.core.StepListener;
4+
import org.springframework.batch.item.Chunk;
5+
6+
/*
7+
* The current org.springframework.batch.core.ChunkListener uses ChunkContext which is passed as parameter to chunk listener methods.
8+
* In the new implementation, this context is not used (it is part of the repeat package, which is not used here).
9+
* Therefore, it makes more sense to pass the chunk of items to the listener's methods (consistent with item listeners).
10+
*
11+
* Notable difference: afterChunk is called inside the transaction, not outside the transaction.
12+
*/
13+
public interface ChunkListener<O> extends StepListener {
14+
15+
/**
16+
* Callback before the chunk is processed, inside the transaction.
17+
*/
18+
default void beforeChunk() {
19+
}
20+
21+
/**
22+
* Callback after the chunk is processed, inside the transaction.
23+
*/
24+
default void afterChunk(Chunk<O> chunk) {
25+
}
26+
27+
/**
28+
* Callback if an exception occurs while processing a chunk, inside the transaction,
29+
* which is about to be rolled back. As a result, you should use {@code PROPAGATION_REQUIRES_NEW}
30+
* for any transactional operation that is called from here.</em>
31+
*
32+
* @param exception the exception that caused the underlying rollback.
33+
* @param chunk the processed chunk
34+
*/
35+
default void onChunkError(Exception exception, Chunk<O> chunk) {
36+
}
37+
38+
}
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.experimental.core.step.item;
17+
18+
import org.apache.commons.logging.Log;
19+
import org.apache.commons.logging.LogFactory;
20+
21+
import org.springframework.batch.core.ItemProcessListener;
22+
import org.springframework.batch.core.ItemReadListener;
23+
import org.springframework.batch.core.ItemWriteListener;
24+
import org.springframework.batch.core.StepContribution;
25+
import org.springframework.batch.core.StepExecution;
26+
import org.springframework.batch.core.repository.JobRepository;
27+
import org.springframework.batch.core.step.AbstractStep;
28+
import org.springframework.batch.core.step.FatalStepExecutionException;
29+
import org.springframework.batch.core.step.StepInterruptionPolicy;
30+
import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
31+
import org.springframework.batch.item.Chunk;
32+
import org.springframework.batch.item.ExecutionContext;
33+
import org.springframework.batch.item.ItemProcessor;
34+
import org.springframework.batch.item.ItemReader;
35+
import org.springframework.batch.item.ItemStream;
36+
import org.springframework.batch.item.ItemWriter;
37+
import org.springframework.batch.item.support.CompositeItemStream;
38+
import org.springframework.transaction.PlatformTransactionManager;
39+
import org.springframework.transaction.TransactionStatus;
40+
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
41+
import org.springframework.transaction.interceptor.TransactionAttribute;
42+
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
43+
import org.springframework.transaction.support.TransactionTemplate;
44+
import org.springframework.util.Assert;
45+
46+
/**
47+
* Step implementation for the chunk-oriented processing model.
48+
*
49+
* @author Mahmoud Ben Hassine
50+
* @param <I> type of input items
51+
* @param <O> type of output items
52+
*/
53+
public class ChunkOrientedStep<I, O> extends AbstractStep {
54+
55+
private static final Log logger = LogFactory.getLog(ChunkOrientedStep.class.getName());
56+
57+
/*
58+
* Step Input / Output parameters
59+
*/
60+
private final ItemReader<I> itemReader;
61+
private ItemReadListener<I> itemReadListener = new ItemReadListener<>() {};
62+
63+
private final ItemProcessor<I, O> itemProcessor;
64+
private ItemProcessListener<I, O> itemProcessListener = new ItemProcessListener<>() {};
65+
66+
private final ItemWriter<O> itemWriter;
67+
private ItemWriteListener<O> itemWriteListener = new ItemWriteListener<>() {};
68+
69+
/*
70+
* Transactional related parameters
71+
*/
72+
private final TransactionTemplate transactionTemplate;
73+
private final TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
74+
75+
@Override
76+
public boolean rollbackOn(Throwable ex) {
77+
return true;
78+
}
79+
80+
};
81+
82+
/*
83+
* Chunk related parameters
84+
*/
85+
private final int chunkSize;
86+
private final ChunkTracker chunkTracker = new ChunkTracker();
87+
private ChunkListener<O> chunkListener = new ChunkListener<>() {};
88+
89+
/*
90+
* Step state / interruption parameters
91+
*/
92+
private final CompositeItemStream stream = new CompositeItemStream();
93+
private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
94+
95+
96+
public ChunkOrientedStep(String name, int chunkSize,
97+
ItemReader<I> itemReader, ItemProcessor<I, O> itemProcessor, ItemWriter<O> itemWriter,
98+
JobRepository jobRepository, PlatformTransactionManager transactionManager) {
99+
super(name);
100+
Assert.isTrue(chunkSize > 0, "Chunk size must be greater than 0");
101+
setJobRepository(jobRepository);
102+
this.itemReader = itemReader;
103+
this.itemProcessor = itemProcessor;
104+
this.itemWriter = itemWriter;
105+
this.transactionTemplate = new TransactionTemplate(transactionManager, this.transactionAttribute);
106+
this.chunkSize = chunkSize;
107+
if (this.itemReader instanceof ItemStream itemStream) {
108+
this.stream.register(itemStream);
109+
}
110+
if (this.itemProcessor instanceof ItemStream itemStream) {
111+
this.stream.register(itemStream);
112+
}
113+
if (this.itemWriter instanceof ItemStream itemStream) {
114+
this.stream.register(itemStream);
115+
}
116+
}
117+
118+
public ChunkOrientedStep(String name, int chunkSize,
119+
ItemReader<I> itemReader, ItemWriter<O> itemWriter,
120+
JobRepository jobRepository, PlatformTransactionManager transactionManager) {
121+
this(name, chunkSize, itemReader, item -> (O) item, itemWriter, jobRepository, transactionManager);
122+
}
123+
124+
@Override
125+
protected void doExecute(StepExecution stepExecution) throws Exception {
126+
while (this.chunkTracker.moreItems()) {
127+
this.interruptionPolicy.checkInterrupted(stepExecution);
128+
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
129+
@Override
130+
protected void doInTransactionWithoutResult(TransactionStatus status) {
131+
StepContribution contribution = stepExecution.createStepContribution();
132+
Chunk<I> inputChunk = new Chunk<>();
133+
Chunk<O> processedChunk = new Chunk<>();
134+
try {
135+
chunkListener.beforeChunk();
136+
inputChunk = read(contribution);
137+
processedChunk = process(inputChunk, contribution);
138+
write(processedChunk, contribution);
139+
chunkListener.afterChunk(processedChunk);
140+
stepExecution.apply(contribution);
141+
stepExecution.incrementCommitCount();
142+
stream.update(stepExecution.getExecutionContext());
143+
getJobRepository().update(stepExecution);
144+
getJobRepository().updateExecutionContext(stepExecution);
145+
} catch (Exception e) {
146+
logger.error("Rolling back chunk transaction", e);
147+
status.setRollbackOnly();
148+
stepExecution.incrementRollbackCount();
149+
chunkListener.onChunkError(e, processedChunk);
150+
throw new FatalStepExecutionException("Unable process chunk", e);
151+
}
152+
}
153+
});
154+
}
155+
}
156+
157+
private Chunk<I> read(StepContribution contribution) throws Exception {
158+
Chunk<I> chunk = new Chunk<>();
159+
for (int i = 0; i < chunkSize; i++) {
160+
this.itemReadListener.beforeRead();
161+
try {
162+
I item = itemReader.read();
163+
if (item == null) {
164+
chunkTracker.noMoreItems();
165+
break;
166+
} else {
167+
chunk.add(item);
168+
contribution.incrementReadCount();
169+
this.itemReadListener.afterRead(item);
170+
}
171+
} catch (Exception exception) {
172+
this.itemReadListener.onReadError(exception);
173+
throw exception;
174+
}
175+
176+
}
177+
return chunk;
178+
}
179+
180+
private Chunk<O> process(Chunk<I> chunk, StepContribution contribution) throws Exception {
181+
Chunk<O> processedChunk = new Chunk<>();
182+
for (I item : chunk) {
183+
try {
184+
this.itemProcessListener.beforeProcess(item);
185+
O processedItem = this.itemProcessor.process(item);
186+
this.itemProcessListener.afterProcess(item, processedItem);
187+
if (processedItem == null) {
188+
contribution.incrementFilterCount(1);
189+
} else {
190+
processedChunk.add(processedItem);
191+
}
192+
} catch (Exception exception) {
193+
this.itemProcessListener.onProcessError(item, exception);
194+
throw exception;
195+
}
196+
}
197+
return processedChunk;
198+
}
199+
200+
private void write(Chunk<O> chunk, StepContribution contribution) throws Exception {
201+
try {
202+
this.itemWriteListener.beforeWrite(chunk);
203+
this.itemWriter.write(chunk);
204+
contribution.incrementWriteCount(chunk.size());
205+
this.itemWriteListener.afterWrite(chunk);
206+
} catch (Exception e) {
207+
this.itemWriteListener.onWriteError(e, chunk);
208+
throw e;
209+
}
210+
211+
}
212+
213+
@Override
214+
protected void open(ExecutionContext executionContext) throws Exception {
215+
this.stream.open(executionContext);
216+
}
217+
218+
@Override
219+
protected void close(ExecutionContext executionContext) throws Exception {
220+
this.stream.close();
221+
}
222+
223+
/**
224+
* Checked at chunk boundaries. Defaults to {@link ThreadStepInterruptionPolicy}.
225+
*/
226+
public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
227+
this.interruptionPolicy = interruptionPolicy;
228+
}
229+
230+
public void registerStream(ItemStream stream) {
231+
this.stream.register(stream);
232+
}
233+
234+
public void setItemReadListener(ItemReadListener<I> itemReadListener) {
235+
this.itemReadListener = itemReadListener;
236+
}
237+
238+
public void setItemProcessListener(ItemProcessListener<I, O> itemProcessListener) {
239+
this.itemProcessListener = itemProcessListener;
240+
}
241+
242+
public void setItemWriteListener(ItemWriteListener<O> itemWriteListener) {
243+
this.itemWriteListener = itemWriteListener;
244+
}
245+
246+
public void setChunkListener(ChunkListener<O> chunkListener) {
247+
this.chunkListener = chunkListener;
248+
}
249+
250+
public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
251+
this.transactionTemplate.setIsolationLevel(transactionAttribute.getIsolationLevel());
252+
this.transactionTemplate.setPropagationBehavior(transactionAttribute.getPropagationBehavior());
253+
this.transactionTemplate.setTimeout(transactionAttribute.getTimeout());
254+
}
255+
256+
private static class ChunkTracker {
257+
private boolean moreItems = true;
258+
259+
void noMoreItems() {
260+
this.moreItems = false;
261+
}
262+
263+
boolean moreItems() {
264+
return this.moreItems;
265+
}
266+
}
267+
268+
269+
}

0 commit comments

Comments
 (0)