Skip to content

Commit 5d00416

Browse files
authored
Fix race condition when IoT message arrives immediately after subscribe (#2794)
1 parent 8d9005f commit 5d00416

File tree

2 files changed

+49
-4
lines changed

2 files changed

+49
-4
lines changed

aws-android-sdk-iot/src/main/java/com/amazonaws/mobileconnectors/iot/AWSIotMqttManager.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1477,6 +1477,9 @@ public void subscribeToTopic(final String topic, final AWSIotMqttQos qos,
14771477

14781478
if (null != mqttClient) {
14791479
try {
1480+
final AWSIotMqttTopic topicModel = new AWSIotMqttTopic(topic, qos, callback);
1481+
topicListeners.put(topic, topicModel);
1482+
14801483
if (subscriptionStatusCallback != null) {
14811484
mqttClient.subscribe(topic, qos.asInt(), null, new IMqttActionListener() {
14821485
@Override
@@ -1495,14 +1498,14 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
14951498
mqttClient.subscribe(topic, qos.asInt());
14961499
}
14971500
} catch (final MqttException e) {
1498-
if(subscriptionStatusCallback != null) {
1501+
topicListeners.remove(topic);
1502+
1503+
if (subscriptionStatusCallback != null) {
14991504
subscriptionStatusCallback.onFailure(e);
15001505
} else {
15011506
throw new AmazonClientException("Client error when subscribing.", e);
15021507
}
15031508
}
1504-
final AWSIotMqttTopic topicModel = new AWSIotMqttTopic(topic, qos, callback);
1505-
topicListeners.put(topic, topicModel);
15061509
}
15071510
}
15081511

@@ -2051,4 +2054,4 @@ enum AuthenticationMode {
20512054
public boolean getSessionPresent() {
20522055
return sessionPresent;
20532056
}
2054-
}
2057+
}

aws-android-sdk-iot/src/test/java/com/amazonaws/mobileconnectors/iot/AWSIotMqttManagerTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,6 +1131,48 @@ public void testSubscribeToTopic() throws Exception {
11311131
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
11321132
}
11331133

1134+
@Test
1135+
public void testSubscribeToTopicWithImmediateMessageArriving() throws Exception {
1136+
final MockMqttClient mockClient = new MockMqttClient();
1137+
1138+
AWSIotMqttManager testClient = new AWSIotMqttManager("test-client",
1139+
Region.getRegion(Regions.US_EAST_1), TEST_ENDPOINT_PREFIX);
1140+
testClient.setMqttClient(mockClient);
1141+
1142+
KeyStore testKeystore = AWSIotKeystoreHelper.getIotKeystore(CERT_ID, KEYSTORE_PATH,
1143+
KEYSTORE_NAME, KEYSTORE_PASSWORD);
1144+
testClient.connect(testKeystore, null);
1145+
1146+
TestNewMessageCallback mcb = new TestNewMessageCallback();
1147+
1148+
AWSIotMqttSubscriptionStatusCallback subscriptionStatusCallback = new AWSIotMqttSubscriptionStatusCallback() {
1149+
@Override
1150+
public void onSuccess() {
1151+
MqttMessage msg = new MqttMessage();
1152+
msg.setPayload("test payload".getBytes(StringUtils.UTF8));
1153+
try {
1154+
mockClient.mockCallback.messageArrived("unit/test/topic", msg);
1155+
} catch (Exception e) {
1156+
fail("Could not simulate arriving message: " + e);
1157+
}
1158+
}
1159+
1160+
@Override
1161+
public void onFailure(Throwable exception) {
1162+
fail("Subscribing failed while simulate arriving message: " + exception);
1163+
}
1164+
};
1165+
1166+
testClient.subscribeToTopic("unit/test/topic", AWSIotMqttQos.QOS0, subscriptionStatusCallback, mcb);
1167+
1168+
assertEquals(1, mockClient.subscribeCalls);
1169+
assertTrue(mockClient.mockSubscriptions.containsKey("unit/test/topic"));
1170+
assertEquals((Integer) 0, mockClient.mockSubscriptions.get("unit/test/topic"));
1171+
1172+
assertEquals(1, mcb.receivedMessages.size());
1173+
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
1174+
}
1175+
11341176
@Test(expected = IllegalArgumentException.class)
11351177
public void testSubscribeToTopicNullTopic() throws Exception {
11361178
MockMqttClient mockClient = new MockMqttClient();

0 commit comments

Comments
 (0)