1919#import " AWSMQTTEncoder.h"
2020#import " AWSMQttTxFlow.h"
2121#import " AWSIoTMessage.h"
22+ #import " AWSMQTTTimerRing.h"
2223#import " AWSIoTMessage+AWSMQTTMessage.h"
2324
2425@interface AWSMQTTSession () <AWSMQTTDecoderDelegate,AWSMQTTEncoderDelegate> {
@@ -58,7 +59,7 @@ - (void)send:(AWSMQTTMessage*)msg;
5859- (UInt16)nextMsgId ;
5960
6061@property (strong ,atomic ) NSMutableArray * queue; // Queue to temporarily hold messages if encoder is busy sending another message
61- @property (strong ,atomic ) NSMutableArray * timerRing; // circular array of 60. Each element is a set that contains the messages that need to be retried.
62+ @property (strong ,atomic ) AWSMQTTTimerRing * timerRing; // A collection of messages that need to be retried.
6263@property (nonatomic , strong ) dispatch_queue_t drainSenderSerialQueue;
6364@property (nonatomic , strong ) AWSMQTTEncoder* encoder; // Low level protocol handler that converts a message into out bound network data
6465@property (nonatomic , strong ) AWSMQTTDecoder* decoder; // Low level protocol handler that converts in bound network data into a Message
@@ -103,11 +104,7 @@ - (id)initWithClientId:(NSString*)theClientId
103104 txMsgId = 1 ;
104105 txFlows = [[NSMutableDictionary alloc ] init ];
105106 rxFlows = [[NSMutableDictionary alloc ] init ];
106- self.timerRing = [[NSMutableArray alloc ] initWithCapacity: 60 ];
107- int i;
108- for (i = 0 ; i < 60 ; i++) {
109- [self .timerRing addObject: [NSMutableSet new ]];
110- }
107+ self.timerRing = [[AWSMQTTTimerRing alloc ] init ];
111108 serialQueue = dispatch_queue_create (" com.amazon.aws.iot.test-queue" , DISPATCH_QUEUE_SERIAL);
112109 ticks = 0 ;
113110 status = AWSMQTTSessionStatusCreated;
@@ -233,7 +230,7 @@ - (UInt16)publishDataAtLeastOnce:(NSData*)data
233230 AWSMQttTxFlow *flow = [AWSMQttTxFlow flowWithMsg: msg
234231 deadline: deadline];
235232 [txFlows setObject: flow forKey: [NSNumber numberWithUnsignedInt: msgId]];
236- [[ self .timerRing objectAtIndex: ([flow deadline ] % 60 )] addObject: [ NSNumber numberWithUnsignedInt: msgId ]];
233+ [self .timerRing addMsgId: [ NSNumber numberWithUnsignedInt: msgId] atTick: [flow deadline ]];
237234 AWSDDLogDebug (@" Published message %hu for QOS 1" , msgId);
238235 [self send: msg];
239236 return msgId;
@@ -267,7 +264,7 @@ - (UInt16)publishDataExactlyOnce:(NSData*)data
267264 AWSMQttTxFlow *flow = [AWSMQttTxFlow flowWithMsg: msg
268265 deadline: (ticks + 60 )];
269266 [txFlows setObject: flow forKey: [NSNumber numberWithUnsignedInt: msgId]];
270- [[ self .timerRing objectAtIndex: ([flow deadline ] % 60 )] addObject: [ NSNumber numberWithUnsignedInt: msgId ]];
267+ [self .timerRing addMsgId: [ NSNumber numberWithUnsignedInt: msgId] atTick: [flow deadline ]];
271268 [self send: msg];
272269 return msgId;
273270}
@@ -299,7 +296,7 @@ - (void)timerHandler:(NSTimer*)theTimer {
299296 dispatch_sync (serialQueue, ^{
300297 ticks++;
301298 });
302- NSEnumerator *e = [[[ self .timerRing objectAtIndex: ( ticks % 60 )] allObjects ] objectEnumerator ];
299+ NSEnumerator *e = [[self .timerRing allMsgIdsAtTick: ticks] objectEnumerator ];
303300 id msgId;
304301
305302 // Stay under the throttle here and move the work to the next tick if throttle is breached.
@@ -321,8 +318,8 @@ - (void)timerHandler:(NSTimer*)theTimer {
321318 while ((msgId = [e nextObject ])) {
322319 AWSMQttTxFlow *flow = [txFlows objectForKey: msgId];
323320 [flow setDeadline: ((ticks +1 ) % 60 )];
324- [[ self .timerRing objectAtIndex: (( ticks + 1 ) % 60 )] addObject: msgId ];
325- [[ self .timerRing objectAtIndex: (ticks % 60 )] removeObject: msgId ];
321+ [self .timerRing addMsgId: msgId atTick: ( ticks + 1 )];
322+ [self .timerRing removeMsgId: msgId atTick: ticks ];
326323 }
327324
328325 if (count > 0 ) {
@@ -567,8 +564,8 @@ - (void)handlePuback:(AWSMQTTMessage*)msg {
567564 if ([[flow msg ] type ] != AWSMQTTPublish || [[flow msg ] qos ] != 1 ) {
568565 return ;
569566 }
570-
571- [[ self .timerRing objectAtIndex: ( [flow deadline ] % 60 )] removeObject: msgId ];
567+
568+ [self .timerRing removeMsgId: msgId atTick: [flow deadline ]];
572569 [txFlows removeObjectForKey: msgId];
573570 AWSDDLogDebug (@" Removing msgID %@ from internal store for QOS1 guarantee" , msgId);
574571 [self .delegate session: self newAckForMessageId: msgId.unsignedShortValue];
@@ -594,10 +591,10 @@ - (void)handlePubrec:(AWSMQTTMessage*)msg {
594591 }
595592 msg = [AWSMQTTMessage pubrelMessageWithMessageId: [msgId unsignedIntValue ]];
596593 [flow setMsg: msg];
597- [[ self .timerRing objectAtIndex: ( [flow deadline ] % 60 )] removeObject: msgId ];
594+ [self .timerRing removeMsgId: msgId atTick: [flow deadline ]];
598595 [flow setDeadline: (ticks + 60 )];
599- [[ self .timerRing objectAtIndex: ( [flow deadline ] % 60 )] addObject: msgId ];
600-
596+ [self .timerRing addMsgId: msgId atTick: [flow deadline ]];
597+
601598 [self send: msg];
602599}
603600
@@ -638,8 +635,8 @@ - (void)handlePubcomp:(AWSMQTTMessage*)msg {
638635 if (flow == nil || [[flow msg ] type ] != AWSMQTTPubrel) {
639636 return ;
640637 }
641-
642- [[ self .timerRing objectAtIndex: ( [flow deadline ] % 60 )] removeObject: msgId ];
638+
639+ [self .timerRing removeMsgId: msgId atTick: [flow deadline ]];
643640 [txFlows removeObjectForKey: msgId];
644641
645642 AWSDDLogDebug (@" Removing msgID %@ from internal store for QOS2 guarantee" , msgId);
0 commit comments