Skip to content

Commit b79657d

Browse files
cppwfsmminella
authored andcommitted
Establish RabbitMQ Item Writer Single Step Batch
resolves #698
1 parent 5fa4a20 commit b79657d

File tree

5 files changed

+354
-1
lines changed

5 files changed

+354
-1
lines changed

spring-cloud-starter-single-step-batch-job/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
<dependency>
5151
<groupId>org.springframework.amqp</groupId>
5252
<artifactId>spring-amqp</artifactId>
53+
<optional>true</optional>
5354
</dependency>
5455
<dependency>
5556
<groupId>org.springframework.amqp</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.task.batch.autoconfigure.rabbit;
18+
19+
import java.util.Map;
20+
21+
import org.springframework.amqp.core.AmqpTemplate;
22+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
23+
import org.springframework.amqp.support.converter.MessageConverter;
24+
import org.springframework.batch.item.amqp.AmqpItemWriter;
25+
import org.springframework.batch.item.amqp.builder.AmqpItemWriterBuilder;
26+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
27+
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
28+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
29+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
33+
/**
34+
* Autconfiguration for a {@code AmqpItemWriter}.
35+
*
36+
* @author Glenn Renfro
37+
* @since 2.3
38+
*/
39+
@Configuration
40+
@EnableConfigurationProperties(AmqpItemWriterProperties.class)
41+
@AutoConfigureAfter(BatchAutoConfiguration.class)
42+
@ConditionalOnProperty(name = "spring.batch.job.amqpitemwriter.enabled",
43+
havingValue = "true", matchIfMissing = false)
44+
public class AmqpItemWriterAutoConfiguration {
45+
46+
@Bean
47+
public AmqpItemWriter<Map<Object, Object>> amqpItemWriter(AmqpTemplate amqpTemplate) {
48+
return new AmqpItemWriterBuilder<Map<Object, Object>>().amqpTemplate(amqpTemplate)
49+
.build();
50+
}
51+
52+
@Bean
53+
public AmqpItemWriterProperties amqpItemWriterProperties() {
54+
return new AmqpItemWriterProperties();
55+
}
56+
57+
@ConditionalOnProperty(name = "spring.batch.job.amqpitemwriter.jsonConverterEnabled",
58+
havingValue = "true", matchIfMissing = true)
59+
@Bean
60+
public MessageConverter messageConverter() {
61+
return new Jackson2JsonMessageConverter();
62+
}
63+
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.task.batch.autoconfigure.rabbit;
18+
19+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
20+
import org.springframework.boot.context.properties.ConfigurationProperties;
21+
22+
/**
23+
* @author Glenn Renfro
24+
* @since 2.3
25+
*/
26+
@ConfigurationProperties(prefix = "spring.batch.job.amqpitemwriter")
27+
public class AmqpItemWriterProperties {
28+
29+
private boolean enabled;
30+
31+
private boolean jsonConverterEnabled = true;
32+
33+
/**
34+
* The state of the enabled flag.
35+
* @return true if AmqpItemWriter is enabled. Otherwise false.
36+
*/
37+
public boolean isEnabled() {
38+
return enabled;
39+
}
40+
41+
/**
42+
* Enables or disables the AmqpItemReader.
43+
* @param enabled if true then AmqpItemWriter will be enabled. Defaults to false.
44+
*/
45+
public void setEnabled(boolean enabled) {
46+
this.enabled = enabled;
47+
}
48+
49+
/**
50+
* States whether the {@link Jackson2JsonMessageConverter} is used as a message
51+
* converter.
52+
* @return true if enabled else false.
53+
*/
54+
public boolean isJsonConverterEnabled() {
55+
return jsonConverterEnabled;
56+
}
57+
58+
/**
59+
* Establishes whether the {@link Jackson2JsonMessageConverter} is to be used as a
60+
* message converter.
61+
* @param jsonConverterEnabled true if it is to be enabled else false. Defaults to
62+
* true.
63+
*/
64+
public void setJsonConverterEnabled(boolean jsonConverterEnabled) {
65+
this.jsonConverterEnabled = jsonConverterEnabled;
66+
}
67+
68+
}

spring-cloud-starter-single-step-batch-job/src/main/resources/META-INF/spring.factories

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframewo
44
org.springframework.cloud.task.batch.autoconfigure.flatfile.FlatFileItemWriterAutoConfiguration, \
55
org.springframework.cloud.task.batch.autoconfigure.jdbc.JdbcItemWriterAutoConfiguration, \
66
org.springframework.cloud.task.batch.autoconfigure.jdbc.JdbcCursorItemReaderAutoConfiguration, \
7-
org.springframework.cloud.task.batch.autoconfigure.rabbit.AmqpItemReaderAutoConfiguration
7+
org.springframework.cloud.task.batch.autoconfigure.rabbit.AmqpItemReaderAutoConfiguration, \
8+
org.springframework.cloud.task.batch.autoconfigure.rabbit.AmqpItemWriterAutoConfiguration
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Copyright 2020-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.task.batch.autoconfigure.rabbit;
18+
19+
import java.util.ArrayList;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.junit.jupiter.api.AfterEach;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.mockito.Mockito;
28+
import org.testcontainers.containers.GenericContainer;
29+
30+
import org.springframework.amqp.core.AmqpAdmin;
31+
import org.springframework.amqp.core.AmqpTemplate;
32+
import org.springframework.amqp.core.Binding;
33+
import org.springframework.amqp.core.Queue;
34+
import org.springframework.amqp.core.TopicExchange;
35+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
36+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
37+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
38+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
39+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
40+
import org.springframework.batch.core.Job;
41+
import org.springframework.batch.core.JobExecution;
42+
import org.springframework.batch.core.JobParameters;
43+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
44+
import org.springframework.batch.core.explore.JobExplorer;
45+
import org.springframework.batch.core.launch.JobLauncher;
46+
import org.springframework.batch.item.ItemReader;
47+
import org.springframework.batch.item.support.ListItemReader;
48+
import org.springframework.boot.autoconfigure.AutoConfigurations;
49+
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
50+
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
51+
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
52+
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
53+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
54+
import org.springframework.cloud.task.batch.autoconfigure.SingleStepJobAutoConfiguration;
55+
import org.springframework.context.annotation.Bean;
56+
import org.springframework.context.annotation.Configuration;
57+
import org.springframework.jdbc.core.RowMapper;
58+
59+
import static org.assertj.core.api.Assertions.assertThat;
60+
61+
public class AmqpItemWriterAutoConfigurationTests {
62+
63+
private final static String QUEUE_NAME = "foo";
64+
65+
private final static String EXCHANGE_NAME = "fooexchange";
66+
67+
private static int amqpPort;
68+
69+
private static String host;
70+
71+
private static List<Map<Object, Object>> sampleData;
72+
73+
private RabbitTemplate template;
74+
75+
private ConnectionFactory connectionFactory;
76+
77+
private String[] configurations;
78+
79+
static {
80+
GenericContainer rabbitmq = new GenericContainer("rabbitmq:3.5.3")
81+
.withExposedPorts(5672);
82+
rabbitmq.start();
83+
final Integer mappedPort = rabbitmq.getMappedPort(5672);
84+
host = rabbitmq.getContainerIpAddress();
85+
amqpPort = mappedPort;
86+
sampleData = new ArrayList<>(5);
87+
addNameToReaderList(sampleData, "Jane");
88+
addNameToReaderList(sampleData, "John");
89+
addNameToReaderList(sampleData, "Liz");
90+
addNameToReaderList(sampleData, "Cameron");
91+
addNameToReaderList(sampleData, "Judy");
92+
}
93+
94+
private static void addNameToReaderList(List<Map<Object, Object>> itemReaderList,
95+
String value) {
96+
Map<Object, Object> prepMap = new HashMap<>();
97+
prepMap.put("first_name", value);
98+
itemReaderList.add(prepMap);
99+
}
100+
101+
@BeforeEach
102+
void setupTest() {
103+
this.connectionFactory = new CachingConnectionFactory(host, amqpPort);
104+
this.template = new RabbitTemplate(this.connectionFactory);
105+
this.template.setMessageConverter(new Jackson2JsonMessageConverter());
106+
AmqpAdmin admin = new RabbitAdmin(this.connectionFactory);
107+
admin.declareQueue(new Queue(QUEUE_NAME));
108+
admin.declareExchange(new TopicExchange(EXCHANGE_NAME));
109+
admin.declareBinding(new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE,
110+
EXCHANGE_NAME, "#", null));
111+
this.configurations = new String[] { "spring.batch.job.jobName=integrationJob",
112+
"spring.batch.job.stepName=step1", "spring.batch.job.chunkSize=5",
113+
"spring.rabbitmq.template.exchange=" + EXCHANGE_NAME,
114+
"spring.rabbitmq.host=" + host,
115+
"spring.batch.job.amqpitemwriter.enabled=true",
116+
"spring.rabbitmq.port=" + amqpPort };
117+
}
118+
119+
@AfterEach
120+
void teardownTest() {
121+
AmqpAdmin admin = new RabbitAdmin(this.connectionFactory);
122+
admin.deleteQueue(QUEUE_NAME);
123+
this.template.destroy();
124+
}
125+
126+
@Test
127+
void basicTest() {
128+
ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner()
129+
.withUserConfiguration(BaseConfiguration.class)
130+
.withConfiguration(
131+
AutoConfigurations.of(PropertyPlaceholderAutoConfiguration.class,
132+
BatchAutoConfiguration.class,
133+
SingleStepJobAutoConfiguration.class,
134+
AmqpItemWriterAutoConfiguration.class,
135+
RabbitAutoConfiguration.class))
136+
.withPropertyValues(this.configurations);
137+
138+
applicationContextRunner.run((context) -> {
139+
JobExecution jobExecution = runJob(context);
140+
JobExplorer jobExplorer = context.getBean(JobExplorer.class);
141+
142+
while (jobExplorer.getJobExecution(jobExecution.getJobId()).isRunning()) {
143+
Thread.sleep(1000);
144+
}
145+
146+
for (Map<Object, Object> sampleEntry : sampleData) {
147+
Map<Object, Object> map = (Map<Object, Object>) template
148+
.receiveAndConvert(QUEUE_NAME);
149+
assertThat(map.get("first_name"))
150+
.isEqualTo(sampleEntry.get("first_name"));
151+
}
152+
});
153+
}
154+
155+
@Test
156+
void useAmqpTemplateTest() {
157+
ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner()
158+
.withUserConfiguration(MockConfiguration.class)
159+
.withConfiguration(
160+
AutoConfigurations.of(PropertyPlaceholderAutoConfiguration.class,
161+
BatchAutoConfiguration.class,
162+
SingleStepJobAutoConfiguration.class,
163+
AmqpItemWriterAutoConfiguration.class))
164+
.withPropertyValues(this.configurations);
165+
166+
applicationContextRunner.run((context) -> {
167+
runJob(context);
168+
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
169+
Mockito.verify(amqpTemplate, Mockito.times(5)).convertAndSend(Mockito.any());
170+
});
171+
}
172+
173+
private JobExecution runJob(AssertableApplicationContext context) throws Exception {
174+
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
175+
176+
Job job = context.getBean(Job.class);
177+
178+
return jobLauncher.run(job, new JobParameters());
179+
}
180+
181+
@EnableBatchProcessing
182+
@Configuration
183+
public static class BaseConfiguration extends ItemWriterConfiguration {
184+
185+
}
186+
187+
@EnableBatchProcessing
188+
@Configuration
189+
public static class MockConfiguration extends ItemWriterConfiguration {
190+
191+
@Bean
192+
AmqpTemplate amqpTemplateBean() {
193+
return Mockito.mock(AmqpTemplate.class);
194+
}
195+
196+
}
197+
198+
public static class ItemWriterConfiguration {
199+
200+
@Bean
201+
public RowMapper<Map<Object, Object>> rowMapper() {
202+
return (rs, rowNum) -> {
203+
Map<Object, Object> item = new HashMap<>();
204+
205+
item.put("item", rs.getString("item_name"));
206+
207+
return item;
208+
};
209+
}
210+
211+
@Bean
212+
public ItemReader<Map<Object, Object>> itemWriter() {
213+
214+
return new ListItemReader<>(sampleData);
215+
}
216+
217+
}
218+
219+
}

0 commit comments

Comments
 (0)