Skip to content

Commit c273c8e

Browse files
Merge pull request #18663 from hmdrzsharifi/BAEL-8695
Bael 8695: Retstart a job on failure and continue in Spring Batch
2 parents cb091f3 + ba653fe commit c273c8e

File tree

5 files changed

+253
-0
lines changed

5 files changed

+253
-0
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package com.baeldung.restartjob;
2+
3+
import org.springframework.batch.core.Job;
4+
import org.springframework.batch.core.Step;
5+
import org.springframework.batch.core.configuration.annotation.StepScope;
6+
import org.springframework.batch.core.job.builder.JobBuilder;
7+
import org.springframework.batch.core.repository.JobRepository;
8+
import org.springframework.batch.core.step.builder.StepBuilder;
9+
import org.springframework.batch.item.ItemProcessor;
10+
import org.springframework.batch.item.ItemWriter;
11+
import org.springframework.batch.item.file.FlatFileItemReader;
12+
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
13+
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
14+
import org.springframework.context.annotation.Bean;
15+
import org.springframework.context.annotation.Configuration;
16+
import org.springframework.core.io.ClassPathResource;
17+
import org.springframework.transaction.PlatformTransactionManager;
18+
19+
@Configuration
20+
public class BatchConfig {
21+
22+
@Bean
23+
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
24+
return new JobBuilder("simpleJob", jobRepository)
25+
.start(step1(jobRepository, transactionManager))
26+
.build();
27+
}
28+
29+
@Bean
30+
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
31+
return new StepBuilder("step1", jobRepository)
32+
.<String, String>chunk(2, transactionManager)
33+
.reader(flatFileItemReader())
34+
.processor(itemProcessor())
35+
.writer(itemWriter())
36+
.build();
37+
}
38+
39+
@Bean
40+
@StepScope
41+
public FlatFileItemReader<String> flatFileItemReader() {
42+
return new FlatFileItemReaderBuilder<String>()
43+
.name("itemReader")
44+
.resource(new ClassPathResource("data.csv"))
45+
.lineMapper(new PassThroughLineMapper())
46+
.saveState(true)
47+
.build();
48+
}
49+
50+
@Bean
51+
public RestartItemProcessor itemProcessor() {
52+
return new RestartItemProcessor();
53+
}
54+
55+
@Bean
56+
public ItemWriter<String> itemWriter() {
57+
return items -> {
58+
System.out.println("Writing items:");
59+
for (String item : items) {
60+
System.out.println("- " + item);
61+
}
62+
};
63+
}
64+
65+
static class RestartItemProcessor implements ItemProcessor<String, String> {
66+
private boolean failOnItem3 = true;
67+
68+
public void setFailOnItem3(boolean failOnItem3) {
69+
this.failOnItem3 = failOnItem3;
70+
}
71+
72+
@Override
73+
public String process(String item) throws Exception {
74+
System.out.println("Processing: " + item + " (failOnItem3=" + failOnItem3 + ")");
75+
if (failOnItem3 && item.equals("Item3")) {
76+
throw new RuntimeException("Simulated failure on Item3");
77+
}
78+
return "PROCESSED " + item;
79+
}
80+
}
81+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.baeldung.restartjob;
2+
3+
import java.util.List;
4+
5+
import org.springframework.batch.core.BatchStatus;
6+
import org.springframework.batch.core.Job;
7+
import org.springframework.batch.core.JobExecution;
8+
import org.springframework.batch.core.JobInstance;
9+
import org.springframework.batch.core.JobParameters;
10+
import org.springframework.batch.core.JobParametersBuilder;
11+
import org.springframework.batch.core.explore.JobExplorer;
12+
import org.springframework.batch.core.launch.JobLauncher;
13+
import org.springframework.batch.core.launch.JobOperator;
14+
import org.springframework.boot.CommandLineRunner;
15+
import org.springframework.boot.SpringApplication;
16+
import org.springframework.boot.autoconfigure.SpringBootApplication;
17+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18+
import org.springframework.context.annotation.Bean;
19+
20+
@SpringBootApplication
21+
public class RestartJobBatchApp {
22+
23+
public static void main(String[] args) {
24+
SpringApplication app = new SpringApplication(RestartJobBatchApp.class);
25+
app.setAdditionalProfiles("restart");
26+
app.run(args);
27+
}
28+
29+
@Bean
30+
@ConditionalOnProperty(prefix = "job.autorun", name = "enabled", havingValue = "true", matchIfMissing = true)
31+
CommandLineRunner run(JobLauncher jobLauncher, Job job, JobExplorer jobExplorer,
32+
JobOperator jobOperator, BatchConfig.RestartItemProcessor itemProcessor) {
33+
return args -> {
34+
JobParameters jobParameters = new JobParametersBuilder()
35+
.addString("jobId", "test-job-" + System.currentTimeMillis())
36+
.toJobParameters();
37+
38+
List<JobInstance> instances = jobExplorer.getJobInstances("simpleJob", 0, 1);
39+
if (!instances.isEmpty()) {
40+
JobInstance lastInstance = instances.get(0);
41+
List<JobExecution> executions = jobExplorer.getJobExecutions(lastInstance);
42+
if (!executions.isEmpty()) {
43+
JobExecution lastExecution = executions.get(0);
44+
if (lastExecution.getStatus() == BatchStatus.FAILED) {
45+
System.out.println("Restarting failed job execution with ID: " + lastExecution.getId());
46+
itemProcessor.setFailOnItem3(false);
47+
48+
JobExecution restartedExecution = jobLauncher.run(job, jobParameters);
49+
50+
// final Long restartId = jobOperator.restart(lastExecution.getId());
51+
// final JobExecution restartedExecution = jobExplorer.getJobExecution(restartedExecution);
52+
53+
System.out.println("Restarted job status: " + restartedExecution.getStatus());
54+
return;
55+
}
56+
}
57+
}
58+
59+
System.out.println("Starting new job execution...");
60+
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
61+
System.out.println("Job started with status: " + jobExecution.getStatus());
62+
};
63+
}
64+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
spring.datasource.url=jdbc:h2:~/test;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
2+
spring.datasource.username=sa
3+
spring.datasource.password=
4+
spring.datasource.driver-class-name=org.h2.Driver
5+
6+
spring.jpa.hibernate.ddl-auto=create-drop
7+
spring.jpa.properties.hibernate.format_sql=true
8+
9+
spring.batch.jdbc.initialize-schema=always
10+
spring.sql.init.mode=always
11+
spring.batch.jdbc.table-prefix=BATCH_
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Item1
2+
Item2
3+
Item3
4+
Item4
5+
Item5
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.baeldung.restartjob;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import java.io.IOException;
6+
import java.nio.file.Files;
7+
import java.nio.file.Path;
8+
import java.nio.file.StandardCopyOption;
9+
10+
import org.junit.jupiter.api.Test;
11+
import org.springframework.batch.core.BatchStatus;
12+
import org.springframework.batch.core.Job;
13+
import org.springframework.batch.core.JobExecution;
14+
import org.springframework.batch.core.JobParameters;
15+
import org.springframework.batch.core.JobParametersBuilder;
16+
import org.springframework.batch.core.launch.JobLauncher;
17+
import org.springframework.batch.test.JobLauncherTestUtils;
18+
import org.springframework.beans.factory.annotation.Autowired;
19+
import org.springframework.boot.test.context.SpringBootTest;
20+
import org.springframework.boot.test.context.TestConfiguration;
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.Import;
23+
import org.springframework.core.io.ClassPathResource;
24+
import org.springframework.core.io.Resource;
25+
26+
@SpringBootTest(classes = {RestartJobBatchApp.class, BatchConfig.class},
27+
properties = {"job.autorun.enabled=false"})
28+
@Import(RestartJobBatchAppIntegrationTest.TestConfig.class)
29+
public class RestartJobBatchAppIntegrationTest {
30+
31+
@Autowired
32+
private JobLauncherTestUtils jobLauncherTestUtils;
33+
34+
@Autowired
35+
private BatchConfig.RestartItemProcessor itemProcessor;
36+
37+
@TestConfiguration
38+
static class TestConfig {
39+
@Autowired
40+
private JobLauncher jobLauncher;
41+
42+
@Autowired
43+
private Job job;
44+
45+
@Bean
46+
public JobLauncherTestUtils jobLauncherTestUtils() {
47+
JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
48+
jobLauncherTestUtils.setJobLauncher(jobLauncher);
49+
jobLauncherTestUtils.setJob(job);
50+
return jobLauncherTestUtils;
51+
}
52+
}
53+
54+
private final Resource inputFile = new ClassPathResource("data.csv");
55+
56+
@Test
57+
public void givenItems_whenFailed_thenRestartFromFailure() throws Exception {
58+
// Given
59+
createTestFile("Item1\nItem2\nItem3\nItem4");
60+
61+
JobParameters jobParameters = new JobParametersBuilder()
62+
.addLong("time", System.currentTimeMillis())
63+
.toJobParameters();
64+
65+
// When
66+
JobExecution firstExecution = jobLauncherTestUtils.launchJob(jobParameters);
67+
assertEquals(BatchStatus.FAILED, firstExecution.getStatus());
68+
69+
Long executionId = firstExecution.getId();
70+
71+
itemProcessor.setFailOnItem3(false);
72+
73+
// Then
74+
JobExecution restartedExecution = jobLauncherTestUtils.launchJob(jobParameters);
75+
76+
assertEquals(BatchStatus.COMPLETED, restartedExecution.getStatus());
77+
78+
assertEquals(
79+
firstExecution.getJobInstance().getInstanceId(),
80+
restartedExecution.getJobInstance().getInstanceId()
81+
);
82+
}
83+
84+
private void createTestFile(String content) throws IOException {
85+
Path tempFile = Files.createTempFile("test-data", ".csv");
86+
Files.write(tempFile, content.getBytes());
87+
Files.copy(tempFile, inputFile.getFile().toPath(), StandardCopyOption.REPLACE_EXISTING);
88+
}
89+
90+
}
91+
92+

0 commit comments

Comments
 (0)