diff --git a/basic/mqtt5/README.md b/basic/mqtt5/README.md new file mode 100644 index 000000000..5b18b1fe7 --- /dev/null +++ b/basic/mqtt5/README.md @@ -0,0 +1,29 @@ +Spring Integration - MQTT5 Sample +================================ + +# Overview + +This sample demonstrates basic functionality of the **Spring Integration MQTT Adapters**. + +It assumes a broker is running on localhost on port 1883. + +Once the application is started, you enter some text on the command prompt and a message containing that entered text is +dispatched to the MQTT topic. In return that message is retrieved by Spring Integration and then logged. + +# How to Run the Sample + +If you imported the example into your IDE, you can just run class **org.springframework.integration.samples.mqtt5.Application**. +For example in [SpringSource Tool Suite](https://www.springsource.com/developer/sts) (STS) do: + +* Right-click on SampleSimple class --> Run As --> Spring Boot App + +(or run from the boot console). + +Alternatively, you can start the sample from the command line: + +* ./gradlew :mqtt5:run + +Enter some data (e.g. `foo`) on the console; you will see `foo sent to MQTT, received from MQTT` + +Ctrl-C to terminate. + diff --git a/basic/mqtt5/pom.xml b/basic/mqtt5/pom.xml new file mode 100644 index 000000000..5b5fbeec6 --- /dev/null +++ b/basic/mqtt5/pom.xml @@ -0,0 +1,392 @@ + + + 4.0.0 + org.springframework.integration.samples + mqtt + 6.4.0 + jar + https://github.com/spring-projects/spring-integration-samples + + Spring IO + https://spring.io/projects/spring-integration + + + + Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + artembilan + Artem Bilan + artem.bilan@broadcom.com + + project lead + + + + garyrussell + Gary Russell + github@gprussell.net + + project lead emeritus + + + + markfisher + Mark Fisher + mark.ryan.fisher@gmail.com + + project founder and lead emeritus + + + + + scm:git:scm:git:git://github.com/spring-projects/spring-integration-samples.git + scm:git:scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git + https://github.com/spring-projects/spring-integration-samples + + + GitHub + https://github.com/spring-projects/spring-integration-samples/issues + + + + org.springframework.boot + spring-boot-starter-integration + compile + + + org.springframework.integration + spring-integration-stream + compile + + + org.springframework.integration + spring-integration-mqtt + compile + + + junit + junit + 4.13.2 + test + + + org.hamcrest + hamcrest-library + 2.2 + test + + + org.mockito + mockito-core + 5.12.0 + test + + + org.junit.jupiter + junit-jupiter-api + test + + + org.springframework.integration + spring-integration-test + test + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.jupiter + junit-jupiter-engine + runtime + + + org.junit.platform + junit-platform-launcher + runtime + + + org.junit.vintage + junit-vintage-engine + runtime + + + + + + org.springframework.integration + spring-integration-file + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-syslog + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-smb + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-jdbc + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-jmx + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-ws + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-redis + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-r2dbc + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-webflux + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-sftp + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-rsocket + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-mqtt + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-event + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-ftp + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-kafka + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-groovy + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-cassandra + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-scripting + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-xmpp + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-websocket + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-graphql + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-feed + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-debezium + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-jpa + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-test + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-zeromq + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-http + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-zookeeper + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-ip + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-hazelcast + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-test-support + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-amqp + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-mail + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-camel + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-xml + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-core + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-zip + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-jms + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-mongodb + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-stream + 6.4.0-SNAPSHOT + + + org.springframework.integration + spring-integration-stomp + 6.4.0-SNAPSHOT + + + org.springframework.boot + spring-boot-dependencies + 3.4.0-SNAPSHOT + import + pom + + + org.junit + junit-bom + 5.11.1 + import + pom + + + com.fasterxml.jackson + jackson-bom + 2.18.0 + import + pom + + + org.springframework + spring-framework-bom + 6.2.0-SNAPSHOT + import + pom + + + org.springframework.integration + spring-integration-bom + 6.4.0-SNAPSHOT + import + pom + + + + + 17 + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + org.springframework.boot + spring-boot-starter-parent + 3.4.0-SNAPSHOT + + + + repo.spring.io.milestone + Spring Framework Maven Milestone Repository + https://repo.spring.io/milestone + + + repo.spring.io.snapshot + Spring Framework Maven Snapshot Repository + https://repo.spring.io/snapshot + + + diff --git a/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java new file mode 100644 index 000000000..c083e2d80 --- /dev/null +++ b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java @@ -0,0 +1,135 @@ +/* + * Copyright 2016-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.mqtt5; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.handler.LoggingHandler; +import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler; +import org.springframework.integration.stream.CharacterStreamReadingMessageSource; +import org.springframework.messaging.MessageHandler; + +import java.nio.charset.StandardCharsets; + +/** + * Starts the Spring Context and will initialize the Spring Integration message flow. + * + * @author nmy6452 + * + */ +@SpringBootApplication +public class Application { + + private static final Log LOGGER = LogFactory.getLog(Application.class); + + /** + * Load the Spring Integration Application Context + * + * @param args - command line arguments + */ + public static void main(final String... args) { + + LOGGER.info("\n=========================================================" + + "\n " + + "\n Welcome to Spring Integration! " + + "\n " + + "\n For more information please visit: " + + "\n https://spring.io/projects/spring-integration " + + "\n " + + "\n========================================================="); + + LOGGER.info("\n=========================================================" + + "\n " + + "\n This is the MQTT5 Sample - " + + "\n " + + "\n Please enter some text and press return. The entered " + + "\n Message will be sent to the configured MQTT topic, " + + "\n then again immediately retrieved from the Message " + + "\n Broker and ultimately printed to the command line. " + + "\n " + + "\n========================================================="); + + SpringApplication.run(Application.class, args); + } + + @Bean + public MqttConnectionOptions mqttConnectionOptions() { + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setServerURIs(new String[]{ "tcp://localhost:1883" }); + options.setUserName("guest"); + options.setPassword("guest".getBytes(StandardCharsets.UTF_8)); + return options; + } + + // publisher + + @Bean + public IntegrationFlow mqttOutFlow() { + return IntegrationFlow.from(CharacterStreamReadingMessageSource.stdin(), + e -> e.poller(Pollers.fixedDelay(1000))) + .transform(p -> p + " sent to MQTT5") + .handle(mqttOutbound()) + .get(); + } + + @Bean + public MessageHandler mqttOutbound() { + Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(mqttConnectionOptions(), "siSamplePublisher"); + messageHandler.setAsync(true); + messageHandler.setAsyncEvents(true); + messageHandler.setDefaultTopic("siSampleTopic"); + return messageHandler; + } + + // consumer + + @Bean + public IntegrationFlow mqttInFlow() { + return IntegrationFlow.from(mqttInbound()) + .transform(p -> p + ", received from MQTT5") + .handle(logger()) + .get(); + } + + private LoggingHandler logger() { + LoggingHandler loggingHandler = new LoggingHandler("INFO"); + loggingHandler.setLoggerName("siSample"); + return loggingHandler; + } + + @Bean + public MessageProducerSupport mqttInbound() { + Mqttv5PahoMessageDrivenChannelAdapter adapter = + new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectionOptions(),"siSampleConsumer", "siSampleTopic"); + adapter.setCompletionTimeout(5000); + adapter.setPayloadType(String.class); + adapter.setMessageConverter(new MqttStringToBytesConverter()); + adapter.setQos(1); + return adapter; + } + +} diff --git a/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/MqttStringToBytesConverter.java b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/MqttStringToBytesConverter.java new file mode 100644 index 000000000..7e48b14aa --- /dev/null +++ b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/MqttStringToBytesConverter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2002-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.mqtt5; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.AbstractMessageConverter; + +import java.nio.charset.StandardCharsets; + +/** + * A simple {@link AbstractMessageConverter} that converts + * + * @author nmy6452 + * @since 5.2 + * + */ +public class MqttStringToBytesConverter extends AbstractMessageConverter { + @Override + protected boolean supports(Class clazz) { + return true; + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, + Object conversionHint) { + + return message.getPayload().toString().getBytes(StandardCharsets.UTF_8); + } + + @Override + protected Object convertToInternal(Object payload, MessageHeaders headers, + Object conversionHint) { + + return new String((byte[]) payload); + } + +} diff --git a/basic/mqtt5/src/main/resources/logback.xml b/basic/mqtt5/src/main/resources/logback.xml new file mode 100644 index 000000000..11bc14c60 --- /dev/null +++ b/basic/mqtt5/src/main/resources/logback.xml @@ -0,0 +1,24 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + + diff --git a/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java new file mode 100644 index 000000000..11514c6b7 --- /dev/null +++ b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java @@ -0,0 +1,79 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.mqtt5; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.springframework.integration.test.mock.MockIntegration.messageArgumentCaptor; +import static org.springframework.integration.test.mock.MockIntegration.mockMessageHandler; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.test.context.MockIntegrationContext; +import org.springframework.integration.test.context.SpringIntegrationTest; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author nmy6452 + * + * @since 5.2 + * + */ +@RunWith(SpringRunner.class) +@SpringBootTest +@SpringIntegrationTest +public class ApplicationTest { + + @ClassRule + public static final BrokerRunning brokerRunning = BrokerRunning.isRunning(1883); + + @Autowired + private MockIntegrationContext mockIntegrationContext; + + @Autowired + private IntegrationFlow mqttOutFlow; + + @Test + public void test() throws InterruptedException { + ArgumentCaptor> captor = messageArgumentCaptor(); + CountDownLatch receiveLatch = new CountDownLatch(1); + MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown()); + this.mockIntegrationContext + .substituteMessageHandlerFor( + "mqttInFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1", + mockMessageHandler); + this.mqttOutFlow.getInputChannel().send(new GenericMessage<>("foo")); + assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue(); + verify(mockMessageHandler).handleMessage(any()); + assertThat(captor.getValue().getPayload()) + .isEqualTo("foo sent to MQTT5, received from MQTT5"); + } + +} diff --git a/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/BrokerRunning.java b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/BrokerRunning.java new file mode 100644 index 000000000..560569898 --- /dev/null +++ b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/BrokerRunning.java @@ -0,0 +1,86 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.mqtt5; + +import static org.junit.Assume.assumeNoException; +import static org.junit.Assume.assumeTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.paho.mqttv5.client.IMqttClient; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + + +/** + * @author nmy6452 + * + * @since 5.2 + * + */ +public class BrokerRunning extends TestWatcher { + + private static final Log logger = LogFactory.getLog(BrokerRunning.class); + + // Static so that we only test once on failure: speeds up test suite + private static final Map brokerOnline = new HashMap<>(); + + private final int port; + + private BrokerRunning(int port) { + this.port = port; + brokerOnline.put(port, true); + } + + @Override + public Statement apply(Statement base, Description description) { + assumeTrue(brokerOnline.get(port)); + String url = "tcp://localhost:" + port; + IMqttClient client = null; + try { + client = new MqttClient(url, "junit-" + System.currentTimeMillis()); + client.connect(); + } + catch (MqttException e) { + logger.warn("Tests not running because no broker on " + url + ":", e); + assumeNoException(e); + } + finally { + if (client != null) { + try { + client.disconnect(); + client.close(); + } + catch (MqttException e) { + } + } + } + return super.apply(base, description); + } + + + public static BrokerRunning isRunning(int port) { + return new BrokerRunning(port); + } + +} diff --git a/build.gradle b/build.gradle index 268a83eaf..bf35cc280 100644 --- a/build.gradle +++ b/build.gradle @@ -692,6 +692,36 @@ project('mqtt') { } +project('mqtt5') { + description = 'MQTT5 Basic Sample' + + apply plugin: 'org.springframework.boot' + + dependencies { + api 'org.springframework.boot:spring-boot-starter-integration' + api 'org.springframework.integration:spring-integration-stream' + api 'org.springframework.integration:spring-integration-mqtt' + api 'org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5' + + testImplementation 'org.springframework.boot:spring-boot-starter-test' + testImplementation "org.springframework.integration:spring-integration-test" + } + + springBoot { + mainClass = 'org.springframework.integration.samples.mqtt5.Application' + } + + task run(type: JavaExec) { + main 'org.springframework.integration.samples.mqtt5.Application' + classpath = sourceSets.main.runtimeClasspath + } + + tasks.withType(JavaExec) { + standardInput = System.in + } + +} + project('si4demo') { description = 'Java Configuration/DSL Sample'