Skip to content

Commit f176d65

Browse files
committed
Add CompositeItemReader feature
1 parent 678b8bb commit f176d65

File tree

8 files changed

+290
-1
lines changed

8 files changed

+290
-1
lines changed

README.md

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33
This repository contains experimental features in Spring Batch.
44
Experimental features are *not* intended to be used in production.
55
They are shared here to be explored by the community and to gather feedback.
6+
Please refer to the [Enabling experimental features](#enabling-experimental-features) section for more details about how to enable experimental features.
7+
8+
The currently available experimental features are the following:
9+
10+
* [MongoDB job repository](#mongodb-job-repository)
11+
* [Composite item reader](#composite-item-reader)
612

713
**Important note:** The versioning in this repository follows the [semantic versioning specification](https://semver.org/#spec-item-4).
814
Public APIs as well as the implementations should not be considered stable and may change at any time :exclamation:
@@ -44,7 +50,7 @@ To test experimental features, you need to add the following dependency in your
4450

4551
Depending on the feature you are testing, other dependencies might be required.
4652

47-
# MongoDB as data store for batch meta-data
53+
# MongoDB job repository
4854

4955
*Original issue:* https://github.com/spring-projects/spring-batch/issues/877
5056

@@ -90,6 +96,48 @@ Those can be defined as Spring beans in the application context as described in
9096

9197
You can find a complete example in the [MongoDBJobRepositoryIntegrationTests](./src/test/java/org/springframework/batch/experimental/core/repository/support/MongoDBJobRepositoryIntegrationTests.java) file.
9298

99+
# Composite item reader
100+
101+
*Original issue:* https://github.com/spring-projects/spring-batch/issues/757
102+
103+
This feature introduces a composite `ItemReader` implementation. Similar to the `CompositeItemProcessor` and `CompositeItemWriter`, the idea is to delegate reading to a list of item readers in order.
104+
This is useful when there is a requirement to read data having the same format from different sources (files, databases, etc). Here is an example:
105+
106+
```java
107+
record Person(int id, String name) {}
108+
109+
@Bean
110+
public FlatFileItemReader<Person> fileItemReader() {
111+
return new FlatFileItemReaderBuilder<Person>()
112+
.name("fileItemReader")
113+
.resource(new ClassPathResource("persons.csv"))
114+
.delimited()
115+
.names("id", "name")
116+
.targetType(Person.class)
117+
.build();
118+
}
119+
120+
@Bean
121+
public JdbcCursorItemReader<Person> databaseItemReader() {
122+
String sql = "select * from persons";
123+
return new JdbcCursorItemReaderBuilder<Person>()
124+
.name("databaseItemReader")
125+
.dataSource(dataSource())
126+
.sql(sql)
127+
.rowMapper(new DataClassRowMapper<>(Person.class))
128+
.build();
129+
}
130+
131+
@Bean
132+
public CompositeItemReader<Person> itemReader() {
133+
return new CompositeItemReader<>(Arrays.asList(fileItemReader(), databaseItemReader()));
134+
}
135+
```
136+
137+
This snippet configures a `CompositeItemReader` with two delegates to read the same data from a flat file and a database table.
138+
139+
You can find a complete example in the [CompositeItemReaderIntegrationTests](./src/test/java/org/springframework/batch/experimental/item/support/CompositeItemReaderIntegrationTests.java) file.
140+
93141
# Contribute
94142

95143
The best way to contribute to this project is by trying out the experimental features and sharing your feedback!

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
<junit-jupiter.version>5.10.0</junit-jupiter.version>
5454
<spring-test.version>6.0.13</spring-test.version>
5555
<testcontainers.version>1.19.1</testcontainers.version>
56+
<h2.version>2.2.224</h2.version>
5657
<slf4j.version>2.0.9</slf4j.version>
5758
</properties>
5859

@@ -99,6 +100,12 @@
99100
<version>${testcontainers.version}</version>
100101
<scope>test</scope>
101102
</dependency>
103+
<dependency>
104+
<groupId>com.h2database</groupId>
105+
<artifactId>h2</artifactId>
106+
<version>${h2.version}</version>
107+
<scope>test</scope>
108+
</dependency>
102109
<dependency>
103110
<groupId>org.slf4j</groupId>
104111
<artifactId>slf4j-simple</artifactId>
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
/*
 * Copyright 2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.experimental.item.support;

import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

/**
 * Composite reader that delegates reading to a list of {@link ItemStreamReader}s,
 * in the order in which they are provided. When a delegate is exhausted (returns
 * {@code null}), reading moves on to the next delegate until all delegates are
 * exhausted.
 *
 * <p>This implementation is not thread-safe and not restartable: the iteration
 * over delegates is initialized once at construction time and is not reset by
 * {@link #open(ExecutionContext)}.
 *
 * @author Mahmoud Ben Hassine
 * @param <T> type of objects to read
 */
public class CompositeItemReader<T> implements ItemStreamReader<T> {

	private final List<ItemStreamReader<T>> delegates;

	private final Iterator<ItemStreamReader<T>> delegatesIterator;

	// The delegate currently being read from; null once all delegates are exhausted.
	private ItemStreamReader<T> currentDelegate;

	/**
	 * Create a new {@link CompositeItemReader}.
	 * @param delegates the item readers to delegate to, read in list order
	 */
	public CompositeItemReader(List<ItemStreamReader<T>> delegates) {
		this.delegates = delegates;
		this.delegatesIterator = this.delegates.iterator();
		this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
	}

	/**
	 * Open all delegates up front with the same execution context.
	 * @throws ItemStreamException if any delegate fails to open
	 */
	@Override
	public void open(ExecutionContext executionContext) throws ItemStreamException {
		for (ItemStreamReader<T> delegate : delegates) {
			delegate.open(executionContext);
		}
	}

	/**
	 * Read the next item from the current delegate, advancing to the next
	 * delegate when the current one is exhausted.
	 * @return the next item, or {@code null} when all delegates are exhausted
	 */
	@Override
	public T read() throws Exception {
		// Iterate (rather than recurse) over exhausted delegates so that a long
		// run of empty delegates cannot grow the call stack.
		while (this.currentDelegate != null) {
			T item = this.currentDelegate.read();
			if (item != null) {
				return item;
			}
			this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
		}
		return null;
	}

	/**
	 * Close all delegates. Note that delegates already exhausted during reading
	 * are closed here as well.
	 * @throws ItemStreamException if any delegate fails to close
	 */
	@Override
	public void close() throws ItemStreamException {
		for (ItemStreamReader<T> delegate : delegates) {
			delegate.close();
		}
	}
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
/*
 * Copyright 2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.experimental.item.support;

import java.util.Arrays;

import javax.sql.DataSource;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.jdbc.support.JdbcTransactionManager;
import org.springframework.test.jdbc.JdbcTestUtils;

/**
 * Integration test for {@link CompositeItemReader}: runs a chunk-oriented job that
 * reads the same record format from two flat files and one database table, then
 * writes all items to a target table and verifies the total row count.
 */
public class CompositeItemReaderIntegrationTests {

	// Shared record type read by all three delegates and written by the writer.
	record Person(int id, String name) {
	}

	@Test
	void testCompositeItemReader() throws Exception {
		// given
		ApplicationContext context = new AnnotationConfigApplicationContext(JobConfiguration.class);
		JobLauncher jobLauncher = context.getBean(JobLauncher.class);
		Job job = context.getBean(Job.class);

		// when
		JobExecution jobExecution = jobLauncher.run(job, new JobParameters());

		// then
		Assertions.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
		JdbcTemplate jdbcTemplate = new JdbcTemplate(context.getBean(DataSource.class));
		int personsCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "person_target");
		// 2 persons per CSV file (persons1.csv, persons2.csv) + 2 rows seeded in
		// person_source by data.sql = 6 items total
		Assertions.assertEquals(6, personsCount);
	}

	@Configuration
	@EnableBatchProcessing
	static class JobConfiguration {

		// First delegate: reads persons 1-2 from persons1.csv on the classpath.
		@Bean
		public FlatFileItemReader<Person> itemReader1() {
			return new FlatFileItemReaderBuilder<Person>()
					.name("personItemReader1")
					.resource(new ClassPathResource("persons1.csv"))
					.delimited()
					.names("id", "name")
					.targetType(Person.class)
					.build();
		}

		// Second delegate: reads persons 3-4 from persons2.csv on the classpath.
		@Bean
		public FlatFileItemReader<Person> itemReader2() {
			return new FlatFileItemReaderBuilder<Person>()
					.name("personItemReader2")
					.resource(new ClassPathResource("persons2.csv"))
					.delimited()
					.names("id", "name")
					.targetType(Person.class)
					.build();
		}

		// Third delegate: reads persons 5-6 from the person_source table.
		@Bean
		public JdbcCursorItemReader<Person> itemReader3() {
			String sql = "select * from person_source";
			return new JdbcCursorItemReaderBuilder<Person>()
					.name("personItemReader3")
					.dataSource(dataSource())
					.sql(sql)
					.rowMapper(new DataClassRowMapper<>(Person.class))
					.build();
		}

		// Reader under test: chains the three delegates in order.
		@Bean
		public CompositeItemReader<Person> itemReader() {
			return new CompositeItemReader<>(Arrays.asList(itemReader1(), itemReader2(), itemReader3()));
		}

		// Writes every item read by the composite reader into person_target.
		@Bean
		public JdbcBatchItemWriter<Person> itemWriter() {
			String sql = "insert into person_target (id, name) values (:id, :name)";
			return new JdbcBatchItemWriterBuilder<Person>()
					.dataSource(dataSource())
					.sql(sql)
					.beanMapped()
					.build();
		}

		// Single-step chunk-oriented job wiring the composite reader to the writer.
		@Bean
		public Job job(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
			return new JobBuilder("job", jobRepository)
					.start(new StepBuilder("step", jobRepository)
							.<Person, Person>chunk(5, transactionManager)
							.reader(itemReader())
							.writer(itemWriter())
							.build())
					.build();
		}

		// Embedded H2 database initialized with the Batch meta-data schema plus
		// the test's person_source/person_target tables and seed data.
		@Bean
		public DataSource dataSource() {
			return new EmbeddedDatabaseBuilder()
					.setType(EmbeddedDatabaseType.H2)
					.addScript("/org/springframework/batch/core/schema-drop-h2.sql")
					.addScript("/org/springframework/batch/core/schema-h2.sql")
					.addScript("schema.sql")
					.addScript("data.sql")
					.build();
		}

		@Bean
		public JdbcTransactionManager transactionManager(DataSource dataSource) {
			return new JdbcTransactionManager(dataSource);
		}

	}
}

src/test/resources/data.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
insert into person_source values (5, 'baz1');
2+
insert into person_source values (6, 'baz2');

src/test/resources/persons1.csv

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
1,foo1
2+
2,foo2

src/test/resources/persons2.csv

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
3,bar1
2+
4,bar2

src/test/resources/schema.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
create table person_source (id int primary key, name varchar(20));
2+
create table person_target (id int primary key, name varchar(20));

0 commit comments

Comments
 (0)