4545import java .security .NoSuchAlgorithmException ;
4646import java .security .UnrecoverableKeyException ;
4747import java .util .concurrent .ConcurrentHashMap ;
48- import java .util .LinkedList ;
49- import java .util .List ;
48+ import java .util .concurrent .ConcurrentLinkedQueue ;
5049import java .util .Map ;
5150
5251import javax .net .SocketFactory ;
@@ -127,7 +126,7 @@ public class AWSIotMqttManager {
127126 * Queue for messages attempted to publish while MQTT client was offline.
128127 * Republished upon reconnect.
129128 */
130- private final List <AWSIotMqttQueueMessage > mqttMessageQueue ;
129+ private final ConcurrentLinkedQueue <AWSIotMqttQueueMessage > mqttMessageQueue ;
131130 /** KeepAlive interval specified by the user. */
132131 private int userKeepAlive ;
133132 /** MQTT Will parameters. */
@@ -488,7 +487,7 @@ void setMqttClient(MqttAsyncClient client) {
488487 *
489488 * @return offline message queue.
490489 */
491- List <AWSIotMqttQueueMessage > getMqttMessageQueue () {
490+ ConcurrentLinkedQueue <AWSIotMqttQueueMessage > getMqttMessageQueue () {
492491 return mqttMessageQueue ;
493492 }
494493
@@ -554,7 +553,7 @@ public AWSIotMqttManager(String mqttClientId, String endpoint) {
554553 }
555554
556555 this .topicListeners = new ConcurrentHashMap <String , AWSIotMqttTopic >();
557- this .mqttMessageQueue = new LinkedList <AWSIotMqttQueueMessage >();
556+ this .mqttMessageQueue = new ConcurrentLinkedQueue <AWSIotMqttQueueMessage >();
558557 this .accountEndpointPrefix = AwsIotEndpointUtility .getAccountPrefixFromEndpont (endpoint );
559558 this .mqttClientId = mqttClientId ;
560559 this .region = AwsIotEndpointUtility .getRegionFromIotEndpoint (endpoint );
@@ -585,7 +584,7 @@ public AWSIotMqttManager(String mqttClientId, Region region, String accountEndpo
585584 }
586585
587586 this .topicListeners = new ConcurrentHashMap <String , AWSIotMqttTopic >();
588- this .mqttMessageQueue = new LinkedList <AWSIotMqttQueueMessage >();
587+ this .mqttMessageQueue = new ConcurrentLinkedQueue <AWSIotMqttQueueMessage >();
589588
590589 this .accountEndpointPrefix = accountEndpointPrefix ;
591590 this .mqttClientId = mqttClientId ;
@@ -1232,9 +1231,8 @@ boolean putMessageInQueue(byte[] data, String topic, AWSIotMqttQos qos,
12321231 * Called to handle publishing messages accumulated in the message queue when the client was unable to publish.
12331232 */
12341233 void publishMessagesFromQueue () {
1235- if (connectionState == MqttManagerConnectionState .Connected && mqttMessageQueue != null
1236- && !mqttMessageQueue .isEmpty ()) {
1237- final AWSIotMqttQueueMessage message = mqttMessageQueue .remove (0 );
1234+ if (connectionState == MqttManagerConnectionState .Connected && mqttMessageQueue != null ) {
1235+ final AWSIotMqttQueueMessage message = mqttMessageQueue .poll ();
12381236 if (message != null ) {
12391237 try {
12401238 if (message .getUserData () != null && message .getUserData ().getUserCallback () != null ) {
@@ -1261,7 +1259,12 @@ void publishMessagesFromQueue() {
12611259 }
12621260 }
12631261
1264- (new Handler (Looper .getMainLooper ())).postDelayed (new Runnable () {
1262+ //Start a separate thread to publish remaining messages
1263+ //There is no need to run this on the main thread
1264+ final HandlerThread ht = new HandlerThread ("message queue thread" );
1265+ ht .start ();
1266+ Looper looper = ht .getLooper ();
1267+ (new Handler (looper )).postDelayed (new Runnable () {
12651268 @ Override
12661269 public void run () {
12671270 if (!mqttMessageQueue .isEmpty ()) {
0 commit comments