178178 * @brief The length of the MQTT metrics string.
179179 */
180180#define METRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( METRICS_STRING ) - 1 ) )
181+
182+ /**
183+ * @brief The length of the outgoing publish records array used by the coreMQTT
184+ * library to track QoS > 0 packet ACKS for outgoing publishes.
185+ */
186+ #define OUTGOING_PUBLISH_RECORD_LEN ( 10U )
187+
188+ /**
189+ * @brief The length of the incoming publish records array used by the coreMQTT
190+ * library to track QoS > 0 packet ACKS for incoming publishes.
191+ */
192+ #define INCOMING_PUBLISH_RECORD_LEN ( 10U )
181193/*-----------------------------------------------------------*/
182194
183195/**
@@ -204,6 +216,13 @@ struct NetworkContext
204216};
205217/*-----------------------------------------------------------*/
206218
219+ /**
220+ * @brief Packet Identifier updated when an ACK packet is received.
221+ *
222+ * It is used to match an expected ACK for a transmitted packet.
223+ */
224+ static uint16_t globalAckPacketIdentifier = 0U ;
225+
207226/**
208227 * @brief Packet Identifier generated when Subscribe request was sent to the broker.
209228 *
@@ -258,6 +277,24 @@ static bool mqttSessionEstablished = false;
258277 * publish messages.
259278 */
260279static MQTTPublishCallback_t appPublishCallback = NULL ;
280+
281+ /**
282+ * @brief Array to track the outgoing publish records for outgoing publishes
283+ * with QoS > 0.
284+ *
285+ * This is passed into #MQTT_InitStatefulQoS to allow for QoS > 0.
286+ *
287+ */
288+ static MQTTPubAckInfo_t pOutgoingPublishRecords [ OUTGOING_PUBLISH_RECORD_LEN ];
289+
290+ /**
291+ * @brief Array to track the incoming publish records for incoming publishes
292+ * with QoS > 0.
293+ *
294+ * This is passed into #MQTT_InitStatefulQoS to allow for QoS > 0.
295+ *
296+ */
297+ static MQTTPubAckInfo_t pIncomingPublishRecords [ INCOMING_PUBLISH_RECORD_LEN ];
261298/*-----------------------------------------------------------*/
262299
263300/**
@@ -336,13 +373,29 @@ static void mqttCallback( MQTTContext_t * pMqttContext,
336373 * false otherwise.
337374 */
338375static bool handlePublishResend ( MQTTContext_t * pMqttContext );
376+
377+ /**
378+ * @brief Wait for an expected ACK packet to be received.
379+ *
380+ * This function handles waiting for an expected ACK packet by calling
381+ * #MQTT_ProcessLoop and waiting for #mqttCallback to set the global ACK
382+ * packet identifier to the expected ACK packet identifier.
383+ *
384+ * @param[in] pMqttContext MQTT context pointer.
385+ * @param[in] usPacketIdentifier Packet identifier for expected ACK packet.
386+ * @param[in] ulTimeout Maximum duration to wait for expected ACK packet.
387+ *
388+ * @return true if the expected ACK packet was received, false otherwise.
389+ */
390+ static bool waitForPacketAck ( MQTTContext_t * pMqttContext ,
391+ uint16_t usPacketIdentifier ,
392+ uint32_t ulTimeout );
339393/*-----------------------------------------------------------*/
340394
341395static uint32_t generateRandomNumber ()
342396{
343397 return ( rand () );
344398}
345-
346399/*-----------------------------------------------------------*/
347400
348401static bool connectToBrokerWithBackoffRetries ( NetworkContext_t * pNetworkContext )
@@ -556,6 +609,9 @@ static void mqttCallback( MQTTContext_t * pMqttContext,
556609 /* Make sure the ACK packet identifier matches with the request
557610 * packet identifier. */
558611 assert ( globalSubscribePacketIdentifier == packetIdentifier );
612+
613+ /* Update the global ACK packet identifier. */
614+ globalAckPacketIdentifier = packetIdentifier ;
559615 break ;
560616
561617 case MQTT_PACKET_TYPE_UNSUBACK :
@@ -564,6 +620,9 @@ static void mqttCallback( MQTTContext_t * pMqttContext,
564620 /* Make sure the ACK packet identifier matches with the request
565621 * packet identifier. */
566622 assert ( globalUnsubscribePacketIdentifier == packetIdentifier );
623+
624+ /* Update the global ACK packet identifier. */
625+ globalAckPacketIdentifier = packetIdentifier ;
567626 break ;
568627
569628 case MQTT_PACKET_TYPE_PINGRESP :
@@ -578,6 +637,9 @@ static void mqttCallback( MQTTContext_t * pMqttContext,
578637 LogDebug ( ( "PUBACK received for packet id %u." ,
579638 packetIdentifier ) );
580639
640+ /* Update the global ACK packet identifier. */
641+ globalAckPacketIdentifier = packetIdentifier ;
642+
581643 /* Cleanup the publish packet from the #outgoingPublishPackets
582644 * array when a PUBACK is received. */
583645 cleanupOutgoingPublishWithPacketID ( packetIdentifier );
@@ -641,6 +703,53 @@ static bool handlePublishResend( MQTTContext_t * pMqttContext )
641703}
642704/*-----------------------------------------------------------*/
643705
706+ static bool waitForPacketAck ( MQTTContext_t * pMqttContext ,
707+ uint16_t usPacketIdentifier ,
708+ uint32_t ulTimeout )
709+ {
710+ uint32_t ulMqttProcessLoopEntryTime ;
711+ uint32_t ulMqttProcessLoopTimeoutTime ;
712+ uint32_t ulCurrentTime ;
713+
714+ MQTTStatus_t eMqttStatus = MQTTSuccess ;
715+ bool xStatus = false;
716+
717+ /* Reset the ACK packet identifier being received. */
718+ globalAckPacketIdentifier = 0U ;
719+
720+ ulCurrentTime = pMqttContext -> getTime ();
721+ ulMqttProcessLoopEntryTime = ulCurrentTime ;
722+ ulMqttProcessLoopTimeoutTime = ulCurrentTime + ulTimeout ;
723+
724+ /* Call MQTT_ProcessLoop multiple times until the expected packet ACK
725+ * is received, a timeout happens, or MQTT_ProcessLoop fails. */
726+ while ( ( globalAckPacketIdentifier != usPacketIdentifier ) &&
727+ ( ulCurrentTime < ulMqttProcessLoopTimeoutTime ) &&
728+ ( eMqttStatus == MQTTSuccess || eMqttStatus == MQTTNeedMoreBytes ) )
729+ {
730+ /* Event callback will set #globalAckPacketIdentifier when receiving
731+ * appropriate packet. */
732+ eMqttStatus = MQTT_ProcessLoop ( pMqttContext );
733+ ulCurrentTime = pMqttContext -> getTime ();
734+ }
735+
736+ if ( ( ( eMqttStatus != MQTTSuccess ) && ( eMqttStatus != MQTTNeedMoreBytes ) ) ||
737+ ( globalAckPacketIdentifier != usPacketIdentifier ) )
738+ {
739+ LogError ( ( "MQTT_ProcessLoop failed to receive ACK packet: Expected ACK Packet ID=%02X, LoopDuration=%u, Status=%s" ,
740+ usPacketIdentifier ,
741+ ( ulCurrentTime - ulMqttProcessLoopEntryTime ),
742+ MQTT_Status_strerror ( eMqttStatus ) ) );
743+ }
744+ else
745+ {
746+ xStatus = true;
747+ }
748+
749+ return xStatus ;
750+ }
751+ /*-----------------------------------------------------------*/
752+
644753bool EstablishMqttSession ( MQTTPublishCallback_t publishCallback )
645754{
646755 bool returnStatus = false;
@@ -678,6 +787,7 @@ bool EstablishMqttSession( MQTTPublishCallback_t publishCallback )
678787 transport .pNetworkContext = pNetworkContext ;
679788 transport .send = Openssl_Send ;
680789 transport .recv = Openssl_Recv ;
790+ transport .writev = NULL ;
681791
682792 /* Fill the values for network buffer. */
683793 networkBuffer .pBuffer = buffer ;
@@ -696,55 +806,70 @@ bool EstablishMqttSession( MQTTPublishCallback_t publishCallback )
696806 if ( mqttStatus != MQTTSuccess )
697807 {
698808 returnStatus = false;
699- LogError ( ( "MQTT init failed with status %s." ,
809+ LogError ( ( "MQTT_Init failed with status %s." ,
700810 MQTT_Status_strerror ( mqttStatus ) ) );
701811 }
702812 else
703813 {
704- /* Establish an MQTT session by sending a CONNECT packet. */
705-
706- /* If #createCleanSession is true, start with a clean session
707- * i.e. direct the MQTT broker to discard any previous session data.
708- * If #createCleanSession is false, direct the broker to attempt to
709- * reestablish a session which was already present. */
710- connectInfo .cleanSession = createCleanSession ;
711-
712- /* The client identifier is used to uniquely identify this MQTT client to
713- * the MQTT broker. In a production device the identifier can be something
714- * unique, such as a device serial number. */
715- connectInfo .pClientIdentifier = CLIENT_IDENTIFIER ;
716- connectInfo .clientIdentifierLength = CLIENT_IDENTIFIER_LENGTH ;
717-
718- /* The maximum time interval in seconds which is allowed to elapse
719- * between two Control Packets.
720- * It is the responsibility of the client to ensure that the interval between
721- * control packets being sent does not exceed the this keep-alive value. In the
722- * absence of sending any other control packets, the client MUST send a
723- * PINGREQ packet. */
724- connectInfo .keepAliveSeconds = MQTT_KEEP_ALIVE_INTERVAL_SECONDS ;
725-
726- /* Username and password for authentication. Not used in this demo. */
727- connectInfo .pUserName = METRICS_STRING ;
728- connectInfo .userNameLength = METRICS_STRING_LENGTH ;
729- connectInfo .pPassword = NULL ;
730- connectInfo .passwordLength = 0U ;
731-
732- /* Send an MQTT CONNECT packet to the broker. */
733- mqttStatus = MQTT_Connect ( pMqttContext ,
734- & connectInfo ,
735- NULL ,
736- CONNACK_RECV_TIMEOUT_MS ,
737- & sessionPresent );
814+ mqttStatus = MQTT_InitStatefulQoS ( pMqttContext ,
815+ pOutgoingPublishRecords ,
816+ OUTGOING_PUBLISH_RECORD_LEN ,
817+ pIncomingPublishRecords ,
818+ INCOMING_PUBLISH_RECORD_LEN );
738819
739820 if ( mqttStatus != MQTTSuccess )
740821 {
741822 returnStatus = false;
742- LogError ( ( "Connection with MQTT broker failed with status %s." ,
823+ LogError ( ( "MQTT_InitStatefulQoS failed with status %s." ,
743824 MQTT_Status_strerror ( mqttStatus ) ) );
744825 }
745826 else
746827 {
747- LogDebug ( ( "MQTT connection successfully established with broker." ) );
828+ /* Establish an MQTT session by sending a CONNECT packet. */
829+
830+ /* If #createCleanSession is true, start with a clean session
831+ * i.e. direct the MQTT broker to discard any previous session data.
832+ * If #createCleanSession is false, direct the broker to attempt to
833+ * reestablish a session which was already present. */
834+ connectInfo .cleanSession = createCleanSession ;
835+
836+ /* The client identifier is used to uniquely identify this MQTT client to
837+ * the MQTT broker. In a production device the identifier can be something
838+ * unique, such as a device serial number. */
839+ connectInfo .pClientIdentifier = CLIENT_IDENTIFIER ;
840+ connectInfo .clientIdentifierLength = CLIENT_IDENTIFIER_LENGTH ;
841+
842+ /* The maximum time interval in seconds which is allowed to elapse
843+ * between two Control Packets.
844+ * It is the responsibility of the client to ensure that the interval between
845+ * control packets being sent does not exceed the this keep-alive value. In the
846+ * absence of sending any other control packets, the client MUST send a
847+ * PINGREQ packet. */
848+ connectInfo .keepAliveSeconds = MQTT_KEEP_ALIVE_INTERVAL_SECONDS ;
849+
850+ /* Username and password for authentication. Not used in this demo. */
851+ connectInfo .pUserName = METRICS_STRING ;
852+ connectInfo .userNameLength = METRICS_STRING_LENGTH ;
853+ connectInfo .pPassword = NULL ;
854+ connectInfo .passwordLength = 0U ;
855+
856+ /* Send an MQTT CONNECT packet to the broker. */
857+ mqttStatus = MQTT_Connect ( pMqttContext ,
858+ & connectInfo ,
859+ NULL ,
860+ CONNACK_RECV_TIMEOUT_MS ,
861+ & sessionPresent );
862+
863+ if ( mqttStatus != MQTTSuccess )
864+ {
865+ returnStatus = false;
866+ LogError ( ( "Connection with MQTT broker failed with status %s." ,
867+ MQTT_Status_strerror ( mqttStatus ) ) );
868+ }
869+ else
870+ {
871+ LogDebug ( ( "MQTT connection successfully established with broker." ) );
872+ }
748873 }
749874 }
750875
@@ -867,17 +992,9 @@ bool SubscribeToTopic( const char * pTopicFilter,
867992 * of receiving publish message before subscribe ack is zero; but application
868993 * must be ready to receive any packet. This demo uses MQTT_ProcessLoop to
869994 * receive packet from network. */
870- mqttStatus = MQTT_ProcessLoop ( pMqttContext , MQTT_PROCESS_LOOP_TIMEOUT_MS );
871-
872- if ( mqttStatus != MQTTSuccess )
873- {
874- LogError ( ( "MQTT_ProcessLoop returned with status = %s." ,
875- MQTT_Status_strerror ( mqttStatus ) ) );
876- }
877- else
878- {
879- returnStatus = true;
880- }
995+ returnStatus = waitForPacketAck ( pMqttContext ,
996+ globalSubscribePacketIdentifier ,
997+ MQTT_PROCESS_LOOP_TIMEOUT_MS );
881998 }
882999
8831000 return returnStatus ;
@@ -927,17 +1044,9 @@ bool UnsubscribeFromTopic( const char * pTopicFilter,
9271044 /* Process incoming packet from the broker. Acknowledgment for unsubscribe
9281045 * operation ( UNSUBACK ) will be received here. This demo uses
9291046 * MQTT_ProcessLoop to receive packet from network. */
930- mqttStatus = MQTT_ProcessLoop ( pMqttContext , MQTT_PROCESS_LOOP_TIMEOUT_MS );
931-
932- if ( mqttStatus != MQTTSuccess )
933- {
934- LogError ( ( "MQTT_ProcessLoop returned with status = %s." ,
935- MQTT_Status_strerror ( mqttStatus ) ) );
936- }
937- else
938- {
939- returnStatus = true;
940- }
1047+ returnStatus = waitForPacketAck ( pMqttContext ,
1048+ globalUnsubscribePacketIdentifier ,
1049+ MQTT_PROCESS_LOOP_TIMEOUT_MS );
9411050 }
9421051
9431052 return returnStatus ;
@@ -1009,17 +1118,30 @@ bool PublishToTopic( const char * pTopicFilter,
10091118}
10101119/*-----------------------------------------------------------*/
10111120
1012- bool ProcessLoop ( uint32_t timeoutMs )
1121+ bool ProcessLoopWithTimeout ( uint32_t ulTimeoutMs )
10131122{
1123+ uint32_t ulMqttProcessLoopTimeoutTime ;
1124+ uint32_t ulCurrentTime ;
1125+
1126+ MQTTStatus_t eMqttStatus = MQTTSuccess ;
10141127 bool returnStatus = false;
1015- MQTTStatus_t mqttStatus = MQTTSuccess ;
10161128
1017- mqttStatus = MQTT_ProcessLoop ( & mqttContext , timeoutMs );
1129+ ulCurrentTime = mqttContext .getTime ();
1130+ ulMqttProcessLoopTimeoutTime = ulCurrentTime + ulTimeoutMs ;
10181131
1019- if ( mqttStatus != MQTTSuccess )
1132+ /* Call MQTT_ProcessLoop multiple times until the timeout expires or
1133+ * #MQTT_ProcessLoop fails. */
1134+ while ( ( ulCurrentTime < ulMqttProcessLoopTimeoutTime ) &&
1135+ ( eMqttStatus == MQTTSuccess || eMqttStatus == MQTTNeedMoreBytes ) )
1136+ {
1137+ eMqttStatus = MQTT_ProcessLoop ( & mqttContext );
1138+ ulCurrentTime = mqttContext .getTime ();
1139+ }
1140+
1141+ if ( ( eMqttStatus != MQTTSuccess ) && ( eMqttStatus != MQTTNeedMoreBytes ) )
10201142 {
10211143 LogError ( ( "MQTT_ProcessLoop returned with status = %s." ,
1022- MQTT_Status_strerror ( mqttStatus ) ) );
1144+ MQTT_Status_strerror ( eMqttStatus ) ) );
10231145 }
10241146 else
10251147 {
0 commit comments