Skip to content

Commit e53ee1b

Browse files
fix(iot): Keep topicListeners if cleanSession = false (#3352)
* Keep topic subscriptions for persistent connections. * Add tests * fix typo * Update aws-android-sdk-iot/src/main/java/com/amazonaws/mobileconnectors/iot/AWSIotMqttManager.java Co-authored-by: Erica Eaton <[email protected]> * Update aws-android-sdk-iot/src/test/java/com/amazonaws/mobileconnectors/iot/AWSIotMqttManagerTest.java Co-authored-by: Erica Eaton <[email protected]> --------- Co-authored-by: Erica Eaton <[email protected]>
1 parent 22a767c commit e53ee1b

File tree

2 files changed

+177
-2
lines changed

2 files changed

+177
-2
lines changed

aws-android-sdk-iot/src/main/java/com/amazonaws/mobileconnectors/iot/AWSIotMqttManager.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,7 +1158,14 @@ private void mqttConnect(MqttConnectOptions options) {
11581158
(isMetricsEnabled() ? "enabled" : "disabled") +
11591159
", username: " + options.getUserName());
11601160

1161-
topicListeners.clear();
1161+
/*
1162+
If cleanSession is set to false, the server will treat the subscriptions as durable.
1163+
We should not clear out the client listeners, because subscriptions are still maintained
1164+
on the Paho client.
1165+
*/
1166+
if (cleanSession) {
1167+
topicListeners.clear();
1168+
}
11621169
mqttMessageQueue.clear();
11631170

11641171
resetReconnect();
@@ -1272,7 +1279,10 @@ private String getEndpointWithHttpPort() {
12721279
public boolean disconnect() {
12731280
userDisconnect = true;
12741281
reset();
1275-
topicListeners.clear();
1282+
// do not clear topic listeners if persistent connection is enabled
1283+
if (cleanSession) {
1284+
topicListeners.clear();
1285+
}
12761286
connectionState = MqttManagerConnectionState.Disconnected;
12771287
userConnectionCallback();
12781288
return true;
@@ -1546,6 +1556,9 @@ public void unsubscribeTopic(String topic) {
15461556
* Resubscribe to previously subscribed topics on reconnecting.
15471557
*/
15481558
void resubscribeToTopics() {
1559+
// we do not need to resubscribe to topics if using persistent connections
1560+
if (!cleanSession) return;
1561+
15491562
LOGGER.info("Auto-resubscribe is enabled. Resubscribing to previous topics.");
15501563
for (final AWSIotMqttTopic topic : topicListeners.values()) {
15511564
if (mqttClient != null) {
@@ -2080,4 +2093,9 @@ private void fixTLSPre21(MqttConnectOptions options) {
20802093
options.setSocketFactory(tls12SocketFactory);
20812094
}
20822095
}
2096+
2097+
// for testing
2098+
protected Map<String, AWSIotMqttTopic> getTopicListeners() {
2099+
return topicListeners;
2100+
}
20832101
}

aws-android-sdk-iot/src/test/java/com/amazonaws/mobileconnectors/iot/AWSIotMqttManagerTest.java

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,163 @@ public void testSubscribeToTopicWithSubscriptionCallback() throws Exception {
11041104
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
11051105
}
11061106

1107+
@Test
1108+
public void testSubscriptionsNotRetainedForStandardConnectionOnDisconnect() throws Exception {
1109+
MockMqttClient mockClient = new MockMqttClient();
1110+
AWSIotMqttManager testClient = new AWSIotMqttManager("test-client",
1111+
Region.getRegion(Regions.US_EAST_1), TEST_ENDPOINT_PREFIX);
1112+
testClient.setMqttClient(mockClient);
1113+
KeyStore testKeystore = AWSIotKeystoreHelper.getIotKeystore(CERT_ID, KEYSTORE_PATH,
1114+
KEYSTORE_NAME, KEYSTORE_PASSWORD);
1115+
testClient.setCleanSession(true);
1116+
testClient.setAutoReconnect(false);
1117+
testClient.connect(testKeystore, null);
1118+
mockClient.mockConnectSuccess();
1119+
TestNewMessageCallback mcb = new TestNewMessageCallback();
1120+
TestAWSIotMqttSubscriptionStatusCallback sscb = new TestAWSIotMqttSubscriptionStatusCallback();
1121+
testClient.subscribeToTopic("unit/test/topic", AWSIotMqttQos.QOS0, sscb, mcb);
1122+
1123+
assertEquals(1, testClient.getTopicListeners().size());
1124+
1125+
testClient.disconnect();
1126+
1127+
assertEquals(0, testClient.getTopicListeners().size());
1128+
}
1129+
1130+
@Test
1131+
public void testSubscriptionsRetainedForPersistentConnectionOnDisconnect() throws Exception {
1132+
MockMqttClient mockClient = new MockMqttClient();
1133+
AWSIotMqttManager testClient = new AWSIotMqttManager("test-client",
1134+
Region.getRegion(Regions.US_EAST_1), TEST_ENDPOINT_PREFIX);
1135+
testClient.setMqttClient(mockClient);
1136+
KeyStore testKeystore = AWSIotKeystoreHelper.getIotKeystore(CERT_ID, KEYSTORE_PATH,
1137+
KEYSTORE_NAME, KEYSTORE_PASSWORD);
1138+
testClient.setCleanSession(false);
1139+
testClient.setAutoReconnect(false);
1140+
testClient.connect(testKeystore, null);
1141+
mockClient.mockConnectSuccess();
1142+
TestNewMessageCallback mcb = new TestNewMessageCallback();
1143+
TestAWSIotMqttSubscriptionStatusCallback sscb = new TestAWSIotMqttSubscriptionStatusCallback();
1144+
testClient.subscribeToTopic("unit/test/topic", AWSIotMqttQos.QOS0, sscb, mcb);
1145+
1146+
assertEquals(1, testClient.getTopicListeners().size());
1147+
1148+
testClient.disconnect();
1149+
1150+
assertEquals(1, testClient.getTopicListeners().size());
1151+
}
1152+
1153+
@Test
1154+
public void testSubscriptionsNotRetainedOnStandardConnection() throws Exception {
1155+
MockMqttClient mockClient = new MockMqttClient();
1156+
1157+
AWSIotMqttManager testClient = new AWSIotMqttManager("test-client",
1158+
Region.getRegion(Regions.US_EAST_1), TEST_ENDPOINT_PREFIX);
1159+
testClient.setMqttClient(mockClient);
1160+
1161+
KeyStore testKeystore = AWSIotKeystoreHelper.getIotKeystore(CERT_ID, KEYSTORE_PATH,
1162+
KEYSTORE_NAME, KEYSTORE_PASSWORD);
1163+
testClient.setCleanSession(true);
1164+
testClient.setAutoReconnect(false);
1165+
1166+
testClient.connect(testKeystore, null);
1167+
1168+
mockClient.mockConnectSuccess();
1169+
1170+
TestNewMessageCallback mcb = new TestNewMessageCallback();
1171+
1172+
TestAWSIotMqttSubscriptionStatusCallback sscb = new TestAWSIotMqttSubscriptionStatusCallback();
1173+
1174+
testClient.subscribeToTopic("unit/test/topic", AWSIotMqttQos.QOS0, sscb, mcb);
1175+
1176+
assertEquals(1, testClient.getTopicListeners().size());
1177+
assertEquals(1, mockClient.connectCalls);
1178+
assertEquals(1, mockClient.subscribeCalls);
1179+
assertTrue(sscb.subscribed);
1180+
assertTrue(mockClient.mockSubscriptions.containsKey("unit/test/topic"));
1181+
assertEquals((Integer) 0, mockClient.mockSubscriptions.get("unit/test/topic"));
1182+
1183+
MqttMessage msg = new MqttMessage();
1184+
msg.setPayload("test payload".getBytes(StringUtils.UTF8));
1185+
mockClient.mockCallback.messageArrived("unit/test/topic", msg);
1186+
1187+
assertEquals(1, mcb.receivedMessages.size());
1188+
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
1189+
1190+
mockClient.mockDisconnect();
1191+
1192+
testClient.connect(testKeystore, null);
1193+
1194+
assertEquals(0, testClient.getTopicListeners().size());
1195+
assertEquals(2, mockClient.connectCalls);
1196+
assertEquals(1, mockClient.subscribeCalls);
1197+
assertTrue(sscb.subscribed);
1198+
assertTrue(mockClient.mockSubscriptions.containsKey("unit/test/topic"));
1199+
assertEquals((Integer) 0, mockClient.mockSubscriptions.get("unit/test/topic"));
1200+
1201+
MqttMessage msg2 = new MqttMessage();
1202+
msg2.setPayload("test payload".getBytes(StringUtils.UTF8));
1203+
mockClient.mockCallback.messageArrived("unit/test/topic", msg);
1204+
1205+
// received still only 1
1206+
assertEquals(1, mcb.receivedMessages.size());
1207+
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
1208+
}
1209+
1210+
@Test
1211+
public void testSubscriptionsRetainedOnPersistentConnection() throws Exception {
1212+
MockMqttClient mockClient = new MockMqttClient();
1213+
1214+
AWSIotMqttManager testClient = new AWSIotMqttManager("test-client",
1215+
Region.getRegion(Regions.US_EAST_1), TEST_ENDPOINT_PREFIX);
1216+
testClient.setMqttClient(mockClient);
1217+
1218+
KeyStore testKeystore = AWSIotKeystoreHelper.getIotKeystore(CERT_ID, KEYSTORE_PATH,
1219+
KEYSTORE_NAME, KEYSTORE_PASSWORD);
1220+
testClient.setCleanSession(false);
1221+
testClient.setAutoReconnect(false);
1222+
1223+
testClient.connect(testKeystore, null);
1224+
1225+
mockClient.mockConnectSuccess();
1226+
1227+
TestNewMessageCallback mcb = new TestNewMessageCallback();
1228+
1229+
TestAWSIotMqttSubscriptionStatusCallback sscb = new TestAWSIotMqttSubscriptionStatusCallback();
1230+
1231+
testClient.subscribeToTopic("unit/test/topic", AWSIotMqttQos.QOS0, sscb, mcb);
1232+
1233+
assertEquals(1, mockClient.connectCalls);
1234+
assertEquals(1, mockClient.subscribeCalls);
1235+
assertTrue(sscb.subscribed);
1236+
assertTrue(mockClient.mockSubscriptions.containsKey("unit/test/topic"));
1237+
assertEquals((Integer) 0, mockClient.mockSubscriptions.get("unit/test/topic"));
1238+
1239+
MqttMessage msg = new MqttMessage();
1240+
msg.setPayload("test payload".getBytes(StringUtils.UTF8));
1241+
mockClient.mockCallback.messageArrived("unit/test/topic", msg);
1242+
1243+
assertEquals(1, mcb.receivedMessages.size());
1244+
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
1245+
1246+
mockClient.mockDisconnect();
1247+
1248+
testClient.connect(testKeystore, null);
1249+
1250+
assertEquals(2, mockClient.connectCalls);
1251+
assertEquals(1, mockClient.subscribeCalls);
1252+
assertTrue(sscb.subscribed);
1253+
assertTrue(mockClient.mockSubscriptions.containsKey("unit/test/topic"));
1254+
assertEquals((Integer) 0, mockClient.mockSubscriptions.get("unit/test/topic"));
1255+
1256+
MqttMessage msg2 = new MqttMessage();
1257+
msg2.setPayload("test payload".getBytes(StringUtils.UTF8));
1258+
mockClient.mockCallback.messageArrived("unit/test/topic", msg);
1259+
1260+
assertEquals(2, mcb.receivedMessages.size());
1261+
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
1262+
}
1263+
11071264
@Test
11081265
public void testSubscribeToTopic() throws Exception {
11091266
MockMqttClient mockClient = new MockMqttClient();

0 commit comments

Comments
 (0)