11/*
2- * Copyright 2002-2020 the original author or authors.
2+ * Copyright 2002-2021 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1717package org .springframework .integration .mqtt ;
1818
1919import static org .assertj .core .api .Assertions .assertThat ;
20+ import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
2021import static org .assertj .core .api .Assertions .fail ;
2122import static org .mockito .Mockito .mock ;
2223
23- import java .util . Arrays ;
24+ import java .io . File ;
2425import java .util .Collections ;
2526import java .util .concurrent .CountDownLatch ;
2627import java .util .concurrent .TimeUnit ;
2728
2829import org .eclipse .paho .client .mqttv3 .MqttClientPersistence ;
2930import org .eclipse .paho .client .mqttv3 .persist .MqttDefaultFilePersistence ;
30- import org .junit .ClassRule ;
31- import org .junit .Test ;
32- import org .junit .rules . TemporaryFolder ;
33- import org .junit .runner . RunWith ;
31+ import org .junit .jupiter . api . AfterAll ;
32+ import org .junit .jupiter . api . BeforeAll ;
33+ import org .junit .jupiter . api . Test ;
34+ import org .junit .jupiter . api . io . TempDir ;
3435
3536import org .springframework .beans .factory .BeanFactory ;
3637import org .springframework .beans .factory .annotation .Autowired ;
5051import org .springframework .integration .support .MessageBuilder ;
5152import org .springframework .integration .support .json .EmbeddedJsonHeadersMessageMapper ;
5253import org .springframework .integration .support .json .JacksonJsonUtils ;
54+ import org .springframework .integration .test .condition .LongRunningTest ;
5355import org .springframework .messaging .Message ;
5456import org .springframework .messaging .MessageChannel ;
5557import org .springframework .messaging .MessagingException ;
5658import org .springframework .messaging .PollableChannel ;
5759import org .springframework .messaging .support .GenericMessage ;
5860import org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
5961import org .springframework .test .annotation .DirtiesContext ;
60- import org .springframework .test .context .junit4 . SpringRunner ;
62+ import org .springframework .test .context .junit . jupiter . SpringJUnitConfig ;
6163
6264/**
6365 * @author Gary Russell
6668 * @since 4.0
6769 *
6870 */
69- @ RunWith (SpringRunner .class )
71+ @ LongRunningTest
72+ @ SpringJUnitConfig
7073@ DirtiesContext
71- public class BackToBackAdapterTests {
74+ public class BackToBackAdapterTests implements MosquittoContainerTest {
7275
73- @ ClassRule
74- public static final BrokerRunning brokerRunning = BrokerRunning . isRunning ( 1883 ) ;
76+ @ TempDir
77+ static File folder ;
7578
76- @ ClassRule
77- public static final TemporaryFolder folder = new TemporaryFolder ();
79+ static ThreadPoolTaskScheduler taskScheduler ;
7880
7981 @ Autowired
8082 private MessageChannel out ;
@@ -85,31 +87,40 @@ public class BackToBackAdapterTests {
8587 @ Autowired
8688 private EventsListener listener ;
8789
90+ @ BeforeAll
91+ static void setup () {
92+ taskScheduler = new ThreadPoolTaskScheduler ();
93+ taskScheduler .initialize ();
94+ }
95+
96+ @ AfterAll
97+ static void teardown () {
98+ taskScheduler .destroy ();
99+ }
100+
88101 @ Test
89102 public void testSingleTopic () {
90- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
103+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
91104 adapter .setDefaultTopic ("mqtt-foo" );
92105 adapter .setBeanFactory (mock (BeanFactory .class ));
93106 adapter .afterPropertiesSet ();
94107 adapter .start ();
95- MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" ,
108+ MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest . mqttUrl () ,
96109 "si-test-in" , "mqtt-foo" );
97110 QueueChannel outputChannel = new QueueChannel ();
98111 inbound .setOutputChannel (outputChannel );
99- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
100- taskScheduler .initialize ();
101112 inbound .setTaskScheduler (taskScheduler );
102113 inbound .setBeanFactory (mock (BeanFactory .class ));
103114 inbound .afterPropertiesSet ();
104115 inbound .start ();
105- adapter .handleMessage (new GenericMessage <String >("foo" ));
116+ adapter .handleMessage (new GenericMessage <>("foo" ));
106117 Message <?> out = outputChannel .receive (20000 );
107118 assertThat (out ).isNotNull ();
108119 adapter .stop ();
109120 inbound .stop ();
110121 assertThat (out .getPayload ()).isEqualTo ("foo" );
111122 assertThat (out .getHeaders ().get (MqttHeaders .RECEIVED_TOPIC )).isEqualTo ("mqtt-foo" );
112- assertThat (adapter .getConnectionInfo ().getServerURIs ()[0 ]).isEqualTo ("tcp://localhost:1883" );
123+ assertThat (adapter .getConnectionInfo ().getServerURIs ()[0 ]).isEqualTo (MosquittoContainerTest . mqttUrl () );
113124 }
114125
115126 @ Test
@@ -123,7 +134,7 @@ public void testJsonNoTrust() {
123134 }
124135
125136 private void testJsonCommon (String ... trusted ) {
126- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
137+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
127138 adapter .setDefaultTopic ("mqtt-foo" );
128139 adapter .setBeanFactory (mock (BeanFactory .class ));
129140 EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper (
@@ -133,18 +144,16 @@ private void testJsonCommon(String... trusted) {
133144 adapter .setConverter (converter );
134145 adapter .afterPropertiesSet ();
135146 adapter .start ();
136- MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter ( "tcp://localhost:1883" ,
137- "si-test-in" , "mqtt-foo" );
147+ MqttPahoMessageDrivenChannelAdapter inbound =
148+ new MqttPahoMessageDrivenChannelAdapter ( MosquittoContainerTest . mqttUrl (), "si-test-in" , "mqtt-foo" );
138149 QueueChannel outputChannel = new QueueChannel ();
139150 inbound .setOutputChannel (outputChannel );
140- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
141- taskScheduler .initialize ();
142151 inbound .setTaskScheduler (taskScheduler );
143152 inbound .setBeanFactory (mock (BeanFactory .class ));
144153 inbound .setConverter (converter );
145154 inbound .afterPropertiesSet ();
146155 inbound .start ();
147- adapter .handleMessage (new GenericMessage <Foo >(new Foo ("bar" ), Collections .singletonMap ("baz" , "qux" )));
156+ adapter .handleMessage (new GenericMessage <>(new Foo ("bar" ), Collections .singletonMap ("baz" , "qux" )));
148157 Message <?> out = outputChannel .receive (20000 );
149158 assertThat (out ).isNotNull ();
150159 adapter .stop ();
@@ -161,22 +170,21 @@ private void testJsonCommon(String... trusted) {
161170
162171 @ Test
163172 public void testAddRemoveTopic () {
164- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
173+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
165174 adapter .setDefaultTopic ("mqtt-foo" );
166175 adapter .setBeanFactory (mock (BeanFactory .class ));
167176 adapter .afterPropertiesSet ();
168177 adapter .start ();
169- MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" , "si-test-in" );
178+ MqttPahoMessageDrivenChannelAdapter inbound =
179+ new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest .mqttUrl (), "si-test-in" );
170180 QueueChannel outputChannel = new QueueChannel ();
171181 inbound .setOutputChannel (outputChannel );
172- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
173- taskScheduler .initialize ();
174182 inbound .setTaskScheduler (taskScheduler );
175183 inbound .setBeanFactory (mock (BeanFactory .class ));
176184 inbound .afterPropertiesSet ();
177185 inbound .start ();
178186 inbound .addTopic ("mqtt-foo" );
179- adapter .handleMessage (new GenericMessage <String >("foo" ));
187+ adapter .handleMessage (new GenericMessage <>("foo" ));
180188 Message <?> out = outputChannel .receive (20_000 );
181189 assertThat (out ).isNotNull ();
182190 assertThat (out .getPayload ()).isEqualTo ("foo" );
@@ -194,17 +202,13 @@ public void testAddRemoveTopic() {
194202 out = outputChannel .receive (1 );
195203 assertThat (out ).isNull ();
196204
197- try {
198- inbound .addTopic ("mqtt-foo" );
199- fail ("Expected exception" );
200- }
201- catch (MessagingException e ) {
202- assertThat (e .getMessage ()).isEqualTo ("Topic 'mqtt-foo' is already subscribed." );
203- }
205+ assertThatExceptionOfType (MessagingException .class )
206+ .isThrownBy (() -> inbound .addTopic ("mqtt-foo" ))
207+ .withMessage ("Topic 'mqtt-foo' is already subscribed." );
204208
205209 inbound .addTopic ("mqqt-bar" , "mqqt-baz" );
206210 inbound .removeTopic ("mqqt-bar" , "mqqt-baz" );
207- inbound .addTopics (new String [] { "mqqt-bar" , "mqqt-baz" }, new int [] { 0 , 0 });
211+ inbound .addTopics (new String []{ "mqqt-bar" , "mqqt-baz" }, new int []{ 0 , 0 });
208212 inbound .removeTopic ("mqqt-bar" , "mqqt-baz" );
209213
210214 adapter .stop ();
@@ -213,22 +217,21 @@ public void testAddRemoveTopic() {
213217
214218 @ Test
215219 public void testTwoTopics () {
216- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
220+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
217221 adapter .setDefaultTopic ("mqtt-foo" );
218222 adapter .setBeanFactory (mock (BeanFactory .class ));
219223 adapter .afterPropertiesSet ();
220224 adapter .start ();
221- MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" ,
222- "si-test-in" , "mqtt-foo" , "mqtt-bar" );
225+ MqttPahoMessageDrivenChannelAdapter inbound =
226+ new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest .mqttUrl (),
227+ "si-test-in" , "mqtt-foo" , "mqtt-bar" );
223228 QueueChannel outputChannel = new QueueChannel ();
224229 inbound .setOutputChannel (outputChannel );
225- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
226- taskScheduler .initialize ();
227230 inbound .setTaskScheduler (taskScheduler );
228231 inbound .setBeanFactory (mock (BeanFactory .class ));
229232 inbound .afterPropertiesSet ();
230233 inbound .start ();
231- adapter .handleMessage (new GenericMessage <String >("foo" ));
234+ adapter .handleMessage (new GenericMessage <>("foo" ));
232235 Message <?> message = MessageBuilder .withPayload ("bar" ).setHeader (MqttHeaders .TOPIC , "mqtt-bar" ).build ();
233236 adapter .handleMessage (message );
234237 Message <?> out = outputChannel .receive (20000 );
@@ -240,12 +243,13 @@ public void testTwoTopics() {
240243 inbound .stop ();
241244 assertThat (out .getPayload ()).isEqualTo ("bar" );
242245 assertThat (out .getHeaders ().get (MqttHeaders .RECEIVED_TOPIC )).isEqualTo ("mqtt-bar" );
246+
243247 adapter .stop ();
244248 }
245249
246250 @ Test
247251 public void testAsync () throws Exception {
248- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ("tcp://localhost:1883" , "si-test-out" );
252+ MqttPahoMessageHandler adapter = new MqttPahoMessageHandler (MosquittoContainerTest . mqttUrl () , "si-test-out" );
249253 adapter .setDefaultTopic ("mqtt-foo" );
250254 adapter .setBeanFactory (mock (BeanFactory .class ));
251255 adapter .setAsync (true );
@@ -255,33 +259,32 @@ public void testAsync() throws Exception {
255259 adapter .afterPropertiesSet ();
256260 adapter .start ();
257261 MqttPahoMessageDrivenChannelAdapter inbound =
258- new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" , "si-test-in" , "mqtt-foo" );
262+ new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest . mqttUrl () , "si-test-in" , "mqtt-foo" );
259263 QueueChannel outputChannel = new QueueChannel ();
260264 inbound .setOutputChannel (outputChannel );
261- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
262- taskScheduler .initialize ();
263265 inbound .setTaskScheduler (taskScheduler );
264266 inbound .setBeanFactory (mock (BeanFactory .class ));
265267 inbound .afterPropertiesSet ();
266268 inbound .start ();
267- GenericMessage <String > message = new GenericMessage <String >("foo" );
269+ GenericMessage <String > message = new GenericMessage <>("foo" );
268270 adapter .handleMessage (message );
269271 verifyEvents (adapter , publisher , message );
270272 Message <?> out = outputChannel .receive (20000 );
271273 assertThat (out ).isNotNull ();
272274 adapter .stop ();
273275 inbound .stop ();
276+
274277 assertThat (out .getPayload ()).isEqualTo ("foo" );
275278 assertThat (out .getHeaders ().get (MqttHeaders .RECEIVED_TOPIC )).isEqualTo ("mqtt-foo" );
276279 }
277280
278281 @ Test
279282 public void testAsyncPersisted () throws Exception {
280283 DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory ();
281- MqttClientPersistence persistence = new MqttDefaultFilePersistence (folder .getRoot (). getAbsolutePath ());
284+ MqttClientPersistence persistence = new MqttDefaultFilePersistence (folder .getAbsolutePath ());
282285 factory .setPersistence (persistence );
283- MqttPahoMessageHandler adapter = new MqttPahoMessageHandler ( "tcp://localhost:1883" , "si-test-out" ,
284- factory );
286+ MqttPahoMessageHandler adapter =
287+ new MqttPahoMessageHandler ( MosquittoContainerTest . mqttUrl (), "si-test-out" , factory );
285288 adapter .setDefaultTopic ("mqtt-foo" );
286289 adapter .setBeanFactory (mock (BeanFactory .class ));
287290 adapter .setAsync (true );
@@ -293,16 +296,15 @@ public void testAsyncPersisted() throws Exception {
293296 adapter .start ();
294297
295298 MqttPahoMessageDrivenChannelAdapter inbound =
296- new MqttPahoMessageDrivenChannelAdapter ("tcp://localhost:1883" , "si-test-in" , "mqtt-foo" , "mqtt-bar" );
299+ new MqttPahoMessageDrivenChannelAdapter (MosquittoContainerTest .mqttUrl (),
300+ "si-test-in" , "mqtt-foo" , "mqtt-bar" );
297301 QueueChannel outputChannel = new QueueChannel ();
298302 inbound .setOutputChannel (outputChannel );
299- ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
300- taskScheduler .initialize ();
301303 inbound .setTaskScheduler (taskScheduler );
302304 inbound .setBeanFactory (mock (BeanFactory .class ));
303305 inbound .afterPropertiesSet ();
304306 inbound .start ();
305- Message <String > message1 = new GenericMessage <String >("foo" );
307+ Message <String > message1 = new GenericMessage <>("foo" );
306308 adapter .handleMessage (message1 );
307309 verifyEvents (adapter , publisher1 , message1 );
308310
@@ -334,7 +336,7 @@ public void testAsyncPersisted() throws Exception {
334336
335337 assertThat (publisher1 .delivered .getClientInstance ()).isNotEqualTo (clientInstance );
336338
337- Message <?> out = null ;
339+ Message <?> out ;
338340 for (int i = 0 ; i < 4 ; i ++) {
339341 out = outputChannel .receive (20000 );
340342 assertThat (out ).isNotNull ();
@@ -354,6 +356,7 @@ else if ("bar".equals(out.getPayload())) {
354356
355357 private void verifyEvents (MqttPahoMessageHandler adapter , EventPublisher publisher1 , Message <String > message1 )
356358 throws InterruptedException {
359+
357360 assertThat (publisher1 .latch .await (10 , TimeUnit .SECONDS )).isTrue ();
358361 assertThat (publisher1 .sent ).isNotNull ();
359362 assertThat (publisher1 .delivered ).isNotNull ();
@@ -373,13 +376,13 @@ private void verifyMessageIds(EventPublisher publisher1, EventPublisher publishe
373376
374377 @ Test
375378 public void testMultiURIs () {
376- out .send (new GenericMessage <String >("foo" ));
379+ out .send (new GenericMessage <>("foo" ));
377380 Message <?> message = in .receive (20000 );
378381 assertThat (message ).isNotNull ();
379382 assertThat (message .getPayload ()).isEqualTo ("foo" );
380383 MqttPahoComponent source = this .listener .event .getSourceAsType ();
381- assertThat (Arrays . toString ( source .getConnectionInfo ().getServerURIs () ))
382- .isEqualTo ( "[tcp://localhost:1883, tcp://localhost:1883]" );
384+ assertThat (source .getConnectionInfo ().getServerURIs ())
385+ .contains ( MosquittoContainerTest . mqttUrl (), MosquittoContainerTest . mqttUrl () );
383386 }
384387
385388 public static class EventsListener implements ApplicationListener <MqttSubscribedEvent > {
0 commit comments