diff --git a/AWSIoT/Internal/AWSIoTMQTTClient.m b/AWSIoT/Internal/AWSIoTMQTTClient.m index 8cce64a6446..01800b5cfcd 100644 --- a/AWSIoT/Internal/AWSIoTMQTTClient.m +++ b/AWSIoT/Internal/AWSIoTMQTTClient.m @@ -37,6 +37,8 @@ @implementation AWSIoTMQTTQueueMessage @interface AWSIoTMQTTClient() +@property (atomic, assign) BOOL isBeingDeallocated; + @property(atomic, assign, readwrite) AWSIoTMQTTStatus mqttStatus; @property(nonatomic, strong) AWSMQTTSession* session; @property(nonatomic, strong) AWSIoTAtomicDictionary *topicListeners; @@ -61,7 +63,8 @@ @interface AWSIoTMQTTClient() )delegate { @@ -137,6 +179,8 @@ - (NSData *)getDerivedKeyForSecretKey:(NSString *)secretKey regionName:(NSString *)regionName serviceName:(NSString *)serviceName; { + if (self.isBeingDeallocated) return nil; + // AWS4 uses a series of derived keys, formed by hashing different pieces of data NSString *kSecret = [NSString stringWithFormat:@"AWS4%@", secretKey]; NSData *kDate = [AWSSignatureSignerUtility sha256HMacWithData:[dateStamp dataUsingEncoding:NSUTF8StringEncoding] @@ -164,6 +208,8 @@ - (NSString *)signWebSocketUrlForMethod:(NSString *)method now:(NSString *)now sessionKey:(NSString *)sessionKey; { + if (self.isBeingDeallocated) return nil; + NSString *payloadHash = [AWSSignatureSignerUtility hexEncode:[AWSSignatureSignerUtility hashString:payload]]; NSString *canonicalRequest = [NSString stringWithFormat:@"%@\n%@\n%@\nhost:%@\n\nhost\n%@", method, @@ -214,6 +260,7 @@ - (NSString *)prepareWebSocketUrlWithHostName:(NSString *)hostName secretKey:(NSString *)secretKey sessionKey:(NSString *)sessionKey { + if (self.isBeingDeallocated) return nil; NSDate *date = [NSDate aws_clockSkewFixedDate]; NSString *now = [date aws_stringValue:AWSDateISO8601DateFormat2]; NSString *today = [date aws_stringValue:AWSDateShortDateFormat1]; @@ -257,6 +304,7 @@ - (BOOL)connectWithClientId:(NSString*)clientId willQoS:(UInt8)willQoS willRetainFlag:(BOOL)willRetainFlag statusCallback:(void (^)(AWSIoTMQTTStatus status))callback { + if (self.isBeingDeallocated) return NO; if (self.userDidIssueConnect ) { //Issuing connect multiple times. Not allowed. @@ -292,6 +340,8 @@ - (BOOL)connectWithClientId:(NSString*)clientId } - (BOOL) connectWithCert { + if (self.isBeingDeallocated) return NO; + self.mqttStatus = AWSIoTMQTTStatusConnecting; if (self.cleanSession) { @@ -356,9 +406,9 @@ - (BOOL) connectWithCert { &kCFTypeDictionaryKeyCallBacks, &kCFTypeDictionaryValueCallBacks); } - CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, sslSettings); - CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, sslSettings); if (sslSettings) { + CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, sslSettings); + CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, sslSettings); CFRelease(sslSettings); } @@ -368,15 +418,20 @@ - (BOOL) connectWithCert { if (@available(iOS 11.0, *)) { //Get the SSL Context SSLContextRef context = (__bridge SSLContextRef) [inputStream propertyForKey: (__bridge NSString *) kCFStreamPropertySSLContext ]; - - //Set ALPN protocol list - CFStringRef strs[1]; - strs[0] = CFSTR("x-amzn-mqtt-ca"); - CFArrayRef protocols = CFArrayCreate(NULL, (void *)strs, 1, &kCFTypeArrayCallBacks); - - SSLSetALPNProtocols(context, protocols); - if (protocols) { - CFRelease(protocols); + if (context != NULL) { + //Set ALPN protocol list + CFStringRef strs[1]; + strs[0] = CFSTR("x-amzn-mqtt-ca"); + CFArrayRef protocols = CFArrayCreate(NULL, (void *)strs, 1, &kCFTypeArrayCallBacks); + if (protocols != NULL) { + OSStatus alpnStatus = SSLSetALPNProtocols(context, protocols); + if (alpnStatus != noErr) { + AWSDDLogWarn(@"SSLSetALPNProtocols failed with status: %d", (int)alpnStatus); + } + CFRelease(protocols); + } + } else { + AWSDDLogWarn(@"kCFStreamPropertySSLContext returned NULL; skipping ALPN setup"); } } } @@ -405,6 +460,8 @@ - (BOOL) connectWithClientId:(NSString *)clientId willRetainFlag:(BOOL)willRetainFlag statusCallback:(void (^)(AWSIoTMQTTStatus status))callback; { + if (self.isBeingDeallocated) return NO; + if (self.userDidIssueConnect ) { //Issuing connect multiple times. Not allowed. return NO; @@ -430,6 +487,8 @@ - (BOOL) connectWithClientId:(NSString *)clientId - (BOOL)connectWithClientId:(NSString *)clientId presignedURL:(NSString *)presignedURL statusCallback:(void (^)(AWSIoTMQTTStatus status))callback { + if (self.isBeingDeallocated) return NO; + if (clientId != nil && presignedURL != nil) { // currently using the last given URL on subscribe call self.presignedURL = presignedURL; @@ -465,6 +524,8 @@ - (BOOL)connectWithClientId:(NSString *)clientId willQoS:(UInt8)willQoS willRetainFlag:(BOOL)willRetainFlag statusCallback:(void (^)(AWSIoTMQTTStatus status))callback { + if (self.isBeingDeallocated) return NO; + if (self.userDidIssueConnect ) { // Issuing connect multiple times. Not allowed. return NO; @@ -494,9 +555,11 @@ - (BOOL)connectWithClientId:(NSString *)clientId } - (BOOL) webSocketConnectWithClientId { + if (self.isBeingDeallocated) return NO; AWSDDLogInfo(@"AWSIoTMQTTClient: connecting via websocket. "); if (self.webSocket) { + self.webSocket.delegate = nil; [self.webSocket close]; self.webSocket = nil; } @@ -526,13 +589,9 @@ - (BOOL) webSocketConnectWithClientId { //Get Credentials from credentials provider. [[self.configuration.credentialsProvider credentials] continueWithBlock:^id _Nullable(AWSTask * _Nonnull task) { - //If an error occured when trying to get credentials, setup a timer to retry the connection after self.currentReconnectTime seconds and schedule it on the reconnect Thread. + //If an error occured when trying to get credentials, schedule a GCD reconnect. if (task.error) { - @synchronized(self) { - self.reconnectThread = [[NSThread alloc] initWithTarget:self selector:@selector(initiateReconnectTimer:) object:self]; - [self.reconnectThread start]; - } - + [self scheduleReconnectGCD]; AWSDDLogError(@"Unable to connect to MQTT due to an error fetching credentials from the Credentials Provider. Will try again in %f seconds", self.currentReconnectTime); return nil; } @@ -556,6 +615,8 @@ - (BOOL) webSocketConnectWithClientId { } - (void)initWebSocketConnectionForURL:(NSString *)urlString { + if (self.isBeingDeallocated) return; + // Set status to "Connecting" self.mqttStatus = AWSIoTMQTTStatusConnecting; @@ -592,18 +653,9 @@ - (void)initWebSocketConnectionForURL:(NSString *)urlString { // Add the HTTP headers for custom authorizer name, token value and signature if (self.customAuthorizerName != nil) { - // Add IoT custom authorizer headers to the request - // x-amz-customauthorizer-name: - [urlRequest addValue:self.customAuthorizerName - forHTTPHeaderField:@"x-amz-customauthorizer-name"]; - - // : - [urlRequest addValue:self.tokenValue - forHTTPHeaderField:self.tokenKeyName]; - - // x-amz-customauthorizer-signature: - [urlRequest addValue:self.tokenSignature - forHTTPHeaderField:@"x-amz-customauthorizer-signature"]; + [urlRequest addValue:self.customAuthorizerName forHTTPHeaderField:@"x-amz-customauthorizer-name"]; + [urlRequest addValue:self.tokenValue forHTTPHeaderField:self.tokenKeyName]; + [urlRequest addValue:self.tokenSignature forHTTPHeaderField:@"x-amz-customauthorizer-signature"]; } //Create the webSocket and setup the MQTTClient object as the delegate @@ -615,13 +667,13 @@ - (void)initWebSocketConnectionForURL:(NSString *)urlString { //Open the web socket [self.webSocket open]; - // Now that the WebSocket is created and opened, it will send its delegate, i.e., this MQTTclient object the messages. AWSDDLogVerbose(@"Websocket is created and opened."); } - (void)disconnect { + if (self.isBeingDeallocated) return; + if (self.userDidIssueDisconnect ) { - //Issuing disconnect multiple times. Turn this function into a noop by returning here. return; } @@ -647,24 +699,21 @@ - (void)disconnect { __weak AWSIoTMQTTClient *weakSelf = self; self.streamsThread.onStop = ^{ __strong AWSIoTMQTTClient *strongSelf = weakSelf; - //If the userDidIssueDisconnect has been set to NO, it means a new connection has been requested, - //so we should disregard these updates if (!strongSelf || !strongSelf.userDidIssueDisconnect) { return; } - //Invalidate connection age timer and close socket if (strongSelf.connectionAgeTimer != nil) { [strongSelf.connectionAgeTimer invalidate]; strongSelf.connectionAgeTimer = nil; } if (strongSelf.webSocket) { + strongSelf.webSocket.delegate = nil; [strongSelf.webSocket close]; strongSelf.webSocket = nil; } - //Notify disconnected status. strongSelf.mqttStatus = AWSIoTMQTTStatusDisconnected; [strongSelf notifyConnectionStatus]; }; @@ -673,55 +722,33 @@ - (void)disconnect { } /** - Invalidates and removes reference to the reconnect timer on the correct thread to avoid - creating a memory leak. - - @discussion If called on any thread other than the reconnect thread the work is queued up on - the reconnect thread but the method returns without waiting for the invalidation to be completed. - This is called initially on the thread the consumer is calling the client's disconnect method on. + Cleanup reconnect timer source safely. */ - (void)cleanupReconnectTimer { - if (self.reconnectTimer == nil) { - return; - } - - if (self.reconnectThread) { - if (!self.reconnectThread.isFinished && ![[NSThread currentThread] isEqual:self.reconnectThread]) { - // Move to reconnect thread to cleanup only if it's still running - [self performSelector:@selector(cleanupReconnectTimer) - onThread:self.reconnectThread - withObject:nil - waitUntilDone:NO]; - return; - } - - [self invalidateReconnectTimer]; - } -} - -- (void)invalidateReconnectTimer { - __weak AWSIoTMQTTClient *weakSelf = self; + if (self.isBeingDeallocated) return; dispatch_async(self.timerQueue, ^{ - [weakSelf.reconnectTimer invalidate]; - weakSelf.reconnectTimer = nil; + if (self.reconnectTimerSource) { + dispatch_source_cancel(self.reconnectTimerSource); + self.reconnectTimerSource = nil; + } }); } - (void)cleanUpWebsocketOutputStream { + if (self.isBeingDeallocated) return; + @synchronized(self) { if (self.websocketOutputStream) { self.websocketOutputStream.delegate = nil; [self.websocketOutputStream close]; - [self.websocketOutputStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode]; self.websocketOutputStream = nil; } } } - (void)reconnectToSession { + if (self.isBeingDeallocated) return; - self.reconnectTimer = nil; - //Check if the user has issued a disconnect. If so, don't retry. if (self.userDidIssueDisconnect ) { return; @@ -734,10 +761,7 @@ - (void)reconnectToSession { AWSDDLogInfo(@"Attempting to reconnect."); - // Double the reconnect time which will be used on the next reconnection if this one fails to connect. - // Note that there is a maximum reconnection time beyond which - // it can no longer increase, and that the base (default) reconnection time will - // be restored once the connection remains up for the minimum connection time. + // Exponential backoff self.currentReconnectTime *= 2; if (self.currentReconnectTime > self.maximumReconnectTime) { self.currentReconnectTime = self.maximumReconnectTime; @@ -757,12 +781,14 @@ - (void)reconnectToSession { } - (void)notifyConnectionStatus { - //Set the connection status on the callback. AWSIoTMQTTStatus mqttStatus = self.mqttStatus; __weak AWSIoTMQTTClient *weakSelf = self; __weak StatusCallback connectStatusCallback = weakSelf.connectStatusCallback; __weak id clientDelegate = weakSelf.clientDelegate; - dispatch_barrier_async(dispatch_get_global_queue( DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(void){ + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + __strong AWSIoTMQTTClient *strongSelf = weakSelf; + if (!strongSelf) return; + __strong StatusCallback callback = connectStatusCallback; if (callback != nil) { callback(mqttStatus); @@ -777,19 +803,11 @@ - (void)notifyConnectionStatus { - (void)initiateReconnectTimer: (id) sender { + if (self.isBeingDeallocated) return; if (_userDidIssueDisconnect ) { return; } - - //Make sure that only one thread can setup the timer at one time. - //Set the timeout to 1800 seconds, which is 1.5x of the max keep-alive 1200 seconds. - //The unit of measure for the dispatch_time function is nano seconds. - - dispatch_assert_queue_not(self.timerQueue); - __weak AWSIoTMQTTClient *weakSelf = self; - dispatch_async(self.timerQueue, ^{ - [weakSelf scheduleReconnection]; - }); + [self scheduleReconnectGCD]; } #pragma mark - publish methods - @@ -797,11 +815,12 @@ - (void)initiateReconnectTimer: (id) sender - (void)publishString:(NSString*)str onTopic:(NSString*)topic ackCallback:(AWSIoTMQTTAckBlock)ackCallBack { + if (self.isBeingDeallocated) return; [self publishData:[str dataUsingEncoding:NSUTF8StringEncoding] onTopic:topic]; - } - (void)publishString:(NSString*)str onTopic:(NSString*)topic { + if (self.isBeingDeallocated) return; [self publishData:[str dataUsingEncoding:NSUTF8StringEncoding] onTopic:topic]; } @@ -809,9 +828,11 @@ - (void)publishString:(NSString*)str qos:(UInt8)qos onTopic:(NSString*)topic ackCallback:(AWSIoTMQTTAckBlock)ackCallback { + if (self.isBeingDeallocated) return; + if (qos == 0 && ackCallback != nil) { - [NSException raise:NSInvalidArgumentException - format:@"Cannot specify `ackCallback` block for QoS = 0."]; + AWSDDLogError(@"Cannot specify `ackCallback` block for QoS = 0."); + return; } [self publishData:[str dataUsingEncoding:NSUTF8StringEncoding] qos:qos @@ -820,17 +841,24 @@ - (void)publishString:(NSString*)str } - (void)publishString:(NSString*)str qos:(UInt8)qos onTopic:(NSString*)topic { + if (self.isBeingDeallocated) return; [self publishData:[str dataUsingEncoding:NSUTF8StringEncoding] qos:qos onTopic:topic]; } - (void)publishData:(NSData*)data onTopic:(NSString*)topic { + if (self.isBeingDeallocated) return; + if (!_userDidIssueConnect || _userDidIssueDisconnect) { + AWSDDLogError(@"publishData called while not connected; ignoring"); + return; + } [self.session publishData:data onTopic:topic]; } - (void)publishData:(NSData *)data qos:(UInt8)qos onTopic:(NSString *)topic { + if (self.isBeingDeallocated) return; [self publishData:data qos:qos onTopic:topic ackCallback:nil]; } @@ -838,6 +866,7 @@ - (void)publishData:(NSData*)data qos:(UInt8)qos onTopic:(NSString*)topic ackCallback:(nullable AWSIoTMQTTAckBlock)ackCallback { + if (self.isBeingDeallocated) return; [self publishData:data qos:qos onTopic:topic retain:NO ackCallback:ackCallback]; } @@ -846,14 +875,11 @@ - (void)publishData:(NSData*)data onTopic:(NSString*)topic retain:(BOOL)retain ackCallback:(nullable AWSIoTMQTTAckBlock)ackCallback { - if (!_userDidIssueConnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call publish before connecting to the server"]; - } + if (self.isBeingDeallocated) return; - if (_userDidIssueDisconnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call publish after disconnecting from the server"]; + if (!_userDidIssueConnect || _userDidIssueDisconnect) { + AWSDDLogError(@"Cannot call publish when not connected."); + return; } if (qos < 0 || qos > 2) { @@ -861,8 +887,8 @@ - (void)publishData:(NSData*)data return; } if (qos == AWSIoTMQTTQoSMessageDeliveryAttemptedAtMostOnce && ackCallback != nil) { - [NSException raise:NSInvalidArgumentException - format:@"Cannot specify `ackCallback` block for QoS = 0."]; + AWSDDLogError(@"Cannot specify `ackCallback` block for QoS = 0."); + return; } AWSDDLogVerbose(@"isReadyToPublish: %i",[self.session isReadyToPublish]); @@ -888,24 +914,21 @@ - (void)publishData:(NSData*)data #pragma mark - subscribe methods - - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos messageCallback:(AWSIoTMQTTNewMessageBlock)callback { + if (self.isBeingDeallocated) return; [self subscribeToTopic:topic qos:qos messageCallback:callback ackCallback:nil]; - } - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos messageCallback:(AWSIoTMQTTNewMessageBlock)callback ackCallback:(AWSIoTMQTTAckBlock)ackCallback { - if (!_userDidIssueConnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call subscribe before connecting to the server"]; - } + if (self.isBeingDeallocated) return; - if (_userDidIssueDisconnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call subscribe after disconnecting from the server"]; + if (!_userDidIssueConnect || _userDidIssueDisconnect) { + AWSDDLogError(@"Cannot call subscribe before connecting to the server"); + return; } AWSDDLogInfo(@"Subscribing to topic %@ with messageCallback", topic); AWSIoTMQTTTopicModel *topicModel = [AWSIoTMQTTTopicModel new]; @@ -919,6 +942,7 @@ - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos extendedCallback:(AWSIoTMQTTExtendedNewMessageBlock)callback { + if (self.isBeingDeallocated) return; [self subscribeToTopic:topic qos:qos extendedCallback:callback @@ -929,14 +953,11 @@ - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos extendedCallback:(AWSIoTMQTTExtendedNewMessageBlock)callback ackCallback:(AWSIoTMQTTAckBlock)ackCallback{ - if (!_userDidIssueConnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call subscribe before connecting to the server"]; - } + if (self.isBeingDeallocated) return; - if (_userDidIssueDisconnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call subscribe after disconnecting from the server"]; + if (!_userDidIssueConnect || _userDidIssueDisconnect) { + AWSDDLogError(@"Cannot call subscribe before connecting to the server"); + return; } AWSDDLogInfo(@"Subscribing to topic %@ with ExtendedmessageCallback", topic); @@ -951,6 +972,7 @@ - (void)subscribeToTopic:(NSString*)topic - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos fullCallback:(AWSIoTMQTTFullMessageBlock)callback { + if (self.isBeingDeallocated) return; [self subscribeToTopic:topic qos:qos fullCallback:callback @@ -961,14 +983,11 @@ - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos fullCallback:(AWSIoTMQTTFullMessageBlock)callback ackCallback:(AWSIoTMQTTAckBlock)ackCallback { - if (!_userDidIssueConnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call subscribe before connecting to the server"]; - } - - if (_userDidIssueDisconnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call subscribe after disconnecting from the server"]; + if (self.isBeingDeallocated) return; + + if (!_userDidIssueConnect || _userDidIssueDisconnect) { + AWSDDLogError(@"Cannot call subscribe before connecting to the server"); + return; } AWSDDLogInfo(@"Subscribing to topic %@ with ExtendedmessageCallback", topic); @@ -983,6 +1002,8 @@ - (void)subscribeToTopic:(NSString*)topic // Private - (void)subscribeWithTopicModel:(AWSIoTMQTTTopicModel *)topicModel ackCallback:(AWSIoTMQTTAckBlock)ackCallback { + if (self.isBeingDeallocated) return; + [self.topicListeners setObject:topicModel forKey:topicModel.topic]; UInt16 messageId = [self.session subscribeToTopic:topicModel.topic atLevel:topicModel.qos]; @@ -995,14 +1016,11 @@ - (void)subscribeWithTopicModel:(AWSIoTMQTTTopicModel *)topicModel - (void)unsubscribeTopic:(NSString*)topic ackCallback:(AWSIoTMQTTAckBlock)ackCallback { - if (!_userDidIssueConnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call unsubscribe before connecting to the server"]; - } + if (self.isBeingDeallocated) return; - if (_userDidIssueDisconnect) { - [NSException raise:NSInternalInconsistencyException - format:@"Cannot call unsubscribe after disconnecting from the server"]; + if (!_userDidIssueConnect || _userDidIssueDisconnect) { + AWSDDLogError(@"Cannot call unsubscribe before connecting to the server"); + return; } AWSDDLogInfo(@"Unsubscribing from topic %@", topic); UInt16 messageId = [self.session unsubscribeTopic:topic]; @@ -1014,117 +1032,118 @@ - (void)unsubscribeTopic:(NSString*)topic } - (void)unsubscribeTopic:(NSString*)topic { + if (self.isBeingDeallocated) return; [self unsubscribeTopic:topic ackCallback:nil]; } #pragma mark - MQTTSessionDelegate - - (void)connectionAgeTimerHandler:(NSTimer*)theTimer { - self.connectionAgeInSeconds++; - AWSDDLogVerbose(@"Connection Age: %ld", (long)self.connectionAgeInSeconds); - if (self.connectionAgeInSeconds >= self.minimumConnectionTime) { - AWSDDLogVerbose(@"Connection Age threshold reached. Resetting reconnect time to [%fs]", self.baseReconnectTime); - self.currentReconnectTime = self.baseReconnectTime; - [theTimer invalidate]; - } + __weak AWSIoTMQTTClient *weakSelf = self; + dispatch_async(dispatch_get_main_queue(), ^{ + __strong AWSIoTMQTTClient *strongSelf = weakSelf; + if (!strongSelf || !theTimer.isValid) return; + + strongSelf.connectionAgeInSeconds++; + AWSDDLogVerbose(@"Connection Age: %ld", (long)strongSelf.connectionAgeInSeconds); + if (strongSelf.connectionAgeInSeconds >= strongSelf.minimumConnectionTime) { + AWSDDLogVerbose(@"Connection Age threshold reached. Resetting reconnect time to [%fs]", strongSelf.baseReconnectTime); + strongSelf.currentReconnectTime = strongSelf.baseReconnectTime; + [theTimer invalidate]; + } + }); } - (void)session:(AWSMQTTSession*)session handleEvent:(AWSMQTTSessionEvent)eventCode { - AWSDDLogVerbose(@"MQTTSessionDelegate handleEvent: %i",eventCode); - - switch (eventCode) { - case AWSMQTTSessionEventConnected: - AWSDDLogInfo(@"MQTT session connected."); - self.mqttStatus = AWSIoTMQTTStatusConnected; - [self notifyConnectionStatus]; - - if (self.connectionAgeTimer != nil) { - [self.connectionAgeTimer invalidate]; - } - self.connectionAgeTimer = [ NSTimer scheduledTimerWithTimeInterval:1.0 target:self selector:@selector(connectionAgeTimerHandler:) userInfo:nil repeats:YES]; - - //Subscribe to prior topics - if (_autoResubscribe) { - AWSDDLogInfo(@"Auto-resubscribe is enabled. Resubscribing to topics."); - for (AWSIoTMQTTTopicModel *topic in self.topicListeners.allValues) { - [self.session subscribeToTopic:topic.topic atLevel:topic.qos]; + __weak AWSIoTMQTTClient *weakSelf = self; + dispatch_async(dispatch_get_main_queue(), ^{ + __strong AWSIoTMQTTClient *strongSelf = weakSelf; + if (!strongSelf) return; + + AWSDDLogVerbose(@"MQTTSessionDelegate handleEvent: %i",eventCode); + + switch (eventCode) { + case AWSMQTTSessionEventConnected: { + AWSDDLogInfo(@"MQTT session connected."); + strongSelf.mqttStatus = AWSIoTMQTTStatusConnected; + [strongSelf notifyConnectionStatus]; + + if (strongSelf.connectionAgeTimer != nil) { + [strongSelf.connectionAgeTimer invalidate]; } - } - break; - - case AWSMQTTSessionEventConnectionRefused: - AWSDDLogWarn(@"MQTT session refused."); - self.mqttStatus = AWSIoTMQTTStatusConnectionRefused; - [self notifyConnectionStatus]; - break; - case AWSMQTTSessionEventConnectionClosed: - AWSDDLogInfo(@"MQTTSessionEventConnectionClosed: MQTT session closed."); - - self.connectionAgeInSeconds = 0; - if (self.connectionAgeTimer != nil ) { - [self.connectionAgeTimer invalidate]; - self.connectionAgeTimer = nil; - } + __weak AWSIoTMQTTClient *weakSelf2 = strongSelf; + strongSelf.connectionAgeTimer = [NSTimer scheduledTimerWithTimeInterval:1.0 repeats:YES block:^(NSTimer *timer) { + __strong AWSIoTMQTTClient *strongSelf2 = weakSelf2; + if (!strongSelf2) { + [timer invalidate]; + return; + } + [strongSelf2 connectionAgeTimerHandler:timer]; + }]; - //Check if user issued a disconnect - if (self.userDidIssueDisconnect ) { - //Clear all session state here. - [self.topicListeners removeAllObjects]; - self.mqttStatus = AWSIoTMQTTStatusDisconnected; - [self notifyConnectionStatus]; - } - else { - //Connection was closed unexpectedly. - - //Notify - self.mqttStatus = AWSIoTMQTTStatusConnectionError; - [self notifyConnectionStatus]; - - //Retry - @synchronized(self) { - self.reconnectThread = [[NSThread alloc] initWithTarget:self selector:@selector(initiateReconnectTimer:) object:self]; - [self.reconnectThread start]; + if (strongSelf->_autoResubscribe) { + AWSDDLogInfo(@"Auto-resubscribe is enabled. Resubscribing to topics."); + for (AWSIoTMQTTTopicModel *topic in strongSelf.topicListeners.allValues) { + [strongSelf.session subscribeToTopic:topic.topic atLevel:topic.qos]; + } } + break; } - break; - case AWSMQTTSessionEventConnectionError: - AWSDDLogError(@"MQTTSessionEventConnectionError: Received an MQTT session connection error"); - - self.connectionAgeInSeconds = 0; - if (self.connectionAgeTimer != nil ) { - [self.connectionAgeTimer invalidate]; - self.connectionAgeTimer = nil; - } - if (self.userDidIssueDisconnect ) { - //Clear all session state here. - [self.topicListeners removeAllObjects]; - self.mqttStatus = AWSIoTMQTTStatusDisconnected; - [self notifyConnectionStatus]; - } - else { - //Connection errored out unexpectedly. - - //Notify - self.mqttStatus = AWSIoTMQTTStatusConnectionError; - [self notifyConnectionStatus]; - - //Retry - @synchronized(self) { - self.reconnectThread = [[NSThread alloc] initWithTarget:self selector:@selector(initiateReconnectTimer:) object:self]; - [self.reconnectThread start]; + case AWSMQTTSessionEventConnectionRefused: + AWSDDLogWarn(@"MQTT session refused."); + strongSelf.mqttStatus = AWSIoTMQTTStatusConnectionRefused; + [strongSelf notifyConnectionStatus]; + break; + case AWSMQTTSessionEventConnectionClosed: + AWSDDLogInfo(@"MQTTSessionEventConnectionClosed: MQTT session closed."); + + strongSelf.connectionAgeInSeconds = 0; + if (strongSelf.connectionAgeTimer != nil ) { + [strongSelf.connectionAgeTimer invalidate]; + strongSelf.connectionAgeTimer = nil; } - } - break; - case AWSMQTTSessionEventProtocolError: - AWSDDLogError(@"MQTT session protocol error"); - self.mqttStatus = AWSIoTMQTTStatusProtocolError; - [self notifyConnectionStatus]; - AWSDDLogError(@"Disconnecting."); - [self disconnect]; - break; - default: - break; - } + + if (strongSelf.userDidIssueDisconnect ) { + [strongSelf.topicListeners removeAllObjects]; + strongSelf.mqttStatus = AWSIoTMQTTStatusDisconnected; + [strongSelf notifyConnectionStatus]; + } + else { + strongSelf.mqttStatus = AWSIoTMQTTStatusConnectionError; + [strongSelf notifyConnectionStatus]; + [strongSelf scheduleReconnectGCD]; + } + break; + case AWSMQTTSessionEventConnectionError: + AWSDDLogError(@"MQTTSessionEventConnectionError: Received an MQTT session connection error"); + + strongSelf.connectionAgeInSeconds = 0; + if (strongSelf.connectionAgeTimer != nil ) { + [strongSelf.connectionAgeTimer invalidate]; + strongSelf.connectionAgeTimer = nil; + } + if (strongSelf.userDidIssueDisconnect ) { + [strongSelf.topicListeners removeAllObjects]; + strongSelf.mqttStatus = AWSIoTMQTTStatusDisconnected; + [strongSelf notifyConnectionStatus]; + } + else { + strongSelf.mqttStatus = AWSIoTMQTTStatusConnectionError; + [strongSelf notifyConnectionStatus]; + [strongSelf scheduleReconnectGCD]; + } + break; + case AWSMQTTSessionEventProtocolError: + AWSDDLogError(@"MQTT session protocol error"); + strongSelf.mqttStatus = AWSIoTMQTTStatusProtocolError; + [strongSelf notifyConnectionStatus]; + AWSDDLogError(@"Disconnecting."); + [strongSelf disconnect]; + break; + default: + break; + } + }); } @@ -1133,6 +1152,8 @@ - (void)session:(AWSMQTTSession*)session handleEvent:(AWSMQTTSessionEvent)eventC - (void)session:(AWSMQTTSession*)session newMessage:(AWSMQTTMessage*)message onTopic:(NSString*)topic { + if (self.isBeingDeallocated) return; + AWSDDLogVerbose(@"MQTTSessionDelegate newMessage: %@ onTopic: %@",[[NSString alloc] initWithData:message.data encoding:NSUTF8StringEncoding], topic); NSArray *topicParts = [topic componentsSeparatedByString: @"/"]; @@ -1200,12 +1221,13 @@ - (void)session:(AWSMQTTSession*)session #pragma mark - callback handler - - (void)session:(AWSMQTTSession*)session newAckForMessageId:(UInt16)msgId { + if (self.isBeingDeallocated) return; + AWSDDLogVerbose(@"MQTTSessionDelegate new ack for msgId: %d", msgId); NSNumber *msgIdNumber = [NSNumber numberWithInt:msgId]; AWSIoTMQTTAckBlock callback = [[self ackCallbackDictionary] objectForKey:msgIdNumber]; if(callback) { - // Give callback to the client on a background thread dispatch_async(dispatch_get_global_queue( DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(void){ callback(); }); @@ -1216,20 +1238,19 @@ - (void)session:(AWSMQTTSession*)session newAckForMessageId:(UInt16)msgId { #pragma mark - AWSSRWebSocketDelegate - - (void)webSocketDidOpen:(AWSSRWebSocket *)webSocket { + if (self.isBeingDeallocated) return; + + if (!webSocket || webSocket != self.webSocket) { + AWSDDLogError(@"webSocketDidOpen called with invalid webSocket reference"); + return; + } + AWSDDLogInfo(@"Websocket did open and is connected."); - // The WebSocket is connected; at this point we need to create streams - // for MQTT encode/decode and then instantiate the MQTT client. + // Create bound pair for decoder CFReadStreamRef decoderReadStream; CFWriteStreamRef decoderWriteStream; - // CFStreamCreateBoundPair() requires addresses, so use the ivars for - // these properties. 128KB is the maximum message size for AWS IoT (see https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html). - // The streams should be able to buffer an entire maximum-sized message - // since the MQTT client isn't capable of dealing with partial reads. - - //Create a bound pair of read and write streams. Any data written to the write stream is received by the read stream. - // i.e., whatever is written to the "websocketOutputStream" is received by the "inputStream". CFStreamCreateBoundPair(nil, &decoderReadStream, &decoderWriteStream, 128*1024); // 128KB buffer size NSInputStream *inputStream = (__bridge_transfer NSInputStream *)decoderReadStream; self.websocketOutputStream = (__bridge_transfer NSOutputStream *)decoderWriteStream; @@ -1238,6 +1259,11 @@ - (void)webSocketDidOpen:(AWSSRWebSocket *)webSocket { //Create write stream to write to the WebSocket. self.encoderOutputStream = [AWSIoTWebSocketOutputStreamFactory createAWSIoTWebSocketOutputStreamWithWebSocket:webSocket]; + if (!inputStream || !self.websocketOutputStream || !self.encoderOutputStream) { + AWSDDLogError(@"Failed to create required streams for WebSocket connection"); + return; + } + //Cancel previous streams thread if necessary @synchronized(self) { if (self.streamsThread && !self.streamsThread.isCancelled) { @@ -1254,39 +1280,61 @@ - (void)webSocketDidOpen:(AWSSRWebSocket *)webSocket { } - - (void)webSocket:(AWSSRWebSocket *)webSocket didFailWithError:(NSError *)error { + if (self.isBeingDeallocated) return; + if (!webSocket || webSocket != self.webSocket) { + AWSDDLogError(@"webSocket:didFailWithError called with invalid webSocket"); + return; + } + AWSDDLogError(@"didFailWithError: Websocket failed With Error %@", error); - // The WebSocket has failed.The input/output streams can be closed here. - // Also, the webSocket can be set to nil [self cleanUpWebsocketOutputStream]; - [self.encoderOutputStream close]; - [self.webSocket close]; - self.webSocket = nil; + @synchronized(self) { + if (self.encoderOutputStream) { + [self.encoderOutputStream close]; + self.encoderOutputStream = nil; + } + } + + @synchronized(self) { + if (self.webSocket) { + self.webSocket.delegate = nil; + [self.webSocket close]; + self.webSocket = nil; + } + } - // If this is not because of user initated disconnect, setup timer to retry. if (!self.userDidIssueDisconnect ) { self.mqttStatus = AWSIoTMQTTStatusConnectionError; - // Indicate an error to the connection status callback. [self notifyConnectionStatus]; - - @synchronized(self) { - self.reconnectThread = [[NSThread alloc] initWithTarget:self selector:@selector(initiateReconnectTimer:) object:self]; - [self.reconnectThread start]; - } + [self scheduleReconnectGCD]; } } - (void)webSocket:(AWSSRWebSocket *)webSocket didReceiveMessage:(id)message { + if (self.isBeingDeallocated) return; + if (!webSocket || webSocket != self.webSocket) { + AWSDDLogError(@"webSocket:didReceiveMessage called with invalid webSocket"); + return; + } + NSOutputStream *wsOut = nil; + @synchronized (self) { + wsOut = self.websocketOutputStream; + } + if (!wsOut) { + return; + } + if ([message isKindOfClass:[NSData class]]) { NSData *messageData = (NSData *)message; AWSDDLogVerbose(@"Websocket didReceiveMessage: Received %lu bytes", (unsigned long)messageData.length); - // When a message is received, write it to the Decoder's input stream. - [self.websocketOutputStream write:[messageData bytes] maxLength:messageData.length]; + @synchronized (wsOut) { + [wsOut write:[messageData bytes] maxLength:messageData.length]; + } } else { @@ -1295,48 +1343,73 @@ - (void)webSocket:(AWSSRWebSocket *)webSocket didReceiveMessage:(id)message { } - (void)webSocket:(AWSSRWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean { + if (self.isBeingDeallocated) return; + if (!webSocket || webSocket != self.webSocket) { + AWSDDLogError(@"webSocket:didCloseWithCode called with invalid webSocket"); + return; + } + AWSDDLogInfo(@"WebSocket closed with code:%ld with reason:%@", (long)code, reason); - // The WebSocket has closed. The input/output streams can be closed here. [self cleanUpWebsocketOutputStream]; - [self.encoderOutputStream close]; - [self.webSocket close]; - self.webSocket = nil; + @synchronized(self) { + if (self.encoderOutputStream) { + [self.encoderOutputStream close]; + self.encoderOutputStream = nil; + } + } + + @synchronized(self) { + if (self.webSocket) { + self.webSocket.delegate = nil; + [self.webSocket close]; + self.webSocket = nil; + } + } - // If this is not because of user initated disconnect, setup timer to retry. if (!self.userDidIssueDisconnect ) { self.mqttStatus = AWSIoTMQTTStatusConnectionError; - // Indicate an error to the connection status callback. [self notifyConnectionStatus]; - - @synchronized(self) { - self.reconnectThread = [[NSThread alloc] initWithTarget:self selector:@selector(initiateReconnectTimer:) object:self]; - [self.reconnectThread start]; - } + [self scheduleReconnectGCD]; } } - (void)webSocket:(AWSSRWebSocket *)webSocket didReceivePong:(NSData *)pongPayload { + if (self.isBeingDeallocated) return; + if (!webSocket || webSocket != self.webSocket) return; AWSDDLogVerbose(@"Websocket received pong"); } - # pragma mark - private/serial functions - -- (void)scheduleReconnection { - dispatch_assert_queue(self.timerQueue); - - BOOL isConnectingOrConnected = self.mqttStatus == AWSIoTMQTTStatusConnected || self.mqttStatus == AWSIoTMQTTStatusConnecting; - if (!self.reconnectTimer && !isConnectingOrConnected) { - self.reconnectTimer = [NSTimer timerWithTimeInterval:self.currentReconnectTime - target:self - selector: @selector(reconnectToSession) - userInfo:nil - repeats:NO]; - [[NSRunLoop currentRunLoop] addTimer:self.reconnectTimer forMode:NSDefaultRunLoopMode]; - [[NSRunLoop currentRunLoop] runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]]; - } +// New GCD-based reconnect scheduler +- (void)scheduleReconnectGCD { + if (self.isBeingDeallocated) return; + dispatch_async(self.timerQueue, ^{ + BOOL isConnectingOrConnected = self.mqttStatus == AWSIoTMQTTStatusConnected || self.mqttStatus == AWSIoTMQTTStatusConnecting; + if (self.reconnectTimerSource || isConnectingOrConnected || self.userDidIssueDisconnect) { + return; + } + __weak AWSIoTMQTTClient *weakSelf = self; + self.reconnectTimerSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.timerQueue); + if (!self.reconnectTimerSource) { + return; + } + uint64_t intervalNs = (uint64_t)(self.currentReconnectTime * NSEC_PER_SEC); + dispatch_source_set_timer(self.reconnectTimerSource, dispatch_time(DISPATCH_TIME_NOW, intervalNs), DISPATCH_TIME_FOREVER, (uint64_t)(0.1 * NSEC_PER_SEC)); + dispatch_source_set_event_handler(self.reconnectTimerSource, ^{ + __strong AWSIoTMQTTClient *strongSelf = weakSelf; + if (!strongSelf) return; + dispatch_source_cancel(strongSelf.reconnectTimerSource); + strongSelf.reconnectTimerSource = nil; + [strongSelf reconnectToSession]; + }); + dispatch_source_set_cancel_handler(self.reconnectTimerSource, ^{ + // Clear runloop reference after unscheduling streams + }); + dispatch_resume(self.reconnectTimerSource); + }); } @end diff --git a/AWSIoT/Internal/AWSIoTStreamThread.m b/AWSIoT/Internal/AWSIoTStreamThread.m index 8ad47bcc445..00f86bb2034 100644 --- a/AWSIoT/Internal/AWSIoTStreamThread.m +++ b/AWSIoT/Internal/AWSIoTStreamThread.m @@ -22,17 +22,63 @@ @interface AWSIoTStreamThread() @property(nonatomic, strong, nullable) NSOutputStream *encoderOutputStream; @property(nonatomic, strong, nullable) NSInputStream *decoderInputStream; @property(nonatomic, strong, nullable) NSOutputStream *outputStream; -@property(nonatomic, strong, nullable) NSTimer *defaultRunLoopTimer; -@property(nonatomic, strong, nullable) NSRunLoop *runLoopForStreamsThread; +@property(atomic, strong, nullable) NSTimer *defaultRunLoopTimer; +@property(atomic, strong, nullable) NSRunLoop *runLoopForStreamsThread; @property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval; -@property(nonatomic, assign) BOOL isRunning; -@property(nonatomic, assign) BOOL shouldDisconnect; -@property(nonatomic, strong) dispatch_queue_t serialQueue; -@property(nonatomic, assign) BOOL didCleanUp; +@property(atomic, assign) BOOL isRunning; +@property(atomic, assign) BOOL shouldDisconnect; +@property(atomic, assign) BOOL didCleanUp; +@property(atomic, assign) BOOL isDeallocationInProgress; + @end @implementation AWSIoTStreamThread +- (void)dealloc { + _isDeallocationInProgress = YES; + + [[NSNotificationCenter defaultCenter] removeObserver:self]; + + // ALWAYS use minimal cleanup during dealloc to avoid crashes + // - Minimal cleanup is safer during object destruction + // - Avoids runloop operations that could crash during dealloc + // - Ignores shouldDisconnect flag for safety + [self performMinimalCleanup]; +} + +- (void)performMinimalCleanup { + if (_didCleanUp) { + return; + } + _didCleanUp = YES; + + // Stop timer to prevent callbacks during/after deallocation + if (_defaultRunLoopTimer) { + [_defaultRunLoopTimer invalidate]; + _defaultRunLoopTimer = nil; + } + + if (_outputStream) { + _outputStream.delegate = nil; + _outputStream = nil; + } + + if (_decoderInputStream) { + _decoderInputStream = nil; + } + + if (_encoderOutputStream) { + _encoderOutputStream = nil; + } + + if (_session) { + _session = nil; + } + + // Clear callback to break retain cycles + _onStop = nil; +} + - (nonnull instancetype)initWithSession:(nonnull AWSMQTTSession *)session decoderInputStream:(nonnull NSInputStream *)decoderInputStream encoderOutputStream:(nonnull NSOutputStream *)encoderOutputStream { @@ -53,8 +99,8 @@ - (instancetype)initWithSession:(nonnull AWSMQTTSession *)session _outputStream = outputStream; _defaultRunLoopTimeInterval = 10; _shouldDisconnect = NO; - _serialQueue = dispatch_queue_create("com.amazonaws.iot.streamthread.syncQueue", DISPATCH_QUEUE_SERIAL); _didCleanUp = NO; + _isDeallocationInProgress = NO; } return self; } @@ -92,11 +138,13 @@ - (void)main { [self.session connectToInputStream:self.decoderInputStream outputStream:self.encoderOutputStream]; - while ([self shouldContinueRunning]) { - //This will continue run until the thread is cancelled - //Run one cycle of the runloop. This will return after a input source event or timer event is processed - [self.runLoopForStreamsThread runMode:NSDefaultRunLoopMode - beforeDate:[NSDate dateWithTimeIntervalSinceNow:self.defaultRunLoopTimeInterval]]; + // Capture values locally to prevent crashes if self is deallocated + NSRunLoop *runLoop = self.runLoopForStreamsThread; + NSTimeInterval interval = self.defaultRunLoopTimeInterval; + + while ([self shouldContinueRunning] && runLoop) { + [runLoop runMode:NSDefaultRunLoopMode + beforeDate:[NSDate dateWithTimeIntervalSinceNow:interval]]; } [self cleanUp]; @@ -105,76 +153,101 @@ - (void)main { } - (BOOL)shouldContinueRunning { - __block BOOL shouldRun; - dispatch_sync(self.serialQueue, ^{ - shouldRun = self.isRunning && !self.isCancelled && self.defaultRunLoopTimer != nil; - }); - return shouldRun; + if (self.isDeallocationInProgress) { + return NO; + } + + if (!self.runLoopForStreamsThread || !self.defaultRunLoopTimer) { + return NO; + } + + return self.isRunning && !self.isCancelled; } - (void)cancel { AWSDDLogVerbose(@"Issued Cancel on thread [%@]", (NSThread *)self); - dispatch_sync(self.serialQueue, ^{ - self.isRunning = NO; - [super cancel]; - }); + + if (self.isDeallocationInProgress) { + return; + } + + // Invalidate timer immediately to break retain cycle + if (self.defaultRunLoopTimer) { + [self.defaultRunLoopTimer invalidate]; + self.defaultRunLoopTimer = nil; + } + + // Atomic property, no synchronization needed + self.isRunning = NO; + [super cancel]; } - (void)cancelAndDisconnect:(BOOL)shouldDisconnect { AWSDDLogVerbose(@"Issued Cancel and Disconnect = [%@] on thread [%@]", shouldDisconnect ? @"YES" : @"NO", (NSThread *)self); - dispatch_sync(self.serialQueue, ^{ - self.shouldDisconnect = shouldDisconnect; - self.isRunning = NO; - [super cancel]; - }); + + if (self.isDeallocationInProgress) { + return; + } + + // Invalidate timer immediately to break retain cycle + if (self.defaultRunLoopTimer) { + [self.defaultRunLoopTimer invalidate]; + self.defaultRunLoopTimer = nil; + } + + // Set flags and cancel - properties are atomic + self.shouldDisconnect = shouldDisconnect; + self.isRunning = NO; + [super cancel]; } - (void)cleanUp { - dispatch_sync(self.serialQueue, ^{ - if (self.didCleanUp) { - AWSDDLogVerbose(@"Clean up already called for thread: [%@]", (NSThread *)self); - return; + if (self.didCleanUp) { + return; + } + self.didCleanUp = YES; + + // Stop timer to prevent callbacks + if (self.defaultRunLoopTimer) { + [self.defaultRunLoopTimer invalidate]; + self.defaultRunLoopTimer = nil; + } + // Conditional cleanup based on shouldDisconnect flag + if (self.shouldDisconnect) { + if (self.outputStream) { + if (self.runLoopForStreamsThread) { + [self.outputStream removeFromRunLoop:self.runLoopForStreamsThread + forMode:NSDefaultRunLoopMode]; + } + self.outputStream.delegate = nil; + [self.outputStream close]; + self.outputStream = nil; } - self.didCleanUp = YES; - if (self.defaultRunLoopTimer) { - [self.defaultRunLoopTimer invalidate]; - self.defaultRunLoopTimer = nil; + if (self.decoderInputStream) { + [self.decoderInputStream close]; + self.decoderInputStream = nil; } - if (self.shouldDisconnect) { - // Properly handle session closure first - if (self.session) { - [self.session close]; - self.session = nil; - } - - // Make sure we handle the streams in a thread-safe way - if (self.outputStream) { - // Remove from runLoop first before closing - if (self.runLoopForStreamsThread) { - [self.outputStream removeFromRunLoop:self.runLoopForStreamsThread - forMode:NSDefaultRunLoopMode]; - } - self.outputStream.delegate = nil; - [self.outputStream close]; - self.outputStream = nil; - } - - if (self.decoderInputStream) { - [self.decoderInputStream close]; - self.decoderInputStream = nil; - } - + if (self.session) { + [self.session close]; + self.session = nil; + } + + @synchronized(self.encoderOutputStream) { if (self.encoderOutputStream) { [self.encoderOutputStream close]; self.encoderOutputStream = nil; } - } else { - AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self); } + } else { + AWSDDLogVerbose(@"Skipping disconnect for thread: [%@]", (NSThread *)self); + } - // Make sure onStop is called on the main thread to avoid UI issues + self.runLoopForStreamsThread = nil; + + // Handle onStop callback on main thread (skip during deallocation to avoid async operations) + if (!self.isDeallocationInProgress) { void (^stopBlock)(void) = self.onStop; if (stopBlock) { self.onStop = nil; @@ -182,7 +255,7 @@ - (void)cleanUp { stopBlock(); }); } - }); + } } @end diff --git a/AWSIoT/Internal/MQTTSDK/AWSMQTTDecoder.m b/AWSIoT/Internal/MQTTSDK/AWSMQTTDecoder.m index 6e3eb52770a..e36fe9128d9 100644 --- a/AWSIoT/Internal/MQTTSDK/AWSMQTTDecoder.m +++ b/AWSIoT/Internal/MQTTSDK/AWSMQTTDecoder.m @@ -41,6 +41,8 @@ - (id)initWithStream:(NSInputStream*)aStream - (void)open { AWSDDLogDebug(@"opening decoder stream."); + if (stream == nil) + return; [stream setDelegate:self]; [stream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode]; [stream open]; @@ -48,6 +50,8 @@ - (void)open { - (void)close { AWSDDLogDebug(@"closing decoder stream."); + if (stream == nil) + return; [stream setDelegate:nil]; [stream close]; stream = nil; diff --git a/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m b/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m index 9a6fc4986e7..9fee8143185 100644 --- a/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m +++ b/AWSIoT/Internal/MQTTSDK/AWSMQTTEncoder.m @@ -38,6 +38,8 @@ - (id)initWithStream:(NSOutputStream*)aStream - (void)open { AWSDDLogDebug(@"opening encoder stream."); + if (self.stream == nil) + return; [self.stream setDelegate:self]; [self.stream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode]; [self.stream open]; @@ -45,6 +47,8 @@ - (void)open { - (void)close { AWSDDLogDebug(@"closing encoder stream."); + if (self.stream == nil) + return; [self.stream close]; [self.stream setDelegate:nil]; self.stream = nil; diff --git a/AWSIoTUnitTests/AWSIoTStreamThreadTests.m b/AWSIoTUnitTests/AWSIoTStreamThreadTests.m index 5d12f019f42..a0d34d0007c 100644 --- a/AWSIoTUnitTests/AWSIoTStreamThreadTests.m +++ b/AWSIoTUnitTests/AWSIoTStreamThreadTests.m @@ -19,10 +19,9 @@ @interface AWSIoTStreamThread() @property(nonatomic, assign) NSTimeInterval defaultRunLoopTimeInterval; -@property (nonatomic, assign) BOOL isRunning; -@property (nonatomic, strong) dispatch_queue_t serialQueue; -@property (nonatomic, assign) BOOL didCleanUp; -@property (nonatomic, strong, nullable) NSTimer *defaultRunLoopTimer; +@property (atomic, assign) BOOL isRunning; +@property (atomic, assign) BOOL didCleanUp; +@property (atomic, strong, nullable) NSTimer *defaultRunLoopTimer; @end @@ -155,14 +154,13 @@ - (void)testCancelAndDisconnect_shouldSynchronizeOnCleanupQueue { [self.thread cancelAndDisconnect:YES]; - // Validate synchronization - __block BOOL didSynchronize = NO; - dispatch_sync(self.thread.serialQueue, ^{ - didSynchronize = YES; - }); - - XCTAssertTrue(didSynchronize, @"The cleanupQueue should synchronize the operations"); + // With atomic properties, synchronization is handled automatically + // Validate that cleanup completed by checking atomic properties [self waitForExpectations:@[stopExpectation] timeout:1]; + + // The atomic properties ensure thread-safe access without explicit queues + XCTAssertTrue(self.thread.didCleanUp, @"didCleanUp should be YES after cleanup"); + XCTAssertFalse(self.thread.isRunning, @"isRunning should be NO after cleanup"); } @end