Skip to content

Commit df5f6b9

Browse files
authored
Add subscription status callback to subscribeToTopic (#1086)
1 parent 4fbcde3 commit df5f6b9

File tree

6 files changed

+157
-5
lines changed

6 files changed

+157
-5
lines changed

CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Change Log - AWS SDK for Android
2-
2+
3+
## [Release 2.14.2](https://github.com/aws/aws-sdk-android/releases/tag/release_v2.14.2)
4+
5+
### New Features
6+
7+
- **AWS IoT**
8+
- Added an overloaded version of `subscribeToTopic()` method, `public void subscribeToTopic(final String topic, final AWSIotMqttQos qos, final AWSIotMqttSubscriptionStatusCallback subscriptionStatusCallback, final AWSIotMqttNewMessageCallback callback);`, in `AWSIotMqttManager` which accepts subscription status callback to notify users of the status of subscription operation. See [Issue#1005](https://github.com/aws-amplify/aws-sdk-android/issues/1005) for details.
9+
310
## [Release 2.14.1](https://github.com/aws/aws-sdk-android/releases/tag/release_v2.14.1)
411

512
### New Features

aws-android-sdk-iot/src/androidTest/java/com/amazonaws/mobileconnectors/iot/MqttManagerIntegrationTest.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141

4242
import org.junit.After;
4343
import org.junit.Before;
44-
import org.junit.Ignore;
4544
import org.junit.Test;
4645

4746
import java.io.File;
@@ -705,6 +704,23 @@ public void onMessageArrived(String topic, byte[] data) {
705704
assertEquals((int)ONE_TWENTY_KB, messages.get(0).length());
706705
}
707706

707+
/**
708+
* Test Subscribe status callback
709+
*/
710+
private class TestSubscriptionStatusCallback implements AWSIotMqttSubscriptionStatusCallback {
711+
String subscriptionStatus = null;
712+
713+
@Override
714+
public void onSuccess() {
715+
subscriptionStatus = "Subscription successful";
716+
}
717+
718+
@Override
719+
public void onFailure(Throwable exception) {
720+
subscriptionStatus = "Subscription failed";
721+
}
722+
}
723+
708724
@Test
709725
public void mqttWebSocket() throws Exception {
710726

@@ -728,15 +744,18 @@ public void onStatusChanged(AWSIotMqttClientStatus status, Throwable throwable)
728744
assertEquals(AWSIotMqttClientStatusCallback.AWSIotMqttClientStatus.Connecting, statuses.get(0));
729745
assertEquals(AWSIotMqttClientStatusCallback.AWSIotMqttClientStatus.Connected, statuses.get(1));
730746

747+
TestSubscriptionStatusCallback sscb = new TestSubscriptionStatusCallback();
748+
731749
// subscribe to MQTT topic
732-
mqttManager.subscribeToTopic("sdk/test/integration/ws", AWSIotMqttQos.QOS0, new AWSIotMqttNewMessageCallback() {
750+
mqttManager.subscribeToTopic("sdk/test/integration/ws", AWSIotMqttQos.QOS0, sscb, new AWSIotMqttNewMessageCallback() {
733751
@Override
734752
public void onMessageArrived(String topic, byte[] data) {
735753
messages.add(new String(data));
736754
}
737755
});
738756
// ensure subscription propagates
739757
Thread.sleep(2000);
758+
assertEquals("Subscription successful", sscb.subscriptionStatus);
740759
// publish 20 messages
741760
for (int i=0; i<20; ++i) {
742761
mqttManager.publishString("integration test " + i, "sdk/test/integration/ws", AWSIotMqttQos.QOS0);

aws-android-sdk-iot/src/main/java/com/amazonaws/mobileconnectors/iot/AWSIotMqttManager.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,6 +1185,23 @@ public void resetReconnect() {
11851185
*/
11861186
public void subscribeToTopic(String topic, AWSIotMqttQos qos,
11871187
AWSIotMqttNewMessageCallback callback) {
1188+
subscribeToTopic(topic, qos, null, callback);
1189+
}
1190+
1191+
/**
1192+
* Subscribes to an MQTT topic
1193+
*
1194+
* @param topic The topic to which to subscribe.
1195+
* @param qos Quality of Service Level of the subscription.
1196+
* @param subscriptionStatusCallback Callback that will be notified when the subscribe has completed.
1197+
* Any exception encountered during the subscribe operation is reported on the callback
1198+
* if avalialble, else AmazonClientException is thrown by this method.
1199+
* @param callback Callback to be called when new message is received on this
1200+
* topic for this subscription.
1201+
*/
1202+
public void subscribeToTopic(final String topic, final AWSIotMqttQos qos,
1203+
final AWSIotMqttSubscriptionStatusCallback subscriptionStatusCallback,
1204+
final AWSIotMqttNewMessageCallback callback) {
11881205

11891206
if (topic == null || topic.isEmpty()) {
11901207
throw new IllegalArgumentException("topic is null or empty");
@@ -1196,9 +1213,27 @@ public void subscribeToTopic(String topic, AWSIotMqttQos qos,
11961213

11971214
if (null != mqttClient) {
11981215
try {
1199-
mqttClient.subscribe(topic, qos.asInt());
1216+
if (subscriptionStatusCallback != null) {
1217+
mqttClient.subscribe(topic, qos.asInt(), null, new IMqttActionListener() {
1218+
@Override
1219+
public void onSuccess(IMqttToken asyncActionToken) {
1220+
subscriptionStatusCallback.onSuccess();
1221+
}
1222+
1223+
@Override
1224+
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
1225+
subscriptionStatusCallback.onFailure(exception);
1226+
}
1227+
});
1228+
} else {
1229+
mqttClient.subscribe(topic, qos.asInt());
1230+
}
12001231
} catch (final MqttException e) {
1201-
throw new AmazonClientException("Client error when subscribing.", e);
1232+
if(subscriptionStatusCallback != null) {
1233+
subscriptionStatusCallback.onFailure(e);
1234+
} else {
1235+
throw new AmazonClientException("Client error when subscribing.", e);
1236+
}
12021237
}
12031238
final AWSIotMqttTopic topicModel = new AWSIotMqttTopic(topic, qos, callback);
12041239
topicListeners.put(topic, topicModel);
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at:
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
11+
* OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
* License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package com.amazonaws.mobileconnectors.iot;
17+
18+
/**
19+
* Enables an application to be notified of status of call to subscribe to a topic.
20+
*/
21+
public interface AWSIotMqttSubscriptionStatusCallback {
22+
/**
23+
* This method is invoked when a susbcription has completed successfully.
24+
*/
25+
void onSuccess();
26+
27+
/**
28+
* This method is invoked when subscription fails.
29+
* If a client is disconnected while an subscription is in progress
30+
* onFailure will be called.
31+
*/
32+
void onFailure(Throwable exception);
33+
}

aws-android-sdk-iot/src/test/java/com/amazonaws/mobileconnectors/iot/AWSIotMqttManagerTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,37 @@ public void testDisconnectException() throws Exception {
996996
testClient.disconnect();
997997
}
998998

999+
@Test
1000+
public void testSubscribeToTopicWithSubscriptionCallback() throws Exception {
1001+
MockMqttClient mockClient = new MockMqttClient();
1002+
1003+
AWSIotMqttManager testClient = new AWSIotMqttManager("test-client",
1004+
Region.getRegion(Regions.US_EAST_1), TEST_ENDPOINT_PREFIX);
1005+
testClient.setMqttClient(mockClient);
1006+
1007+
KeyStore testKeystore = AWSIotKeystoreHelper.getIotKeystore(CERT_ID, KEYSTORE_PATH,
1008+
KEYSTORE_NAME, KEYSTORE_PASSWORD);
1009+
testClient.connect(testKeystore, null);
1010+
1011+
TestNewMessageCallback mcb = new TestNewMessageCallback();
1012+
1013+
TestAWSIotMqttSubscriptionStatusCallback sscb = new TestAWSIotMqttSubscriptionStatusCallback();
1014+
1015+
testClient.subscribeToTopic("unit/test/topic", AWSIotMqttQos.QOS0, sscb, mcb);
1016+
1017+
assertEquals(1, mockClient.subscribeCalls);
1018+
assertTrue(sscb.subscribed);
1019+
assertTrue(mockClient.mockSubscriptions.containsKey("unit/test/topic"));
1020+
assertEquals((Integer) 0, mockClient.mockSubscriptions.get("unit/test/topic"));
1021+
1022+
MqttMessage msg = new MqttMessage();
1023+
msg.setPayload("test payload".getBytes(StringUtils.UTF8));
1024+
mockClient.mockCallback.messageArrived("unit/test/topic", msg);
1025+
1026+
assertEquals(1, mcb.receivedMessages.size());
1027+
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
1028+
}
1029+
9991030
@Test
10001031
public void testSubscribeToTopic() throws Exception {
10011032
MockMqttClient mockClient = new MockMqttClient();
@@ -2951,6 +2982,20 @@ public void onMessageArrived(String topic, byte[] data) {
29512982
}
29522983
}
29532984

2985+
private class TestAWSIotMqttSubscriptionStatusCallback implements AWSIotMqttSubscriptionStatusCallback {
2986+
boolean subscribed;
2987+
2988+
@Override
2989+
public void onSuccess(){
2990+
subscribed = true;
2991+
}
2992+
2993+
@Override
2994+
public void onFailure(Throwable exception){
2995+
subscribed = false;
2996+
}
2997+
}
2998+
29542999
/**
29553000
* Test Publish Status Callback
29563001
*/

aws-android-sdk-iot/src/test/java/com/amazonaws/mobileconnectors/iot/MockMqttClient.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class MockMqttClient extends MqttAsyncClient {
2121
public boolean isConnected;
2222
public MqttCallback mockCallback;
2323
public IMqttActionListener mockConnectionStatusCallback;
24+
public IMqttActionListener mockSubscriptionStatusCallback;
2425
public boolean throwsExceptionOnConnect;
2526
public MqttException connectException;
2627
public boolean throwsExceptionOnPublish;
@@ -102,6 +103,18 @@ public IMqttToken subscribe(String topicFilter, int qos) throws MqttException {
102103
return testToken;
103104
}
104105

106+
public IMqttToken subscribe(String topicFilter, int qos, Object userContext,
107+
IMqttActionListener callback) throws MqttException {
108+
if (throwsExceptionOnSubscribe) {
109+
throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION);
110+
}
111+
++subscribeCalls;
112+
mockSubscriptionStatusCallback = callback;
113+
mockSubscriptions.put(topicFilter, qos);
114+
callback.onSuccess(testToken);
115+
return testToken;
116+
}
117+
105118
public IMqttToken unsubscribe(String topicFilter) throws MqttException {
106119
if (throwsExceptionOnUnsubscribe) {
107120
throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION);

0 commit comments

Comments
 (0)