Skip to content

Commit bf66bae

Browse files
committed
Add an example for bulk imports with SpringBatch
1 parent 67c069f commit bf66bae

File tree

9 files changed

+289
-24
lines changed

9 files changed

+289
-24
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/**
2+
* Copyright 2016-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package examples.springbatch.bulkinsert;
17+
18+
import static examples.springbatch.mapper.PersonDynamicSqlSupport.firstName;
19+
import static examples.springbatch.mapper.PersonDynamicSqlSupport.forPagingTest;
20+
import static examples.springbatch.mapper.PersonDynamicSqlSupport.lastName;
21+
22+
import javax.sql.DataSource;
23+
24+
import org.apache.ibatis.session.SqlSessionFactory;
25+
import org.mybatis.dynamic.sql.insert.InsertDSL;
26+
import org.mybatis.dynamic.sql.render.RenderingStrategies;
27+
import org.mybatis.spring.SqlSessionFactoryBean;
28+
import org.mybatis.spring.annotation.MapperScan;
29+
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
30+
import org.springframework.batch.core.Job;
31+
import org.springframework.batch.core.Step;
32+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
33+
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
34+
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
35+
import org.springframework.batch.core.launch.support.RunIdIncrementer;
36+
import org.springframework.batch.item.ItemProcessor;
37+
import org.springframework.batch.item.ItemWriter;
38+
import org.springframework.beans.factory.annotation.Autowired;
39+
import org.springframework.context.annotation.Bean;
40+
import org.springframework.context.annotation.ComponentScan;
41+
import org.springframework.context.annotation.Configuration;
42+
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
43+
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
44+
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
45+
import org.springframework.transaction.PlatformTransactionManager;
46+
47+
import examples.springbatch.common.PersonRecord;
48+
import examples.springbatch.mapper.PersonDynamicSqlSupport;
49+
import examples.springbatch.mapper.PersonMapper;
50+
51+
@EnableBatchProcessing
52+
@Configuration
53+
@ComponentScan("examples.springbatch.bulkinsert")
54+
@ComponentScan("examples.springbatch.common")
55+
@MapperScan("examples.springbatch.mapper")
56+
public class BulkInsertConfiguration {
57+
58+
@Autowired
59+
private JobBuilderFactory jobBuilderFactory;
60+
61+
@Autowired
62+
private StepBuilderFactory stepBuilderFactory;
63+
64+
@Bean
65+
public DataSource dataSource() {
66+
return new EmbeddedDatabaseBuilder()
67+
.setType(EmbeddedDatabaseType.HSQL)
68+
.addScript("classpath:/org/springframework/batch/core/schema-drop-hsqldb.sql")
69+
.addScript("classpath:/org/springframework/batch/core/schema-hsqldb.sql")
70+
.addScript("classpath:/examples/springbatch/schema.sql")
71+
.build();
72+
}
73+
74+
@Bean
75+
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
76+
SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
77+
sessionFactory.setDataSource(dataSource);
78+
return sessionFactory.getObject();
79+
}
80+
81+
@Bean
82+
public PlatformTransactionManager transactionManager(DataSource dataSource) {
83+
return new DataSourceTransactionManager(dataSource);
84+
}
85+
86+
@Bean
87+
public MyBatisBatchItemWriter<PersonRecord> writer(SqlSessionFactory sqlSessionFactory) {
88+
MyBatisBatchItemWriter<PersonRecord> writer = new MyBatisBatchItemWriter<>();
89+
writer.setSqlSessionFactory(sqlSessionFactory);
90+
91+
writer.setItemToParameterConverter(record -> InsertDSL.insert(record)
92+
.into(PersonDynamicSqlSupport.person)
93+
.map(firstName).toProperty("firstName")
94+
.map(lastName).toProperty("lastName")
95+
.map(forPagingTest).toStringConstant("false")
96+
.build()
97+
.render(RenderingStrategies.MYBATIS3));
98+
99+
writer.setStatementId(PersonMapper.class.getName() + ".insert");
100+
return writer;
101+
}
102+
103+
@Bean
104+
public Step step1(ItemProcessor<PersonRecord, PersonRecord> processor, ItemWriter<PersonRecord> writer) {
105+
return stepBuilderFactory.get("step1")
106+
.<PersonRecord, PersonRecord>chunk(10)
107+
.reader(new TestRecordGenerator())
108+
.processor(processor)
109+
.writer(writer)
110+
.build();
111+
}
112+
113+
@Bean
114+
public Job insertRecords(Step step1) {
115+
return jobBuilderFactory.get("insertRecords")
116+
.incrementer(new RunIdIncrementer())
117+
.flow(step1)
118+
.end()
119+
.build();
120+
}
121+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Copyright 2016-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package examples.springbatch.bulkinsert;
17+
18+
import static examples.springbatch.mapper.PersonDynamicSqlSupport.*;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import org.apache.ibatis.session.SqlSession;
22+
import org.apache.ibatis.session.SqlSessionFactory;
23+
import org.junit.jupiter.api.Test;
24+
import org.mybatis.dynamic.sql.render.RenderingStrategies;
25+
import org.mybatis.dynamic.sql.select.CountDSL;
26+
import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
27+
import org.springframework.batch.core.ExitStatus;
28+
import org.springframework.batch.core.JobExecution;
29+
import org.springframework.batch.core.StepExecution;
30+
import org.springframework.batch.item.ExecutionContext;
31+
import org.springframework.batch.test.JobLauncherTestUtils;
32+
import org.springframework.batch.test.context.SpringBatchTest;
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
35+
36+
import examples.springbatch.mapper.PersonMapper;
37+
38+
@SpringBatchTest
39+
@SpringJUnitConfig(classes=BulkInsertConfiguration.class)
40+
public class SpringBatchBulkInsertTest {
41+
42+
@Autowired
43+
private JobLauncherTestUtils jobLauncherTestUtils;
44+
45+
@Autowired
46+
private SqlSessionFactory sqlSessionFactory;
47+
48+
@Test
49+
public void testThatRowsAreInserted() throws Exception {
50+
// starting condition
51+
assertThat(rowCount()).isEqualTo(0);
52+
53+
JobExecution execution = jobLauncherTestUtils.launchJob();
54+
assertThat(execution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);
55+
assertThat(numberOfRowsProcessed(execution)).isEqualTo(TestRecordGenerator.recordCount());
56+
57+
// ending condition
58+
assertThat(rowCount()).isEqualTo(TestRecordGenerator.recordCount());
59+
}
60+
61+
private int numberOfRowsProcessed(JobExecution jobExecution) {
62+
return jobExecution.getStepExecutions().stream()
63+
.map(StepExecution::getExecutionContext)
64+
.mapToInt(this::getRowCount)
65+
.sum();
66+
}
67+
68+
private int getRowCount(ExecutionContext executionContext) {
69+
return executionContext.getInt("row_count", 0);
70+
}
71+
72+
private long rowCount() throws Exception {
73+
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
74+
PersonMapper personMapper = sqlSession.getMapper(PersonMapper.class);
75+
76+
SelectStatementProvider selectStatement = CountDSL.countFrom(person)
77+
.build()
78+
.render(RenderingStrategies.MYBATIS3);
79+
80+
return personMapper.count(selectStatement);
81+
}
82+
}
83+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Copyright 2016-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package examples.springbatch.bulkinsert;
17+
18+
import org.springframework.batch.item.ItemReader;
19+
20+
import examples.springbatch.common.PersonRecord;
21+
22+
public class TestRecordGenerator implements ItemReader<PersonRecord> {
23+
24+
private int index = 0;
25+
26+
private static PersonRecord[] testRecords = {
27+
new PersonRecord("Fred", "Flintstone"),
28+
new PersonRecord("Wilma", "Flintstone"),
29+
new PersonRecord("Pebbles", "Flintstone"),
30+
new PersonRecord("Barney", "Rubble"),
31+
new PersonRecord("Betty", "Rubble"),
32+
new PersonRecord("Bamm Bamm", "Rubble")
33+
};
34+
35+
@Override
36+
public PersonRecord read() {
37+
if (index < testRecords.length) {
38+
return(testRecords[index++]);
39+
} else {
40+
return null;
41+
}
42+
}
43+
44+
public static int recordCount() {
45+
return testRecords.length;
46+
}
47+
}

src/test/java/examples/springbatch/common/PersonProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@
2424
import org.springframework.stereotype.Component;
2525

2626
@Component
27-
public class PersonProcessor implements ItemProcessor<Person, Person> {
27+
public class PersonProcessor implements ItemProcessor<PersonRecord, PersonRecord> {
2828

2929
private ExecutionContext executionContext;
3030

3131
@Override
32-
public Person process(Person person) throws Exception {
32+
public PersonRecord process(PersonRecord person) throws Exception {
3333
incrementRowCount();
3434

35-
Person transformed = new Person();
35+
PersonRecord transformed = new PersonRecord();
3636
transformed.setId(person.getId());
3737
transformed.setFirstName(person.getFirstName().toUpperCase());
3838
transformed.setLastName(person.getLastName().toUpperCase());

src/test/java/examples/springbatch/common/Person.java renamed to src/test/java/examples/springbatch/common/PersonRecord.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,20 @@
1515
*/
1616
package examples.springbatch.common;
1717

18-
public class Person {
18+
public class PersonRecord {
1919
private Integer id;
2020
private String firstName;
2121
private String lastName;
2222

23+
public PersonRecord() {
24+
super();
25+
}
26+
27+
public PersonRecord(String firstName, String lastName) {
28+
this.firstName = firstName;
29+
this.lastName = lastName;
30+
}
31+
2332
public Integer getId() {
2433
return id;
2534
}

src/test/java/examples/springbatch/common/UpdateStatementConvertor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import org.springframework.stereotype.Component;
2626

2727
@Component
28-
public class UpdateStatementConvertor implements Converter<Person, UpdateStatementProvider> {
28+
public class UpdateStatementConvertor implements Converter<PersonRecord, UpdateStatementProvider> {
2929

3030
@Override
31-
public UpdateStatementProvider convert(Person source) {
31+
public UpdateStatementProvider convert(PersonRecord source) {
3232
return UpdateDSL.update(person)
3333
.set(firstName).equalTo(source::getFirstName)
3434
.set(lastName).equalTo(source::getLastName)

src/test/java/examples/springbatch/cursor/CursorReaderBatchConfiguration.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
4949
import org.springframework.transaction.PlatformTransactionManager;
5050

51-
import examples.springbatch.common.Person;
51+
import examples.springbatch.common.PersonRecord;
5252
import examples.springbatch.mapper.PersonMapper;
5353

5454
@EnableBatchProcessing
@@ -87,34 +87,34 @@ public PlatformTransactionManager transactionManager(DataSource dataSource) {
8787
}
8888

8989
@Bean
90-
public MyBatisCursorItemReader<Person> reader(SqlSessionFactory sqlSessionFactory) {
90+
public MyBatisCursorItemReader<PersonRecord> reader(SqlSessionFactory sqlSessionFactory) {
9191
SelectStatementProvider selectStatement = SpringBatchUtility.selectForCursor(person.allColumns())
9292
.from(person)
9393
.where(lastName, isEqualTo("flintstone"))
9494
.build()
9595
.render();
9696

97-
MyBatisCursorItemReader<Person> reader = new MyBatisCursorItemReader<>();
97+
MyBatisCursorItemReader<PersonRecord> reader = new MyBatisCursorItemReader<>();
9898
reader.setQueryId(PersonMapper.class.getName() + ".selectMany");
9999
reader.setSqlSessionFactory(sqlSessionFactory);
100100
reader.setParameterValues(SpringBatchUtility.toParameterValues(selectStatement));
101101
return reader;
102102
}
103103

104104
@Bean
105-
public MyBatisBatchItemWriter<Person> writer(SqlSessionFactory sqlSessionFactory,
106-
Converter<Person, UpdateStatementProvider> convertor) {
107-
MyBatisBatchItemWriter<Person> writer = new MyBatisBatchItemWriter<>();
105+
public MyBatisBatchItemWriter<PersonRecord> writer(SqlSessionFactory sqlSessionFactory,
106+
Converter<PersonRecord, UpdateStatementProvider> convertor) {
107+
MyBatisBatchItemWriter<PersonRecord> writer = new MyBatisBatchItemWriter<>();
108108
writer.setSqlSessionFactory(sqlSessionFactory);
109109
writer.setItemToParameterConverter(convertor);
110110
writer.setStatementId(PersonMapper.class.getName() + ".update");
111111
return writer;
112112
}
113113

114114
@Bean
115-
public Step step1(ItemReader<Person> reader, ItemProcessor<Person, Person> processor, ItemWriter<Person> writer) {
115+
public Step step1(ItemReader<PersonRecord> reader, ItemProcessor<PersonRecord, PersonRecord> processor, ItemWriter<PersonRecord> writer) {
116116
return stepBuilderFactory.get("step1")
117-
.<Person, Person>chunk(10)
117+
.<PersonRecord, PersonRecord>chunk(10)
118118
.reader(reader)
119119
.processor(processor)
120120
.writer(writer)

src/test/java/examples/springbatch/mapper/PersonMapper.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@
1818
import java.util.List;
1919
import java.util.Map;
2020

21+
import org.apache.ibatis.annotations.InsertProvider;
2122
import org.apache.ibatis.annotations.Mapper;
2223
import org.apache.ibatis.annotations.Result;
2324
import org.apache.ibatis.annotations.Results;
2425
import org.apache.ibatis.annotations.SelectProvider;
2526
import org.apache.ibatis.annotations.UpdateProvider;
27+
import org.mybatis.dynamic.sql.insert.render.InsertStatementProvider;
2628
import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
2729
import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider;
2830
import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
2931
import org.mybatis.dynamic.sql.util.springbatch.SpringBatchProviderAdapter;
3032

31-
import examples.springbatch.common.Person;
33+
import examples.springbatch.common.PersonRecord;
3234

3335
@Mapper
3436
public interface PersonMapper {
@@ -39,11 +41,14 @@ public interface PersonMapper {
3941
@Result(column="first_name", property="firstName"),
4042
@Result(column="last_name", property="lastName")
4143
})
42-
List<Person> selectMany(Map<String, Object> parameterValues);
44+
List<PersonRecord> selectMany(Map<String, Object> parameterValues);
4345

4446
@UpdateProvider(type=SqlProviderAdapter.class, method="update")
4547
int update(UpdateStatementProvider updateStatement);
4648

4749
@SelectProvider(type=SqlProviderAdapter.class, method="select")
4850
long count(SelectStatementProvider selectStatement);
51+
52+
@InsertProvider(type=SqlProviderAdapter.class, method="insert")
53+
int insert(InsertStatementProvider<PersonRecord> insertStatement);
4954
}

0 commit comments

Comments
 (0)