Skip to content

Commit 6543349

Browse files
committed
Add MQTT5 sample application with integration flows and tests
Signed-off-by: Minyoung Noh <[email protected]> #323
1 parent 99055a6 commit 6543349

File tree

8 files changed

+416
-0
lines changed

8 files changed

+416
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ This is a good place to get started. The samples here are technically motivated
6161
* **jpa** - Shows the usage of the JPA Components
6262
* **mail** - Example showing **IMAP** and **POP3** support
6363
* **mqtt** - Demonstrates the functionality of inbound and outbound **MQTT Adapters**
64+
* **mqtt5** - Demonstrates the functionality of inbound and outbound **MQTT5 Adapters**
6465
* **mongodb** - Shows how to persist a Message payload to a **MongoDb** document store and how to read documents from **MongoDb**
6566
* **oddeven** - Example combining the functionality of **Inbound Channel Adapter**, **Filter**, **Router** and **Poller**
6667
* **quote** - Example demoing core EIP support using **Channel Adapter (Inbound and Stdout)**, **Poller** with Interval Triggers, **Service Activator**

basic/mqtt5/README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
Spring Integration - MQTT5 Sample
2+
================================
3+
4+
# Overview
5+
6+
This sample demonstrates basic functionality of the **Spring Integration MQTT5 Adapters**.
7+
8+
It assumes a broker is running on localhost on port 1883.
9+
10+
Once the application is started, you enter some text on the command prompt and a message containing that entered text is
11+
dispatched to the MQTT topic. In return that message is retrieved by Spring Integration and then logged.
12+
13+
# How to Run the Sample
14+
15+
If you imported the example into your IDE, you can just run class **org.springframework.integration.samples.mqtt5.Application**.
16+
For example in [SpringSource Tool Suite](https://www.springsource.com/developer/sts) (STS) do:
17+
18+
* Right-click on SampleSimple class --> Run As --> Spring Boot App
19+
20+
(or run from the boot console).
21+
22+
Alternatively, you can start the sample from the command line:
23+
24+
* ./gradlew :mqtt5:run
25+
26+
Enter some data (e.g. `foo`) on the console; you will see `foo sent to MQTT5, received from MQTT5`
27+
28+
Ctrl-C to terminate.
29+
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.samples.mqtt5;
18+
19+
20+
import org.apache.commons.logging.Log;
21+
import org.apache.commons.logging.LogFactory;
22+
23+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
24+
import org.springframework.boot.SpringApplication;
25+
import org.springframework.boot.autoconfigure.SpringBootApplication;
26+
import org.springframework.context.annotation.Bean;
27+
import org.springframework.integration.dsl.IntegrationFlow;
28+
import org.springframework.integration.dsl.Pollers;
29+
import org.springframework.integration.endpoint.MessageProducerSupport;
30+
import org.springframework.integration.handler.LoggingHandler;
31+
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
32+
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
33+
import org.springframework.integration.stream.CharacterStreamReadingMessageSource;
34+
import org.springframework.messaging.MessageHandler;
35+
36+
import java.nio.charset.StandardCharsets;
37+
38+
/**
39+
* Starts the Spring Context and will initialize the Spring Integration message flow.
40+
*
41+
* @author Minyoung Noh
42+
*
43+
*/
44+
@SpringBootApplication
45+
public class Application {
46+
47+
private static final Log LOGGER = LogFactory.getLog(Application.class);
48+
49+
/**
50+
* Load the Spring Integration Application Context
51+
*
52+
* @param args - command line arguments
53+
*/
54+
public static void main(final String... args) {
55+
56+
LOGGER.info("\n========================================================="
57+
+ "\n "
58+
+ "\n Welcome to Spring Integration! "
59+
+ "\n "
60+
+ "\n For more information please visit: "
61+
+ "\n https://spring.io/projects/spring-integration "
62+
+ "\n "
63+
+ "\n=========================================================");
64+
65+
LOGGER.info("\n========================================================="
66+
+ "\n "
67+
+ "\n This is the MQTT5 Sample - "
68+
+ "\n "
69+
+ "\n Please enter some text and press return. The entered "
70+
+ "\n Message will be sent to the configured MQTT topic, "
71+
+ "\n then again immediately retrieved from the Message "
72+
+ "\n Broker and ultimately printed to the command line. "
73+
+ "\n "
74+
+ "\n=========================================================");
75+
76+
SpringApplication.run(Application.class, args);
77+
}
78+
79+
@Bean
80+
public MqttConnectionOptions mqttConnectionOptions() {
81+
MqttConnectionOptions options = new MqttConnectionOptions();
82+
options.setServerURIs(new String[]{ "tcp://localhost:1883" });
83+
options.setUserName("guest");
84+
options.setPassword("guest".getBytes(StandardCharsets.UTF_8));
85+
return options;
86+
}
87+
88+
// publisher
89+
90+
@Bean
91+
public IntegrationFlow mqttOutFlow() {
92+
return IntegrationFlow.from(CharacterStreamReadingMessageSource.stdin(),
93+
e -> e.poller(Pollers.fixedDelay(1000)))
94+
.transform(p -> p + " sent to MQTT5")
95+
.handle(mqttOutbound())
96+
.get();
97+
}
98+
99+
@Bean
100+
public MessageHandler mqttOutbound() {
101+
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(mqttConnectionOptions(), "siSamplePublisher");
102+
messageHandler.setAsync(true);
103+
messageHandler.setAsyncEvents(true);
104+
messageHandler.setDefaultTopic("siSampleTopic");
105+
return messageHandler;
106+
}
107+
108+
// consumer
109+
110+
@Bean
111+
public IntegrationFlow mqttInFlow() {
112+
return IntegrationFlow.from(mqttInbound())
113+
.transform(p -> p + ", received from MQTT5")
114+
.handle(logger())
115+
.get();
116+
}
117+
118+
private LoggingHandler logger() {
119+
LoggingHandler loggingHandler = new LoggingHandler("INFO");
120+
loggingHandler.setLoggerName("siSample");
121+
return loggingHandler;
122+
}
123+
124+
@Bean
125+
public MessageProducerSupport mqttInbound() {
126+
Mqttv5PahoMessageDrivenChannelAdapter adapter =
127+
new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectionOptions(),"siSampleConsumer", "siSampleTopic");
128+
adapter.setCompletionTimeout(5000);
129+
adapter.setPayloadType(String.class);
130+
adapter.setMessageConverter(new MqttStringToBytesConverter());
131+
adapter.setQos(1);
132+
return adapter;
133+
}
134+
135+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.integration.samples.mqtt5;
17+
18+
import org.springframework.messaging.Message;
19+
import org.springframework.messaging.MessageHeaders;
20+
import org.springframework.messaging.converter.AbstractMessageConverter;
21+
22+
import java.nio.charset.StandardCharsets;
23+
24+
/**
25+
* A simple {@link AbstractMessageConverter} that converts
26+
*
27+
* @author Minyoung Noh
28+
* @since 5.2
29+
*
30+
*/
31+
public class MqttStringToBytesConverter extends AbstractMessageConverter {
32+
@Override
33+
protected boolean supports(Class<?> clazz) {
34+
return true;
35+
}
36+
37+
@Override
38+
protected Object convertFromInternal(Message<?> message, Class<?> targetClass,
39+
Object conversionHint) {
40+
41+
return message.getPayload().toString().getBytes(StandardCharsets.UTF_8);
42+
}
43+
44+
@Override
45+
protected Object convertToInternal(Object payload, MessageHeaders headers,
46+
Object conversionHint) {
47+
48+
return new String((byte[]) payload);
49+
}
50+
51+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<configuration>
2+
3+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
4+
<!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder
5+
by default -->
6+
<encoder>
7+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
8+
</pattern>
9+
</encoder>
10+
</appender>
11+
12+
<root level="debug">
13+
<appender-ref ref="STDOUT" />
14+
</root>
15+
16+
<logger name="org.springframework" level="WARN" />
17+
18+
<logger name="org.springframework.integration" level="ERROR" />
19+
20+
<logger name="org.springframework.integration.samples" level="INFO" />
21+
22+
<logger name="siSample" level="INFO" />
23+
24+
</configuration>
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package org.springframework.integration.samples.mqtt5;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.ArgumentMatchers.any;
5+
import static org.mockito.Mockito.verify;
6+
import static org.springframework.integration.test.mock.MockIntegration.messageArgumentCaptor;
7+
import static org.springframework.integration.test.mock.MockIntegration.mockMessageHandler;
8+
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.TimeUnit;
11+
12+
import org.junit.jupiter.api.BeforeAll;
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.extension.ExtendWith;
15+
import org.mockito.ArgumentCaptor;
16+
17+
import org.springframework.beans.factory.annotation.Autowired;
18+
import org.springframework.boot.test.context.SpringBootTest;
19+
import org.springframework.integration.dsl.IntegrationFlow;
20+
import org.springframework.integration.test.context.MockIntegrationContext;
21+
import org.springframework.integration.test.context.SpringIntegrationTest;
22+
import org.springframework.messaging.Message;
23+
import org.springframework.messaging.MessageHandler;
24+
import org.springframework.messaging.support.GenericMessage;
25+
import org.springframework.test.context.junit.jupiter.SpringExtension;
26+
27+
@ExtendWith(SpringExtension.class)
28+
@SpringBootTest
29+
@SpringIntegrationTest
30+
public class ApplicationTest {
31+
32+
@BeforeAll
33+
static void setupBroker() {
34+
BrokerRunning brokerRunning = BrokerRunning.isRunning(1883);
35+
}
36+
37+
@Autowired
38+
private MockIntegrationContext mockIntegrationContext;
39+
40+
@Autowired
41+
private IntegrationFlow mqttOutFlow;
42+
43+
@Test
44+
void testMqttFlow() throws InterruptedException {
45+
ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
46+
CountDownLatch receiveLatch = new CountDownLatch(1);
47+
MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
48+
49+
mockIntegrationContext.substituteMessageHandlerFor(
50+
"mqttInFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1",
51+
mockMessageHandler);
52+
53+
mqttOutFlow.getInputChannel().send(new GenericMessage<>("foo"));
54+
55+
assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue();
56+
verify(mockMessageHandler).handleMessage(any());
57+
assertThat(captor.getValue().getPayload())
58+
.isEqualTo("foo sent to MQTT5, received from MQTT5");
59+
}
60+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.samples.mqtt5;
18+
19+
import static org.junit.Assume.assumeNoException;
20+
import static org.junit.Assume.assumeTrue;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import org.apache.commons.logging.Log;
26+
import org.apache.commons.logging.LogFactory;
27+
import org.eclipse.paho.mqttv5.client.IMqttClient;
28+
import org.eclipse.paho.mqttv5.client.MqttClient;
29+
import org.eclipse.paho.mqttv5.common.MqttException;
30+
import org.junit.rules.TestWatcher;
31+
import org.junit.runner.Description;
32+
import org.junit.runners.model.Statement;
33+
34+
35+
/**
36+
* @author Minyoung Noh
37+
*
38+
* @since 5.2
39+
*
40+
*/
41+
public class BrokerRunning extends TestWatcher {
42+
43+
private static final Log logger = LogFactory.getLog(BrokerRunning.class);
44+
45+
// Static so that we only test once on failure: speeds up test suite
46+
private static final Map<Integer, Boolean> brokerOnline = new HashMap<>();
47+
48+
private final int port;
49+
50+
private BrokerRunning(int port) {
51+
this.port = port;
52+
brokerOnline.put(port, true);
53+
}
54+
55+
@Override
56+
public Statement apply(Statement base, Description description) {
57+
assumeTrue(brokerOnline.get(port));
58+
String url = "tcp://localhost:" + port;
59+
IMqttClient client = null;
60+
try {
61+
client = new MqttClient(url, "junit-" + System.currentTimeMillis());
62+
client.connect();
63+
}
64+
catch (MqttException e) {
65+
logger.warn("Tests not running because no broker on " + url + ":", e);
66+
assumeNoException(e);
67+
}
68+
finally {
69+
if (client != null) {
70+
try {
71+
client.disconnect();
72+
client.close();
73+
}
74+
catch (MqttException e) {
75+
}
76+
}
77+
}
78+
return super.apply(base, description);
79+
}
80+
81+
82+
public static BrokerRunning isRunning(int port) {
83+
return new BrokerRunning(port);
84+
}
85+
86+
}

0 commit comments

Comments
 (0)