Skip to content

Commit 626af44

Browse files
committed
Adding KafkaItemWriter single step autoconfig
Resolves #696. Updated to allow the user to set the itemKeyMapper; added a very simple key mapper that returns its input unchanged. Updated based on code reviews. Updated Kafka version.
1 parent a841508 commit 626af44

File tree

5 files changed

+322
-2
lines changed

5 files changed

+322
-2
lines changed

spring-cloud-starter-single-step-batch-job/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<test.containers.version>1.15.0</test.containers.version>
1414
<test.rabbit.containers.version>1.15.0</test.rabbit.containers.version>
1515
<test.ducttape.version>1.0.8</test.ducttape.version>
16-
<spring-kafka-version>2.5.3.RELEASE</spring-kafka-version>
16+
<spring-kafka-version>2.6.3</spring-kafka-version>
1717
</properties>
1818

1919
<dependencies>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.task.batch.autoconfigure.kafka;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import org.springframework.batch.item.kafka.KafkaItemWriter;
23+
import org.springframework.batch.item.kafka.builder.KafkaItemWriterBuilder;
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.springframework.beans.factory.annotation.Qualifier;
26+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
27+
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
28+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
29+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
30+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
31+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
32+
import org.springframework.context.annotation.Bean;
33+
import org.springframework.context.annotation.Configuration;
34+
import org.springframework.core.convert.converter.Converter;
35+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
36+
import org.springframework.kafka.core.KafkaTemplate;
37+
import org.springframework.kafka.core.ProducerFactory;
38+
import org.springframework.kafka.support.serializer.JsonSerializer;
39+
import org.springframework.util.Assert;
40+
41+
/**
42+
*
43+
* Autconfiguration for a {@code KafkaItemReader}.
44+
*
45+
* @author Glenn Renfro
46+
* @since 2.3
47+
*/
48+
@Configuration
49+
@EnableConfigurationProperties({ KafkaProperties.class, KafkaItemWriterProperties.class })
50+
@AutoConfigureAfter(BatchAutoConfiguration.class)
51+
public class KafkaItemWriterAutoConfiguration {
52+
53+
@Autowired
54+
private KafkaProperties kafkaProperties;
55+
56+
@Bean
57+
@ConditionalOnMissingBean
58+
@ConditionalOnProperty(prefix = "spring.batch.job.kafkaitemwriter", name = "topic")
59+
public KafkaItemWriter<Object, Map<Object, Object>> kafkaItemWriter(
60+
KafkaItemWriterProperties kafkaItemWriterProperties,
61+
ProducerFactory<Object, Map<Object, Object>> producerFactory,
62+
@Qualifier("batchItemKeyMapper") Converter<Object, Object> itemKeyMapper) {
63+
64+
validateProperties(kafkaItemWriterProperties);
65+
KafkaTemplate template = new KafkaTemplate(producerFactory);
66+
template.setDefaultTopic(kafkaItemWriterProperties.getTopic());
67+
return new KafkaItemWriterBuilder<Object, Map<Object, Object>>()
68+
.delete(kafkaItemWriterProperties.isDelete()).kafkaTemplate(template)
69+
.itemKeyMapper(itemKeyMapper).build();
70+
}
71+
72+
@Bean
73+
@ConditionalOnMissingBean(name = "batchItemKeyMapper")
74+
public Converter<Object, Object> batchItemKeyMapper() {
75+
return new Converter<Object, Object>() {
76+
@Override
77+
public Object convert(Object source) {
78+
return source;
79+
}
80+
};
81+
}
82+
83+
@Bean
84+
@ConditionalOnMissingBean
85+
ProducerFactory<Object, Map<Object, Object>> producerFactory() {
86+
Map<String, Object> configs = new HashMap<>();
87+
configs.putAll(this.kafkaProperties.getProducer().buildProperties());
88+
return new DefaultKafkaProducerFactory<Object, Map<Object, Object>>(configs, null,
89+
new JsonSerializer<>());
90+
}
91+
92+
private void validateProperties(KafkaItemWriterProperties kafkaItemWriterProperties) {
93+
Assert.hasText(kafkaItemWriterProperties.getTopic(),
94+
"topic must not be empty or null");
95+
}
96+
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.task.batch.autoconfigure.kafka;
18+
19+
import org.springframework.boot.context.properties.ConfigurationProperties;
20+
21+
/**
22+
* Properties to configure a {@code KafkaItemWriter}.
23+
*
24+
* @author Glenn Renfro
25+
* @since 2.3
26+
*/
27+
@ConfigurationProperties(prefix = "spring.batch.job.kafkaitemwriter")
28+
public class KafkaItemWriterProperties {
29+
30+
private String topic;
31+
32+
private boolean delete;
33+
34+
/**
35+
* Returns the name of the topic from which messages will be written.
36+
* @return the name of the topic.
37+
*/
38+
public String getTopic() {
39+
return topic;
40+
}
41+
42+
/**
43+
* The topic name from which the messages will be read.
44+
* @param topic name of the topic
45+
*/
46+
public void setTopic(String topic) {
47+
this.topic = topic;
48+
}
49+
50+
/**
51+
* Indicate if the items being passed to the writer are all to be sent as delete
52+
* events to the topic. A delete event is made of a key with a null value. If set to
53+
* false (default), the items will be sent with provided value and key converter by
54+
* the itemKeyMapper. If set to true, the items will be sent with the key converter
55+
* from the value by the itemKeyMapper and a null value.
56+
* @return removal indicator.
57+
*/
58+
public boolean isDelete() {
59+
return delete;
60+
}
61+
62+
/**
63+
* Indicate if the items being passed to the writer are all to be sent as delete
64+
* events to the topic. A delete event is made of a key with a null value. If set to
65+
* false (default), the items will be sent with provided value and key converter by
66+
* the itemKeyMapper. If set to true, the items will be sent with the key converter
67+
* from the value by the itemKeyMapper and a null value.
68+
* @param delete removal indicator.
69+
*/
70+
public void setDelete(boolean delete) {
71+
this.delete = delete;
72+
}
73+
74+
}

spring-cloud-starter-single-step-batch-job/src/main/resources/META-INF/spring.factories

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframewo
66
org.springframework.cloud.task.batch.autoconfigure.jdbc.JdbcCursorItemReaderAutoConfiguration, \
77
org.springframework.cloud.task.batch.autoconfigure.rabbit.AmqpItemReaderAutoConfiguration, \
88
org.springframework.cloud.task.batch.autoconfigure.rabbit.AmqpItemWriterAutoConfiguration, \
9-
org.springframework.cloud.task.batch.autoconfigure.kafka.KafkaItemReaderAutoConfiguration
9+
org.springframework.cloud.task.batch.autoconfigure.kafka.KafkaItemReaderAutoConfiguration, \
10+
org.springframework.cloud.task.batch.autoconfigure.kafka.KafkaItemWriterAutoConfiguration
11+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.task.batch.autoconfigure.kafka;
18+
19+
import java.util.ArrayList;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.clients.consumer.Consumer;
25+
import org.apache.kafka.clients.consumer.ConsumerRecords;
26+
import org.apache.kafka.common.serialization.StringDeserializer;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.Test;
29+
30+
import org.springframework.batch.core.Job;
31+
import org.springframework.batch.core.JobExecution;
32+
import org.springframework.batch.core.JobParameters;
33+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
34+
import org.springframework.batch.core.explore.JobExplorer;
35+
import org.springframework.batch.core.launch.JobLauncher;
36+
import org.springframework.batch.item.support.ListItemReader;
37+
import org.springframework.boot.autoconfigure.AutoConfigurations;
38+
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
39+
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
40+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
41+
import org.springframework.cloud.task.batch.autoconfigure.SingleStepJobAutoConfiguration;
42+
import org.springframework.context.ApplicationContext;
43+
import org.springframework.context.annotation.Bean;
44+
import org.springframework.context.annotation.Configuration;
45+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
46+
import org.springframework.kafka.support.serializer.JsonDeserializer;
47+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
48+
import org.springframework.kafka.test.context.EmbeddedKafka;
49+
import org.springframework.kafka.test.utils.KafkaTestUtils;
50+
51+
import static java.util.Collections.singleton;
52+
import static org.assertj.core.api.Assertions.assertThat;
53+
54+
@EmbeddedKafka(partitions = 1, topics = { "topic1" })
public class KafkaItemWriterTests {

	// Broker instance injected by the @EmbeddedKafka extension; captured
	// statically so non-static test methods can read its bootstrap servers.
	private static EmbeddedKafkaBroker embeddedKafkaBroker;

	@BeforeAll
	public static void setupTest(EmbeddedKafkaBroker embeddedKafka) {
		embeddedKafkaBroker = embeddedKafka;
		embeddedKafka.addTopics("topic2");
	}

	// End-to-end check: the single-step job reads 5 items from the in-memory
	// reader and the auto-configured KafkaItemWriter publishes them to topic1.
	@Test
	public void testBaseKafkaItemWriter() {
		final String topicName = "topic1";
		ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner()
				.withUserConfiguration(CustomMappingConfiguration.class)
				.withConfiguration(
						AutoConfigurations.of(PropertyPlaceholderAutoConfiguration.class,
								BatchAutoConfiguration.class,
								SingleStepJobAutoConfiguration.class,
								KafkaItemWriterAutoConfiguration.class))
				// chunkSize=5 so all five items go out in one chunk; the topic
				// property triggers KafkaItemWriterAutoConfiguration.
				.withPropertyValues("spring.batch.job.jobName=job",
						"spring.batch.job.stepName=step1", "spring.batch.job.chunkSize=5",
						"spring.kafka.producer.bootstrap-servers="
								+ embeddedKafkaBroker.getBrokersAsString(),
						"spring.kafka.producer.keySerializer=org.springframework.kafka.support.serializer.JsonSerializer",
						"spring.batch.job.kafkaitemwriter.topic=" + topicName);

		applicationContextRunner.run((context) -> {
			waitForTopicPopulation(context);
			validateResults(topicName);
		});
	}

	// Consumes from the topic and asserts exactly the five written records,
	// deserializing values back from JSON into maps.
	private void validateResults(String topicName) {
		Map<String, Object> configs = new HashMap<>(
				KafkaTestUtils.consumerProps("1", "false", embeddedKafkaBroker));
		Consumer<String, Object> consumer = new DefaultKafkaConsumerFactory<>(configs,
				new StringDeserializer(), new JsonDeserializer<>()).createConsumer();
		consumer.subscribe(singleton(topicName));

		ConsumerRecords<String, Object> consumerRecords = KafkaTestUtils
				.getRecords(consumer);
		assertThat(consumerRecords.count()).isEqualTo(5);
		List<Map<Object, Object>> result = new ArrayList<>();
		consumerRecords.forEach(cs -> {
			result.add((Map<Object, Object>) cs.value());
		});
		List<String> firstNames = new ArrayList<>();
		result.forEach(s -> firstNames.add((String) s.get("first_name")));
		assertThat(firstNames.size()).isEqualTo(5);
		assertThat(firstNames).contains("Jane");
		assertThat(firstNames).contains("John");
		assertThat(firstNames).contains("Liz");
		assertThat(firstNames).contains("Cameron");
		assertThat(firstNames).contains("Judy");
	}

	// Launches the auto-configured job and polls the JobExplorer until the
	// execution is no longer running (no timeout — relies on the test
	// framework's overall timeout if the job hangs).
	private void waitForTopicPopulation(ApplicationContext context) throws Exception {
		JobLauncher jobLauncher = context.getBean(JobLauncher.class);
		Job job = context.getBean(Job.class);
		JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
		JobExplorer jobExplorer = context.getBean(JobExplorer.class);

		while (jobExplorer.getJobExecution(jobExecution.getJobId()).isRunning()) {
			Thread.sleep(1000);
		}
	}

	@EnableBatchProcessing
	@Configuration
	public static class CustomMappingConfiguration {

		// NOTE(review): despite the method name, this bean is the step's
		// ItemReader (a ListItemReader feeding the writer under test).
		// Consider renaming to itemReader — confirm nothing looks this bean
		// up by name before changing it.
		@Bean
		public ListItemReader<Map<Object, Object>> itemWriter() {
			List<Map<Object, Object>> list = new ArrayList<>(5);
			addNameToReaderList(list, "Jane");
			addNameToReaderList(list, "John");
			addNameToReaderList(list, "Liz");
			addNameToReaderList(list, "Cameron");
			addNameToReaderList(list, "Judy");
			return new ListItemReader<>(list);
		}

		// Builds a single-entry map item ({"first_name": value}) and appends
		// it to the reader's backing list.
		private void addNameToReaderList(List<Map<Object, Object>> itemReaderList,
				String value) {
			Map<Object, Object> prepMap = new HashMap<>();
			prepMap.put("first_name", value);
			itemReaderList.add(prepMap);
		}

	}

}

0 commit comments

Comments
 (0)