Skip to content

Commit 628d235

Browse files
artembilangaryrussell
authored andcommitted
GH-2275: Fix MqttMDCA for callbacks after stop
Resolves: #2275 * Remove the `MqttPahoMessageDrivenChannelAdapter` as a callback from the `IMqttClient` during `stop()` * Check the `isRunning()` from the `connectionLost` callback to avoid unexpected `scheduleReconnect()` when we are not running **Cherry-pick to 4.3.x** Conflicts: spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java Resolved.
1 parent 26819f5 commit 628d235

File tree

2 files changed

+26
-10
lines changed

2 files changed

+26
-10
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
* Eclipse Paho Implementation.
4444
*
4545
* @author Gary Russell
46+
* @author Artem Bilan
47+
*
4648
* @since 1.0
4749
*
4850
*/
@@ -172,6 +174,9 @@ protected synchronized void doStop() {
172174
catch (MqttException e) {
173175
logger.error("Exception while disconnecting", e);
174176
}
177+
178+
this.client.setCallback(null);
179+
175180
try {
176181
this.client.close();
177182
}
@@ -313,11 +318,13 @@ public void run() {
313318

314319
@Override
315320
public synchronized void connectionLost(Throwable cause) {
316-
this.logger.error("Lost connection:" + cause.getMessage() + "; retrying...");
317-
this.connected = false;
318-
scheduleReconnect();
319-
if (this.applicationEventPublisher != null) {
320-
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
321+
if (isRunning()) {
322+
this.logger.error("Lost connection: " + cause.getMessage() + "; retrying...");
323+
this.connected = false;
324+
scheduleReconnect();
325+
if (this.applicationEventPublisher != null) {
326+
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
327+
}
321328
}
322329
}
323330

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,16 @@
2626
import static org.mockito.BDDMockito.given;
2727
import static org.mockito.BDDMockito.willAnswer;
2828
import static org.mockito.BDDMockito.willReturn;
29-
import static org.mockito.Mockito.any;
30-
import static org.mockito.Mockito.anyLong;
31-
import static org.mockito.Mockito.anyString;
29+
import static org.mockito.Matchers.any;
30+
import static org.mockito.Matchers.anyLong;
31+
import static org.mockito.Matchers.anyString;
3232
import static org.mockito.Mockito.mock;
3333
import static org.mockito.Mockito.never;
3434
import static org.mockito.Mockito.spy;
3535
import static org.mockito.Mockito.times;
3636
import static org.mockito.Mockito.verify;
3737

38+
import java.util.Date;
3839
import java.util.Properties;
3940
import java.util.concurrent.BlockingQueue;
4041
import java.util.concurrent.CountDownLatch;
@@ -56,7 +57,6 @@
5657
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
5758
import org.eclipse.paho.client.mqttv3.MqttException;
5859
import org.eclipse.paho.client.mqttv3.MqttMessage;
59-
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
6060
import org.eclipse.paho.client.mqttv3.MqttToken;
6161
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
6262
import org.junit.Test;
@@ -83,6 +83,8 @@
8383

8484
/**
8585
* @author Gary Russell
86+
* @author Artem Bilan
87+
*
8688
* @since 4.0
8789
*
8890
*/
@@ -331,6 +333,13 @@ public void testStopActionAlways() throws Exception {
331333
adapter.start();
332334
adapter.stop();
333335
verifyUnsubscribe(client);
336+
337+
adapter.connectionLost(new RuntimeException("Intentional"));
338+
339+
TaskScheduler taskScheduler = TestUtils.getPropertyValue(adapter, "taskScheduler", TaskScheduler.class);
340+
341+
verify(taskScheduler, never())
342+
.schedule(any(Runnable.class), any(Date.class));
334343
}
335344

336345
@Test
@@ -372,7 +381,7 @@ public void testReconnect() throws Exception {
372381
}
373382

374383
private MqttPahoMessageDrivenChannelAdapter buildAdapter(final IMqttClient client, Boolean cleanSession,
375-
ConsumerStopAction action) throws MqttException, MqttSecurityException {
384+
ConsumerStopAction action) throws MqttException {
376385
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory() {
377386

378387
@Override

0 commit comments

Comments
 (0)