Skip to content

Commit a841508

Browse files
committed
Introducing KafkaItemReader for single step jobs
resolves #679 Rebase update rebase update pom cleanup Cleaned up tests Updated Added updates based on code review polish
1 parent 0edef4e commit a841508

File tree

5 files changed

+504
-2
lines changed

5 files changed

+504
-2
lines changed

spring-cloud-starter-single-step-batch-job/pom.xml

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
<test.containers.version>1.15.0</test.containers.version>
1414
<test.rabbit.containers.version>1.15.0</test.rabbit.containers.version>
1515
<test.ducttape.version>1.0.8</test.ducttape.version>
16+
<spring-kafka-version>2.5.3.RELEASE</spring-kafka-version>
1617
</properties>
1718

1819
<dependencies>
@@ -31,7 +32,6 @@
3132
<optional>true</optional>
3233
<version>${spring-boot.version}</version>
3334
</dependency>
34-
3535
<dependency>
3636
<groupId>org.springframework.boot</groupId>
3737
<artifactId>spring-boot-starter-test</artifactId>
@@ -82,6 +82,47 @@
8282
<groupId>com.fasterxml.jackson.core</groupId>
8383
<artifactId>jackson-annotations</artifactId>
8484
</dependency>
85+
<dependency>
86+
<groupId>org.springframework.cloud</groupId>
87+
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
88+
<version>3.1.0-SNAPSHOT</version>
89+
</dependency>
90+
<dependency>
91+
<groupId>org.junit.jupiter</groupId>
92+
<artifactId>junit-jupiter</artifactId>
93+
<scope>test</scope>
94+
</dependency>
95+
<dependency>
96+
<groupId>org.junit.jupiter</groupId>
97+
<artifactId>junit-jupiter-engine</artifactId>
98+
<scope>test</scope>
99+
</dependency>
100+
<dependency>
101+
<groupId>org.junit.jupiter</groupId>
102+
<artifactId>junit-jupiter-params</artifactId>
103+
<scope>test</scope>
104+
</dependency>
105+
<dependency>
106+
<groupId>org.springframework.kafka</groupId>
107+
<artifactId>spring-kafka-test</artifactId>
108+
<version>${spring-kafka-version}</version>
109+
<scope>test</scope>
110+
</dependency>
111+
<dependency>
112+
<groupId>org.springframework.kafka</groupId>
113+
<artifactId>spring-kafka</artifactId>
114+
<version>${spring-kafka-version}</version>
115+
</dependency>
116+
<dependency>
117+
<groupId>org.junit.jupiter</groupId>
118+
<artifactId>junit-jupiter-api</artifactId>
119+
<scope>test</scope>
120+
</dependency>
121+
<dependency>
122+
<groupId>org.junit.platform</groupId>
123+
<artifactId>junit-platform-launcher</artifactId>
124+
<scope>test</scope>
125+
</dependency>
85126
</dependencies>
86127

87128
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.task.batch.autoconfigure.kafka;
18+
19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.Map;
22+
import java.util.Properties;
23+
24+
import org.springframework.batch.item.kafka.KafkaItemReader;
25+
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
28+
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
29+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
30+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
31+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
32+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
33+
import org.springframework.context.annotation.Bean;
34+
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.util.StringUtils;
36+
37+
/**
38+
*
39+
* AutoConfiguration for a {@code KafkaItemReader}.
40+
*
41+
* @author Glenn Renfro
42+
* @since 2.3
43+
*/
44+
@Configuration
45+
@EnableConfigurationProperties({ KafkaProperties.class, KafkaItemReaderProperties.class })
46+
@AutoConfigureAfter(BatchAutoConfiguration.class)
47+
public class KafkaItemReaderAutoConfiguration {
48+
49+
@Autowired
50+
private KafkaProperties kafkaProperties;
51+
52+
@Bean
53+
@ConditionalOnMissingBean
54+
@ConditionalOnProperty(prefix = "spring.batch.job.kafkaitemreader", name = "name")
55+
public KafkaItemReader<Object, Map<Object, Object>> kafkaItemReader(
56+
KafkaItemReaderProperties kafkaItemReaderProperties) {
57+
Properties consumerProperties = new Properties();
58+
consumerProperties.putAll(this.kafkaProperties.getConsumer().buildProperties());
59+
validateProperties(kafkaItemReaderProperties);
60+
if (kafkaItemReaderProperties.getPartitions() == null
61+
|| kafkaItemReaderProperties.getPartitions().size() == 0) {
62+
kafkaItemReaderProperties.setPartitions(new ArrayList<>(1));
63+
kafkaItemReaderProperties.getPartitions().add(0);
64+
}
65+
return new KafkaItemReaderBuilder<Object, Map<Object, Object>>()
66+
.partitions(kafkaItemReaderProperties.getPartitions())
67+
.consumerProperties(consumerProperties)
68+
.name(kafkaItemReaderProperties.getName())
69+
.pollTimeout(Duration
70+
.ofSeconds(kafkaItemReaderProperties.getPollTimeOutInSeconds()))
71+
.saveState(kafkaItemReaderProperties.isSaveState()).topic(kafkaItemReaderProperties.getTopic()).build();
72+
}
73+
74+
private void validateProperties(KafkaItemReaderProperties kafkaItemReaderProperties) {
75+
if (!StringUtils.hasText(kafkaItemReaderProperties.getName())) {
76+
throw new IllegalArgumentException("Name must not be empty or null");
77+
}
78+
if (!StringUtils.hasText(kafkaItemReaderProperties.getTopic())) {
79+
throw new IllegalArgumentException("Topic must not be empty or null");
80+
}
81+
if (!StringUtils.hasText(this.kafkaProperties.getConsumer().getGroupId())) {
82+
throw new IllegalArgumentException("GroupId must not be empty or null");
83+
}
84+
if (this.kafkaProperties.getBootstrapServers() == null
85+
|| this.kafkaProperties.getBootstrapServers().size() == 0) {
86+
throw new IllegalArgumentException("Bootstrap Servers must be configured");
87+
}
88+
}
89+
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.task.batch.autoconfigure.kafka;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
22+
import org.springframework.boot.context.properties.ConfigurationProperties;
23+
24+
/**
25+
* Properties to configure a {@code KafkaItemReader}.
26+
*
27+
* @author Glenn Renfro
28+
* @since 2.3
29+
*/
30+
@ConfigurationProperties(prefix = "spring.batch.job.kafkaitemreader")
31+
public class KafkaItemReaderProperties {
32+
33+
private String name;
34+
35+
private String topic;
36+
37+
private List<Integer> partitions = new ArrayList<>();
38+
39+
private long pollTimeOutInSeconds = 30L;
40+
41+
private boolean saveState = true;
42+
43+
/**
44+
* Returns the configured value of the name used to calculate {@code ExecutionContext}
45+
* keys.
46+
* @return the name
47+
*/
48+
public String getName() {
49+
return name;
50+
}
51+
52+
/**
53+
* The name used to calculate the key within the
54+
* {@link org.springframework.batch.item.ExecutionContext}.
55+
* @param name name of the writer instance
56+
* @see org.springframework.batch.item.ItemStreamSupport#setName(String)
57+
*/
58+
public void setName(String name) {
59+
this.name = name;
60+
}
61+
62+
/**
63+
* Returns the name of the topic from which messages will be read.
64+
* @return the name of the topic.
65+
*/
66+
public String getTopic() {
67+
return topic;
68+
}
69+
70+
/**
71+
* The topic name from which the messages will be read.
72+
* @param topic name of the topic
73+
*/
74+
public void setTopic(String topic) {
75+
this.topic = topic;
76+
}
77+
78+
/**
79+
* A list of partitions to manually assign to the consumer. Defaults to a single entry
80+
* value of 1.
81+
* @return the list of partitions.
82+
*/
83+
public List<Integer> getPartitions() {
84+
return partitions;
85+
}
86+
87+
/**
88+
* A list of partitions to manually assign to the consumer. Defaults to a single entry
89+
* value of 1.
90+
* @param partitions list of partitions
91+
*/
92+
public void setPartitions(List<Integer> partitions) {
93+
this.partitions = partitions;
94+
}
95+
96+
/**
97+
* Get the pollTimeout for the poll() operations. Defaults to 30 seconds.
98+
* @return long containing the poll timeout.
99+
*/
100+
public long getPollTimeOutInSeconds() {
101+
return pollTimeOutInSeconds;
102+
}
103+
104+
/**
105+
* Set the pollTimeout for the poll() operations. Defaults to 30 seconds.
106+
* @param pollTimeOutInSeconds the number of seconds to wait before timing out.
107+
*/
108+
public void setPollTimeOutInSeconds(long pollTimeOutInSeconds) {
109+
this.pollTimeOutInSeconds = pollTimeOutInSeconds;
110+
}
111+
/**
112+
* Configure if the state of the {@link org.springframework.batch.item.ItemStreamSupport}
113+
* should be persisted within the {@link org.springframework.batch.item.ExecutionContext}
114+
* for restart purposes. Defaults to true.
115+
* @return current status of the saveState flag.
116+
*/
117+
public boolean isSaveState() {
118+
return saveState;
119+
}
120+
121+
/**
122+
* Configure if the state of the {@link org.springframework.batch.item.ItemStreamSupport}
123+
* should be persisted within the {@link org.springframework.batch.item.ExecutionContext}
124+
* for restart purposes.
125+
* @param saveState true if state should be persisted. Defaults to true.
126+
*/
127+
public void setSaveState(boolean saveState) {
128+
this.saveState = saveState;
129+
}
130+
}

spring-cloud-starter-single-step-batch-job/src/main/resources/META-INF/spring.factories

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframewo
55
org.springframework.cloud.task.batch.autoconfigure.jdbc.JdbcItemWriterAutoConfiguration, \
66
org.springframework.cloud.task.batch.autoconfigure.jdbc.JdbcCursorItemReaderAutoConfiguration, \
77
org.springframework.cloud.task.batch.autoconfigure.rabbit.AmqpItemReaderAutoConfiguration, \
8-
org.springframework.cloud.task.batch.autoconfigure.rabbit.AmqpItemWriterAutoConfiguration
8+
org.springframework.cloud.task.batch.autoconfigure.rabbit.AmqpItemWriterAutoConfiguration, \
9+
org.springframework.cloud.task.batch.autoconfigure.kafka.KafkaItemReaderAutoConfiguration

0 commit comments

Comments
 (0)