Skip to content

Commit 36e4fe1

Browse files
mths1artembilan
authored andcommitted
GH-3955: Mqtt adapter unsubscribe when cleanStart
Fixes #3955 `Mqttv5PahoMessageDrivenChannelAdapter` unsubscribes from all topics, even if `cleanStart/cleanSession` is set to `false`, thus not receiving offline messages after restart. * unsubscribe `Mqttv5PahoMessageDrivenChannelAdapter` only when `cleanStart` * add tests **Cherry-pick to `5.5.x`** # Conflicts: # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java
1 parent 067de96 commit 36e4fe1

File tree

2 files changed

+105
-1
lines changed

2 files changed

+105
-1
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
* @author Artem Bilan
6868
* @author Mikhail Polivakha
6969
* @author Lucas Bowler
70+
* @author Matthias Thoma
7071
*
7172
* @since 5.5.5
7273
*
@@ -190,7 +191,9 @@ protected void doStop() {
190191
String[] topics = getTopic();
191192
try {
192193
if (this.mqttClient != null && this.mqttClient.isConnected()) {
193-
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
194+
if (this.connectionOptions.isCleanStart()) {
195+
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
196+
}
194197
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
195198
}
196199
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt;
18+
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.BDDMockito.given;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.never;
23+
import static org.mockito.Mockito.verify;
24+
25+
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
26+
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
27+
import org.eclipse.paho.mqttv5.client.IMqttToken;
28+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
29+
import org.eclipse.paho.mqttv5.common.MqttException;
30+
import org.eclipse.paho.mqttv5.common.MqttSubscription;
31+
import org.junit.jupiter.api.Test;
32+
33+
import org.springframework.beans.factory.BeanFactory;
34+
import org.springframework.context.ApplicationEventPublisher;
35+
import org.springframework.integration.channel.NullChannel;
36+
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
37+
import org.springframework.test.util.ReflectionTestUtils;
38+
39+
/**
40+
* @author Gary Russell
41+
* @author Artem Bilan
42+
* @author Matthias Thoma
43+
*
44+
* @since 5.5.16
45+
*
46+
*/
47+
public class Mqttv5AdapterTests {
48+
49+
@Test
50+
public void testStop() throws Exception {
51+
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
52+
Mqttv5PahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, true);
53+
54+
adapter.start();
55+
adapter.connectComplete(false, null);
56+
adapter.stop();
57+
58+
verify(client).connect(any(MqttConnectionOptions.class));
59+
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any());
60+
verify(client).unsubscribe(any(String[].class));
61+
}
62+
63+
@Test
64+
public void testStopNotClean() throws Exception {
65+
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
66+
Mqttv5PahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false);
67+
68+
adapter.start();
69+
adapter.connectComplete(false, null);
70+
adapter.stop();
71+
72+
verify(client).connect(any(MqttConnectionOptions.class));
73+
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any());
74+
verify(client, never()).unsubscribe(any(String[].class));
75+
}
76+
77+
private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(IMqttAsyncClient client,
78+
boolean cleanStart) throws MqttException {
79+
80+
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
81+
connectionOptions.setServerURIs(new String[] {"tcp://localhost:1883"});
82+
connectionOptions.setCleanStart(cleanStart);
83+
84+
given(client.isConnected()).willReturn(true);
85+
IMqttToken token = mock(IMqttToken.class);
86+
given(client.disconnect()).willReturn(token);
87+
given(client.connect(any(MqttConnectionOptions.class))).willReturn(token);
88+
given(client.subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any()))
89+
.willReturn(token);
90+
given(client.unsubscribe(any(String[].class))).willReturn(token);
91+
Mqttv5PahoMessageDrivenChannelAdapter adapter =
92+
new Mqttv5PahoMessageDrivenChannelAdapter(connectionOptions, "client", "foo");
93+
ReflectionTestUtils.setField(adapter, "mqttClient", client);
94+
adapter.setBeanFactory(mock(BeanFactory.class));
95+
adapter.setApplicationEventPublisher(mock(ApplicationEventPublisher.class));
96+
adapter.setOutputChannel(new NullChannel());
97+
adapter.afterPropertiesSet();
98+
return adapter;
99+
}
100+
101+
}

0 commit comments

Comments
 (0)