Skip to content

Commit 525d236

Browse files
wynnteoWynn Teo
andauthored
BAEL-9303 (#18624)
Co-authored-by: Wynn Teo <[email protected]>
1 parent a153d5b commit 525d236

File tree

10 files changed

+412
-0
lines changed

10 files changed

+412
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.baeldung.multiprocessorandwriter;
2+
3+
import org.springframework.batch.core.Job;
4+
import org.springframework.batch.core.JobParameters;
5+
import org.springframework.batch.core.JobParametersBuilder;
6+
import org.springframework.batch.core.launch.JobLauncher;
7+
import org.springframework.boot.CommandLineRunner;
8+
import org.springframework.boot.SpringApplication;
9+
import org.springframework.boot.autoconfigure.SpringBootApplication;
10+
import org.springframework.context.annotation.Bean;
11+
12+
@SpringBootApplication
13+
public class BatchApp {
14+
15+
public static void main(String[] args) {
16+
SpringApplication.run(BatchApp.class, args);
17+
}
18+
19+
@Bean
20+
CommandLineRunner run(JobLauncher jobLauncher, Job job) {
21+
return args -> {
22+
JobParameters parameters = new JobParametersBuilder().addLong("startAt", System.currentTimeMillis())
23+
.toJobParameters();
24+
jobLauncher.run(job, parameters);
25+
};
26+
}
27+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package com.baeldung.multiprocessorandwriter.config;
2+
3+
import java.util.List;
4+
5+
import javax.sql.DataSource;
6+
7+
import org.springframework.batch.core.Job;
8+
import org.springframework.batch.core.Step;
9+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
10+
import org.springframework.batch.core.job.builder.JobBuilder;
11+
import org.springframework.batch.core.repository.JobRepository;
12+
import org.springframework.batch.core.step.builder.StepBuilder;
13+
import org.springframework.batch.item.database.JdbcBatchItemWriter;
14+
import org.springframework.batch.item.database.JpaItemWriter;
15+
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
16+
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
17+
import org.springframework.batch.item.file.FlatFileItemReader;
18+
import org.springframework.batch.item.file.FlatFileItemWriter;
19+
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
20+
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
21+
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
22+
import org.springframework.batch.item.support.CompositeItemWriter;
23+
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.springframework.boot.jdbc.DataSourceBuilder;
26+
import org.springframework.context.annotation.Bean;
27+
import org.springframework.context.annotation.Configuration;
28+
import org.springframework.core.io.ClassPathResource;
29+
import org.springframework.core.io.FileSystemResource;
30+
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
31+
import org.springframework.transaction.PlatformTransactionManager;
32+
33+
import com.baeldung.multiprocessorandwriter.model.Customer;
34+
import com.baeldung.multiprocessorandwriter.processor.CustomerProcessorRouter;
35+
import com.baeldung.multiprocessorandwriter.processor.TypeAProcessor;
36+
import com.baeldung.multiprocessorandwriter.processor.TypeBProcessor;
37+
38+
import jakarta.persistence.EntityManagerFactory;
39+
40+
@Configuration
41+
@EnableBatchProcessing
42+
public class BatchConfig {
43+
44+
@Bean
45+
public FlatFileItemReader<Customer> customerReader() {
46+
return new FlatFileItemReaderBuilder<Customer>().name("customerItemReader")
47+
.resource(new ClassPathResource("customers.csv"))
48+
.delimited()
49+
.names("id", "name", "email", "type")
50+
.linesToSkip(1)
51+
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
52+
setTargetType(Customer.class);
53+
}})
54+
.build();
55+
}
56+
57+
@Bean
58+
public TypeAProcessor typeAProcessor() {
59+
return new TypeAProcessor();
60+
}
61+
62+
@Bean
63+
public TypeBProcessor typeBProcessor() {
64+
return new TypeBProcessor();
65+
}
66+
67+
@Bean
68+
public CustomerProcessorRouter processorRouter(TypeAProcessor typeAProcessor, TypeBProcessor typeBProcessor) {
69+
return new CustomerProcessorRouter(typeAProcessor, typeBProcessor);
70+
}
71+
72+
@Bean
73+
public JpaItemWriter<Customer> jpaDBWriter(EntityManagerFactory entityManagerFactory) {
74+
JpaItemWriter<Customer> writer = new JpaItemWriter<>();
75+
writer.setEntityManagerFactory(entityManagerFactory);
76+
return writer;
77+
}
78+
79+
@Bean
80+
public FlatFileItemWriter<Customer> fileWriter() {
81+
return new FlatFileItemWriterBuilder<Customer>().name("customerItemWriter")
82+
.resource(new FileSystemResource("output/processed_customers.txt"))
83+
.delimited()
84+
.delimiter(",")
85+
.names("id", "name", "email", "type")
86+
.build();
87+
}
88+
89+
@Bean
90+
public CompositeItemWriter<Customer> compositeWriter(JpaItemWriter<Customer> jpaDBWriter, FlatFileItemWriter<Customer> fileWriter) {
91+
return new CompositeItemWriterBuilder<Customer>().delegates(List.of(jpaDBWriter, fileWriter))
92+
.build();
93+
}
94+
95+
@Bean
96+
public Step processCustomersStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, FlatFileItemReader<Customer> reader,
97+
CustomerProcessorRouter processorRouter, CompositeItemWriter<Customer> compositeWriter) {
98+
return new StepBuilder("processCustomersStep", jobRepository).<Customer, Customer> chunk(10, transactionManager)
99+
.reader(reader)
100+
.processor(processorRouter)
101+
.writer(compositeWriter)
102+
.build();
103+
}
104+
105+
@Bean
106+
public Job processCustomersJob(JobRepository jobRepository, Step processCustomersStep) {
107+
return new JobBuilder("customerProcessingJob", jobRepository).start(processCustomersStep)
108+
.build();
109+
}
110+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.baeldung.multiprocessorandwriter.model;
2+
3+
import jakarta.persistence.Entity;
4+
import jakarta.persistence.Id;
5+
6+
@Entity
7+
public class Customer {
8+
@Id
9+
private Long id;
10+
private String name;
11+
private String email;
12+
private String type;
13+
14+
public Customer() {}
15+
16+
public Customer(Long id, String name, String email, String type) {
17+
this.id = id;
18+
this.name = name;
19+
this.email = email;
20+
this.type = type;
21+
}
22+
23+
public Long getId() { return id; }
24+
public void setId(Long id) { this.id = id; }
25+
public String getName() { return name; }
26+
public void setName(String name) { this.name = name; }
27+
public String getEmail() { return email; }
28+
public void setEmail(String email) { this.email = email; }
29+
public String getType() { return type; }
30+
public void setType(String type) { this.type = type; }
31+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.baeldung.multiprocessorandwriter.processor;
2+
3+
import org.springframework.batch.item.ItemProcessor;
4+
5+
import com.baeldung.multiprocessorandwriter.model.Customer;
6+
7+
public class CustomerProcessorRouter implements ItemProcessor<Customer, Customer> {
8+
private final TypeAProcessor typeAProcessor;
9+
private final TypeBProcessor typeBProcessor;
10+
11+
public CustomerProcessorRouter(TypeAProcessor typeAProcessor,
12+
TypeBProcessor typeBProcessor) {
13+
this.typeAProcessor = typeAProcessor;
14+
this.typeBProcessor = typeBProcessor;
15+
}
16+
17+
@Override
18+
public Customer process(Customer customer) throws Exception {
19+
if ("A".equals(customer.getType())) {
20+
return typeAProcessor.process(customer);
21+
} else if ("B".equals(customer.getType())) {
22+
return typeBProcessor.process(customer);
23+
}
24+
return customer;
25+
}
26+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.baeldung.multiprocessorandwriter.processor;
2+
3+
import org.springframework.batch.item.ItemProcessor;
4+
5+
import com.baeldung.multiprocessorandwriter.model.Customer;
6+
7+
public class TypeAProcessor implements ItemProcessor<Customer, Customer> {
8+
@Override
9+
public Customer process(Customer customer) {
10+
customer.setName(customer.getName().toUpperCase());
11+
customer.setEmail("A_" + customer.getEmail());
12+
return customer;
13+
}
14+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.baeldung.multiprocessorandwriter.processor;
2+
3+
import org.springframework.batch.item.ItemProcessor;
4+
5+
import com.baeldung.multiprocessorandwriter.model.Customer;
6+
7+
public class TypeBProcessor implements ItemProcessor<Customer, Customer> {
8+
@Override
9+
public Customer process(Customer customer) {
10+
customer.setName(customer.getName().toLowerCase());
11+
customer.setEmail("B_" + customer.getEmail());
12+
return customer;
13+
}
14+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
spring.datasource.url=jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
2+
spring.datasource.username=sa
3+
spring.datasource.password=
4+
spring.datasource.driver-class-name=org.h2.Driver
5+
6+
spring.jpa.hibernate.ddl-auto=create-drop
7+
spring.jpa.show-sql=true
8+
spring.jpa.properties.hibernate.format_sql=true
9+
10+
spring.batch.jdbc.initialize-schema=always
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
id,name,email,type
2+
3+
4+
5+
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package com.baeldung.multiprocessorandwriter;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
7+
import java.io.File;
8+
import java.io.IOException;
9+
import java.nio.file.Files;
10+
import java.nio.file.Path;
11+
import java.nio.file.Paths;
12+
import java.sql.Connection;
13+
import java.sql.SQLException;
14+
import java.util.List;
15+
16+
import javax.sql.DataSource;
17+
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
import org.springframework.batch.core.BatchStatus;
21+
import org.springframework.batch.core.ExitStatus;
22+
import org.springframework.batch.core.Job;
23+
import org.springframework.batch.core.JobExecution;
24+
import org.springframework.batch.core.JobParameters;
25+
import org.springframework.batch.core.JobParametersBuilder;
26+
import org.springframework.batch.core.StepExecution;
27+
import org.springframework.batch.core.launch.JobLauncher;
28+
import org.springframework.batch.item.ExecutionContext;
29+
import org.springframework.batch.item.ItemStreamReader;
30+
import org.springframework.batch.item.file.FlatFileItemReader;
31+
import org.springframework.batch.test.JobLauncherTestUtils;
32+
import org.springframework.batch.test.context.SpringBatchTest;
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
35+
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
36+
import org.springframework.boot.test.context.SpringBootTest;
37+
import org.springframework.boot.test.context.TestConfiguration;
38+
import org.springframework.context.annotation.Bean;
39+
import org.springframework.context.annotation.Import;
40+
import org.springframework.core.io.ClassPathResource;
41+
import org.springframework.jdbc.core.JdbcTemplate;
42+
import org.springframework.jdbc.datasource.init.ScriptUtils;
43+
import org.springframework.test.context.ActiveProfiles;
44+
import org.springframework.test.context.ContextConfiguration;
45+
import org.springframework.test.context.TestPropertySource;
46+
47+
import com.baeldung.multiprocessorandwriter.config.BatchConfig;
48+
49+
import com.baeldung.multiprocessorandwriter.model.Customer;
50+
import com.baeldung.multiprocessorandwriter.processor.CustomerProcessorRouter;
51+
52+
@SpringBootTest
53+
@EnableAutoConfiguration
54+
@ContextConfiguration(classes = { BatchConfig.class, JpaTestConfig.class})
55+
@TestPropertySource(locations = "classpath:application-test.properties")
56+
@Import(BatchJobIntegrationTest.TestConfig.class)
57+
public class BatchJobIntegrationTest {
58+
59+
60+
@Autowired
61+
private JobLauncherTestUtils jobLauncherTestUtils;
62+
@TestConfiguration
63+
static class TestConfig {
64+
@Autowired
65+
private JobLauncher jobLauncher;
66+
67+
@Autowired
68+
private Job job;
69+
70+
@Bean
71+
public JobLauncherTestUtils jobLauncherTestUtils() {
72+
JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
73+
jobLauncherTestUtils.setJobLauncher(jobLauncher);
74+
jobLauncherTestUtils.setJob(job);
75+
return jobLauncherTestUtils;
76+
}
77+
}
78+
@Autowired
79+
private DataSource dataSource;
80+
81+
private Path outputFile;
82+
83+
@BeforeEach
84+
public void setup() throws IOException {
85+
try (Connection connection = dataSource.getConnection()) {
86+
ScriptUtils.executeSqlScript(connection, new ClassPathResource("org/springframework/batch/core/schema-h2.sql"));
87+
} catch (SQLException e) {
88+
throw new RuntimeException(e);
89+
}
90+
outputFile = Paths.get("output/processed_customers.txt");
91+
Files.deleteIfExists(outputFile); // clean output file before test
92+
}
93+
94+
@Test
95+
public void givenTypeA_whenProcess_thenNameIsUppercaseAndEmailPrefixedWithA() throws Exception {
96+
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
97+
98+
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
99+
100+
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
101+
List<Customer> dbCustomers = jdbcTemplate.query(
102+
"SELECT id, name, email, type FROM customer WHERE type = 'A'",
103+
(rs, rowNum) -> new Customer(
104+
rs.getLong("id"),
105+
rs.getString("name"),
106+
rs.getString("email"),
107+
rs.getString("type"))
108+
);
109+
110+
assertFalse(dbCustomers.isEmpty());
111+
112+
dbCustomers.forEach(c -> {
113+
assertEquals(c.getName(), c.getName().toUpperCase());
114+
assertTrue(c.getEmail().startsWith("A_"));
115+
});
116+
}
117+
118+
@Test
119+
public void givenTypeB_whenProcess_thenNameIsLowercaseAndEmailPrefixedWithB() throws Exception {
120+
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
121+
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
122+
123+
assertTrue(Files.exists(outputFile));
124+
List<String> lines = Files.readAllLines(outputFile);
125+
126+
boolean hasTypeB = lines.stream().anyMatch(line -> line.endsWith(",B"));
127+
assertTrue(hasTypeB);
128+
129+
lines.forEach(line -> {
130+
String[] parts = line.split(",");
131+
if ("B".equals(parts[3])) {
132+
assertEquals(parts[1], parts[1].toLowerCase());
133+
assertTrue(parts[2].startsWith("B_"));
134+
}
135+
});
136+
}
137+
}

0 commit comments

Comments
 (0)