Skip to content

Commit b24a85e

Browse files
KMGeon authored and fmbenhassine committed
Fix duplicate writes in scan mode by deferring chunk scanning to new transaction
Resolves #5210
Resolves #5247

Signed-off-by: KMGeon <pos04167@kakao.com>
1 parent 0ad4f4e commit b24a85e

File tree

4 files changed

+529
-23
lines changed

4 files changed

+529
-23
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java

Lines changed: 108 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@
5454
import org.springframework.batch.core.step.skip.NonSkippableProcessException;
5555
import org.springframework.batch.core.step.skip.NonSkippableReadException;
5656
import org.springframework.batch.core.step.skip.NonSkippableWriteException;
57+
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
5758
import org.springframework.batch.core.step.skip.SkipPolicy;
5859
import org.springframework.batch.infrastructure.item.Chunk;
5960
import org.springframework.batch.infrastructure.item.ExecutionContext;
@@ -133,7 +134,7 @@ public class ChunkOrientedStep<I, O> extends AbstractStep {
133134
*/
134135
private final int chunkSize;
135136

136-
private final ThreadLocal<ChunkTracker> chunkTracker = ThreadLocal.withInitial(ChunkTracker::new);
137+
private final ThreadLocal<ChunkTracker<O>> chunkTracker = ThreadLocal.withInitial(ChunkTracker::create);
137138

138139
private final CompositeChunkListener<I, O> compositeChunkListener = new CompositeChunkListener<>();
139140

@@ -371,11 +372,18 @@ protected void doExecute(StepExecution stepExecution) throws Exception {
371372
chunkTransactionEvent.begin();
372373
StepContribution contribution = stepExecution.createStepContribution();
373374
processNextChunk(transactionStatus, contribution, stepExecution);
375+
376+
// Skip update during rollback to avoid OptimisticLockingFailureException
377+
if (transactionStatus.isRollbackOnly()) {
378+
chunkTransactionEvent.transactionStatus = BatchMetrics.STATUS_ROLLED_BACK;
379+
chunkTransactionEvent.commit();
380+
return;
381+
}
382+
374383
this.compositeItemStream.update(stepExecution.getExecutionContext());
375384
getJobRepository().updateExecutionContext(stepExecution);
376385
getJobRepository().update(stepExecution);
377-
chunkTransactionEvent.transactionStatus = transactionStatus.isRollbackOnly()
378-
? BatchMetrics.STATUS_ROLLED_BACK : BatchMetrics.STATUS_COMMITTED;
386+
chunkTransactionEvent.transactionStatus = BatchMetrics.STATUS_COMMITTED;
379387
chunkTransactionEvent.commit();
380388
});
381389
}
@@ -394,7 +402,27 @@ private void processNextChunk(TransactionStatus status, StepContribution contrib
394402
private void processChunkConcurrently(TransactionStatus status, StepContribution contribution,
395403
StepExecution stepExecution) {
396404
List<Future<O>> itemProcessingTasks = new LinkedList<>();
405+
Chunk<O> processedChunk = new Chunk<>();
406+
ChunkTracker<O> tracker = this.chunkTracker.get();
407+
397408
try {
409+
if (tracker.isScanMode()) {
410+
logger.info("Executing scan in new transaction after rollback");
411+
Chunk<O> pendingChunk = tracker.getPendingChunk();
412+
if (pendingChunk != null) {
413+
ChunkScanEvent chunkScanEvent = new ChunkScanEvent(stepExecution.getStepName(),
414+
stepExecution.getId());
415+
chunkScanEvent.begin();
416+
scan(pendingChunk, contribution);
417+
chunkScanEvent.skipCount = contribution.getSkipCount();
418+
chunkScanEvent.commit();
419+
logger.info("Chunk scan completed");
420+
tracker.exitScanMode();
421+
stepExecution.incrementCommitCount();
422+
}
423+
return;
424+
}
425+
398426
// read items and submit concurrent item processing tasks
399427
for (int i = 0; i < this.chunkSize && this.chunkTracker.get().moreItems(); i++) {
400428
I item = readItem(contribution);
@@ -417,7 +445,6 @@ private void processChunkConcurrently(TransactionStatus status, StepContribution
417445
}
418446

419447
// collect processed items
420-
Chunk<O> processedChunk = new Chunk<>();
421448
for (Future<O> future : itemProcessingTasks) {
422449
O processedItem = future.get();
423450
if (processedItem != null) {
@@ -433,20 +460,49 @@ private void processChunkConcurrently(TransactionStatus status, StepContribution
433460
logger.error("Rolling back chunk transaction", e);
434461
status.setRollbackOnly();
435462
stepExecution.incrementRollbackCount();
463+
464+
if (tracker.isScanMode()) {
465+
if (e instanceof SkipLimitExceededException || e instanceof NonSkippableWriteException) {
466+
tracker.exitScanMode();
467+
throw new FatalStepExecutionException("Unable to process chunk during scan", e);
468+
}
469+
logger.info("Rollback complete, scan will execute in next transaction");
470+
return;
471+
}
472+
436473
throw new FatalStepExecutionException("Unable to process chunk", e);
437474
}
438475
finally {
439-
// apply contribution
440476
stepExecution.apply(contribution);
441477
}
442-
443478
}
444479

445480
private void processChunkSequentially(TransactionStatus status, StepContribution contribution,
446481
StepExecution stepExecution) {
447482
Chunk<I> inputChunk = new Chunk<>();
448483
Chunk<O> processedChunk = new Chunk<>();
484+
ChunkTracker<O> tracker = this.chunkTracker.get();
485+
449486
try {
487+
if (tracker.isScanMode()) {
488+
logger.info("Executing scan in new transaction after rollback");
489+
Chunk<O> pendingChunk = tracker.getPendingChunk();
490+
if (pendingChunk != null) {
491+
ChunkScanEvent chunkScanEvent = new ChunkScanEvent(stepExecution.getStepName(),
492+
stepExecution.getId());
493+
chunkScanEvent.begin();
494+
compositeChunkListener.beforeChunk(new Chunk<>());
495+
scan(pendingChunk, contribution);
496+
compositeChunkListener.afterChunk(pendingChunk);
497+
chunkScanEvent.skipCount = contribution.getSkipCount();
498+
chunkScanEvent.commit();
499+
logger.info("Chunk scan completed");
500+
tracker.exitScanMode();
501+
stepExecution.incrementCommitCount();
502+
}
503+
return;
504+
}
505+
450506
inputChunk = readChunk(contribution);
451507
if (inputChunk.isEmpty()) {
452508
return;
@@ -461,11 +517,21 @@ private void processChunkSequentially(TransactionStatus status, StepContribution
461517
logger.error("Rolling back chunk transaction", e);
462518
status.setRollbackOnly();
463519
stepExecution.incrementRollbackCount();
520+
521+
if (tracker.isScanMode()) {
522+
if (e instanceof SkipLimitExceededException || e instanceof NonSkippableWriteException) {
523+
tracker.exitScanMode();
524+
compositeChunkListener.onChunkError(e, processedChunk);
525+
throw new FatalStepExecutionException("Unable to process chunk during scan", e);
526+
}
527+
logger.info("Rollback complete, scan will execute in next transaction");
528+
return;
529+
}
530+
464531
compositeChunkListener.onChunkError(e, processedChunk);
465532
throw new FatalStepExecutionException("Unable to process chunk", e);
466533
}
467534
finally {
468-
// apply contribution
469535
stepExecution.apply(contribution);
470536
}
471537
}
@@ -696,21 +762,16 @@ private void writeChunk(Chunk<O> chunk, StepContribution contribution) throws Ex
696762
chunkWriteEvent.chunkWriteStatus = BatchMetrics.STATUS_FAILURE;
697763
observation.lowCardinalityKeyValue(fullyQualifiedMetricName + ".status", BatchMetrics.STATUS_FAILURE);
698764
observation.error(exception);
765+
699766
if (this.faultTolerant && exception instanceof RetryException retryException
700767
&& this.skipPolicy.shouldSkip(retryException.getCause(), -1)) {
701-
logger.info("Retry exhausted while attempting to write items, scanning the chunk", retryException);
702-
ChunkScanEvent chunkScanEvent = new ChunkScanEvent(contribution.getStepExecution().getStepName(),
703-
contribution.getStepExecution().getId());
704-
chunkScanEvent.begin();
705-
scan(chunk, contribution);
706-
chunkScanEvent.skipCount = contribution.getSkipCount();
707-
chunkScanEvent.commit();
708-
logger.info("Chunk scan completed");
768+
logger.info("Retry exhausted, entering scan mode for next transaction", retryException);
769+
this.chunkTracker.get().enterScanMode(chunk);
709770
}
710771
else {
711772
logger.error("Retry exhausted after last attempt in recovery path, but exception is not skippable");
712-
throw exception;
713773
}
774+
throw exception;
714775
}
715776
finally {
716777
chunkWriteEvent.commit();
@@ -752,6 +813,7 @@ private void scan(Chunk<O> chunk, StepContribution contribution) {
752813
if (this.skipPolicy.shouldSkip(exception, contribution.getStepSkipCount())) {
753814
this.compositeSkipListener.onSkipInWrite(item, exception);
754815
contribution.incrementWriteSkipCount();
816+
contribution.getStepExecution().incrementRollbackCount();
755817
}
756818
else {
757819
logger.error("Failed to write item: " + item, exception);
@@ -766,20 +828,48 @@ private boolean isConcurrent() {
766828
return this.taskExecutor != null;
767829
}
768830

769-
private static class ChunkTracker {
831+
private static class ChunkTracker<O> {
832+
833+
static <T> ChunkTracker<T> create() {
834+
return new ChunkTracker<>();
835+
}
770836

771837
private boolean moreItems;
772838

839+
private boolean scanMode;
840+
841+
@Nullable private Chunk<O> pendingChunk;
842+
773843
void init() {
774844
this.moreItems = true;
845+
this.scanMode = false;
846+
this.pendingChunk = null;
775847
}
776848

777849
void reset() {
778850
this.moreItems = false;
779851
}
780852

781853
boolean moreItems() {
782-
return this.moreItems;
854+
return this.moreItems || this.scanMode;
855+
}
856+
857+
void enterScanMode(Chunk<O> chunk) {
858+
this.scanMode = true;
859+
this.pendingChunk = new Chunk<>(chunk.getItems());
860+
}
861+
862+
boolean isScanMode() {
863+
return this.scanMode;
864+
}
865+
866+
@Nullable Chunk<O> getPendingChunk() {
867+
return this.pendingChunk;
868+
}
869+
870+
void exitScanMode() {
871+
this.scanMode = false;
872+
this.pendingChunk = null;
783873
}
784874

785875
}

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepFaultToleranceIntegrationTests.java

Lines changed: 103 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -86,7 +86,7 @@ void testFaultTolerantChunkOrientedStepSuccess() throws Exception {
8686
Assertions.assertEquals(4, stepExecution.getReadCount());
8787
Assertions.assertEquals(3, stepExecution.getWriteCount());
8888
Assertions.assertEquals(3, stepExecution.getCommitCount());
89-
Assertions.assertEquals(0, stepExecution.getRollbackCount());
89+
Assertions.assertEquals(2, stepExecution.getRollbackCount());
9090
Assertions.assertEquals(2, stepExecution.getReadSkipCount());
9191
Assertions.assertEquals(1, stepExecution.getWriteSkipCount());
9292
Assertions.assertEquals(3, stepExecution.getSkipCount());
@@ -117,7 +117,7 @@ void testConcurrentFaultTolerantChunkOrientedStepSuccess() throws Exception {
117117
Assertions.assertEquals(4, stepExecution.getReadCount());
118118
Assertions.assertEquals(3, stepExecution.getWriteCount());
119119
Assertions.assertEquals(3, stepExecution.getCommitCount());
120-
Assertions.assertEquals(0, stepExecution.getRollbackCount());
120+
Assertions.assertEquals(2, stepExecution.getRollbackCount());
121121
Assertions.assertEquals(2, stepExecution.getReadSkipCount());
122122
Assertions.assertEquals(1, stepExecution.getWriteSkipCount());
123123
Assertions.assertEquals(3, stepExecution.getSkipCount());
@@ -151,7 +151,7 @@ void testFaultTolerantChunkOrientedStepFailure() throws Exception {
151151
Assertions.assertEquals(3, stepExecution.getReadCount());
152152
Assertions.assertEquals(2, stepExecution.getWriteCount());
153153
Assertions.assertEquals(1, stepExecution.getCommitCount());
154-
Assertions.assertEquals(1, stepExecution.getRollbackCount());
154+
Assertions.assertEquals(2, stepExecution.getRollbackCount());
155155
Assertions.assertEquals(1, stepExecution.getReadSkipCount());
156156
Assertions.assertEquals(0, stepExecution.getWriteSkipCount());
157157
Assertions.assertEquals(1, stepExecution.getSkipCount());
@@ -185,7 +185,7 @@ void testConcurrentFaultTolerantChunkOrientedStepFailure() throws Exception {
185185
Assertions.assertEquals(3, stepExecution.getReadCount());
186186
Assertions.assertEquals(2, stepExecution.getWriteCount());
187187
Assertions.assertEquals(1, stepExecution.getCommitCount());
188-
Assertions.assertEquals(1, stepExecution.getRollbackCount());
188+
Assertions.assertEquals(2, stepExecution.getRollbackCount());
189189
Assertions.assertEquals(1, stepExecution.getReadSkipCount());
190190
Assertions.assertEquals(0, stepExecution.getWriteSkipCount());
191191
Assertions.assertEquals(1, stepExecution.getSkipCount());
@@ -271,6 +271,56 @@ void testExhaustedRetryWithNonSkippableException() throws Exception {
271271
Assertions.assertEquals(0, stepExecution.getWriteSkipCount());
272272
}
273273

274+
// Issue https://github.com/spring-projects/spring-batch/issues/5210
275+
@Test
276+
void testNoDuplicateWritesAfterScanModeInSequentialMode() throws Exception {
277+
// given
278+
ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class,
279+
DuplicateWriteDetectionStepConfiguration.class);
280+
JobOperator jobOperator = context.getBean(JobOperator.class);
281+
Job job = context.getBean(Job.class);
282+
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
283+
284+
// when
285+
JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
286+
JobExecution jobExecution = jobOperator.start(job, jobParameters);
287+
288+
// then
289+
Assertions.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
290+
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
291+
Assertions.assertEquals(3, stepExecution.getReadCount());
292+
Assertions.assertEquals(1, stepExecution.getWriteCount());
293+
Assertions.assertEquals(2, stepExecution.getWriteSkipCount());
294+
Assertions.assertEquals(1,
295+
jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '1'", Integer.class));
296+
Assertions.assertEquals(1, JdbcTestUtils.countRowsInTable(jdbcTemplate, "delivery"));
297+
}
298+
299+
// Issue https://github.com/spring-projects/spring-batch/issues/5210
300+
@Test
301+
void testNoDuplicateWritesAfterScanModeInConcurrentMode() throws Exception {
302+
// given
303+
ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class,
304+
ConcurrentDuplicateWriteDetectionStepConfiguration.class);
305+
JobOperator jobOperator = context.getBean(JobOperator.class);
306+
Job job = context.getBean(Job.class);
307+
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
308+
309+
// when
310+
JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
311+
JobExecution jobExecution = jobOperator.start(job, jobParameters);
312+
313+
// then
314+
Assertions.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
315+
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
316+
Assertions.assertEquals(3, stepExecution.getReadCount());
317+
Assertions.assertEquals(1, stepExecution.getWriteCount());
318+
Assertions.assertEquals(2, stepExecution.getWriteSkipCount());
319+
Assertions.assertEquals(1,
320+
jdbcTemplate.queryForObject("SELECT COUNT(*) FROM delivery WHERE item_number = '1'", Integer.class));
321+
Assertions.assertEquals(1, JdbcTestUtils.countRowsInTable(jdbcTemplate, "delivery"));
322+
}
323+
274324
@Configuration
275325
static class StepConfiguration {
276326

@@ -414,4 +464,53 @@ public Step concurrentFaulTolerantChunkOrientedStep(JobRepository jobRepository,
414464

415465
}
416466

467+
@Configuration
468+
static class DuplicateWriteDetectionStepConfiguration {
469+
470+
@Bean
471+
public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager,
472+
JdbcTemplate jdbcTemplate) {
473+
List<String> items = List.of("1", "2", "3");
474+
return new ChunkOrientedStepBuilder<String, String>(jobRepository, 3).reader(new ListItemReader<>(items))
475+
.writer(chunk -> {
476+
for (String item : chunk) {
477+
if ("2".equals(item) || "3".equals(item)) {
478+
throw new RuntimeException("Simulated write error for item: " + item);
479+
}
480+
jdbcTemplate.update("INSERT INTO delivery (item_number) VALUES (?)", item);
481+
}
482+
})
483+
.transactionManager(transactionManager)
484+
.faultTolerant()
485+
.skipPolicy(new AlwaysSkipItemSkipPolicy())
486+
.build();
487+
}
488+
489+
}
490+
491+
@Configuration
492+
static class ConcurrentDuplicateWriteDetectionStepConfiguration {
493+
494+
@Bean
495+
public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager,
496+
JdbcTemplate jdbcTemplate) {
497+
List<String> items = List.of("1", "2", "3");
498+
return new ChunkOrientedStepBuilder<String, String>(jobRepository, 3).reader(new ListItemReader<>(items))
499+
.writer(chunk -> {
500+
for (String item : chunk) {
501+
if ("2".equals(item) || "3".equals(item)) {
502+
throw new RuntimeException("Simulated write error for item: " + item);
503+
}
504+
jdbcTemplate.update("INSERT INTO delivery (item_number) VALUES (?)", item);
505+
}
506+
})
507+
.transactionManager(transactionManager)
508+
.taskExecutor(new SimpleAsyncTaskExecutor())
509+
.faultTolerant()
510+
.skipPolicy(new AlwaysSkipItemSkipPolicy())
511+
.build();
512+
}
513+
514+
}
515+
417516
}

0 commit comments

Comments
 (0)