Skip to content

Commit bbe25d5

Browse files
Add MQTT_Unsubscribe
1 parent 4d386cc commit bbe25d5

File tree

4 files changed

+288
-96
lines changed

4 files changed

+288
-96
lines changed

source/core_mqtt.c

Lines changed: 100 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -216,22 +216,31 @@ static MQTTStatus_t sendSubscribeWithoutCopy( MQTTContext_t * pContext,
216216
const MQTTPropBuilder_t * pPropertyBuilder );
217217

218218
/**
219-
* @brief Send MQTT UNSUBSCRIBE message without copying the user data into a buffer and
220-
* directly sending it.
219+
* @brief Send Unsubscribe without copying the users data into any buffer.
221220
*
222221
* @param[in] pContext Initialized MQTT context.
223-
* @param[in] pSubscriptionList MQTT subscription info.
224-
* @param[in] subscriptionCount The count of elements in the list.
225-
* @param[in] packetId The packet ID of the unsubscribe packet.
226-
* @param[in] remainingLength The remaining length of the unsubscribe packet.
222+
* @param[in] pSubscriptionList List of MQTT subscription info.
223+
* @param[in] subscriptionCount Number of elements in pSubscriptionList.
224+
* @param[in] packetId Packet identifier.
225+
* @param[in] remainingLength Remaining length of the packet.
226+
* @param[in] pPropertyBuilder MQTT property builder.
227+
* @note This operation may call the transport send function
228+
* repeatedly to send bytes over the network until either:
229+
* 1. The requested number of bytes @a remainingLength have been sent.
230+
* OR
231+
* 2. MQTT_SEND_TIMEOUT_MS milliseconds have gone by since entering this
232+
* function.
233+
* OR
234+
* 3. There is an error in sending data over the network.
227235
*
228-
* @return #MQTTSuccess or #MQTTSendFailed.
236+
* @return #MQTTSendFailed or #MQTTSuccess.
229237
*/
230238
static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
231239
const MQTTSubscribeInfo_t * pSubscriptionList,
232240
size_t subscriptionCount,
233241
uint16_t packetId,
234-
size_t remainingLength );
242+
size_t remainingLength,
243+
const MQTTPropBuilder_t * pPropertyBuilder );
235244

236245
/**
237246
* @brief Calculate the interval between two millisecond timestamps, including
@@ -633,7 +642,7 @@ static void addSubscriptionOptions( const MQTTSubscribeInfo_t subscriptionInfo,
633642
*
634643
* @return #MQTTSuccess, #MQTTServerRefused, #MQTTBadResponse, #MQTTBadParameter, #MQTTEventCallbackFailed.
635644
*/
636-
static MQTTStatus_t handleSuback( MQTTContext_t * pContext,
645+
static MQTTStatus_t handleSubUnsubAck( MQTTContext_t * pContext,
637646
MQTTPacketInfo_t * pIncomingPacket );
638647

639648
/**
@@ -2224,7 +2233,7 @@ static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
22242233
case MQTT_PACKET_TYPE_SUBACK:
22252234
case MQTT_PACKET_TYPE_UNSUBACK:
22262235
/* Deserialize and give these to the app provided callback. */
2227-
status = handleSuback( pContext, pIncomingPacket );
2236+
status = handleSubUnsubAck( pContext, pIncomingPacket );
22282237
break;
22292238

22302239
default:
@@ -2668,65 +2677,100 @@ static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
26682677
const MQTTSubscribeInfo_t * pSubscriptionList,
26692678
size_t subscriptionCount,
26702679
uint16_t packetId,
2671-
size_t remainingLength )
2680+
size_t remainingLength,
2681+
const MQTTPropBuilder_t * pPropertyBuilder )
26722682
{
26732683
MQTTStatus_t status = MQTTSuccess;
26742684
uint8_t * pIndex;
2685+
2686+
/**
2687+
* Fixed Size Properties
2688+
*/
26752689
TransportOutVector_t pIoVector[ MQTT_SUB_UNSUB_MAX_VECTORS ];
26762690
TransportOutVector_t * pIterator;
26772691
uint8_t serializedTopicFieldLength[ MQTT_SUB_UNSUB_MAX_VECTORS ][ CORE_MQTT_SERIALIZED_LENGTH_FIELD_BYTES ];
26782692
size_t totalPacketLength = 0U;
2679-
size_t unsubscriptionsSent = 0U;
26802693
size_t ioVectorLength = 0U;
2681-
size_t vectorsAdded;
2694+
size_t unsubscriptionsSent = 0U;
2695+
size_t vectorsAdded = 0U;
26822696
size_t topicFieldLengthIndex;
2697+
size_t unsubscribePropLen = 0U;
26832698

2684-
/* Maximum number of bytes required by the 'fixed' part of the UNSUBSCRIBE
2685-
* packet header according to the MQTT specification.
2686-
* MQTT Control Byte 0 + 1 = 1
2687-
* Remaining length (max) + 4 = 5
2688-
* Packet ID + 2 = 7 */
2689-
uint8_t unsubscribeheader[ 7U ];
2699+
/**
2700+
* Maximum number of bytes by the fixed header of a SUBSCRIBE packet.
2701+
* MQTT Control Byte 0 + 1 = 1
2702+
* Remaining Length + 4 = 5
2703+
* Packet Id + 2 = 7
2704+
*/
2705+
uint8_t unsubscribeHeader[ 7U ];
26902706

2691-
/* The vector array should be at least three element long as the topic
2692-
* string needs these many vector elements to be stored. */
2693-
assert( MQTT_SUB_UNSUB_MAX_VECTORS >= CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH );
2707+
/**
2708+
* Maximum number of bytes to send the Property Length.
2709+
* Property Length 0 + 4 = 4
2710+
*/
2711+
uint8_t propertyLength[ 4U ];
26942712

2695-
pIndex = unsubscribeheader;
2713+
pIndex = unsubscribeHeader;
26962714
pIterator = pIoVector;
26972715

2698-
pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength,
2699-
pIndex,
2700-
packetId );
2716+
pIndex = MQTT_SerializeUnsubscribeHeader( remainingLength, pIndex, packetId );
2717+
2718+
pIterator->iov_base = unsubscribeHeader;
2719+
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
2720+
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
2721+
/* coverity[misra_c_2012_rule_18_2_violation] */
2722+
/* coverity[misra_c_2012_rule_10_8_violation] */
2723+
pIterator->iov_len = ( size_t ) ( pIndex - unsubscribeHeader );
2724+
totalPacketLength += pIterator->iov_len;
2725+
pIterator++;
2726+
ioVectorLength++;
2727+
2728+
/**
2729+
* Sending Property Buffer
2730+
*/
2731+
if( ( pPropertyBuilder != NULL ) && ( pPropertyBuilder->pBuffer != NULL ) )
2732+
{
2733+
unsubscribePropLen = pPropertyBuilder->currentIndex;
2734+
}
27012735

2702-
/* The header is to be sent first. */
2703-
pIterator->iov_base = unsubscribeheader;
2736+
pIndex = encodeVariableLength( propertyLength, unsubscribePropLen );
2737+
pIterator->iov_base = propertyLength;
27042738
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-182 */
27052739
/* More details at: https://github.com/FreeRTOS/coreMQTT/blob/main/MISRA.md#rule-108 */
27062740
/* coverity[misra_c_2012_rule_18_2_violation] */
27072741
/* coverity[misra_c_2012_rule_10_8_violation] */
2708-
pIterator->iov_len = ( size_t ) ( pIndex - unsubscribeheader );
2742+
pIterator->iov_len = ( size_t ) ( pIndex - propertyLength );
27092743
totalPacketLength += pIterator->iov_len;
27102744
pIterator++;
27112745
ioVectorLength++;
27122746

2747+
if( unsubscribePropLen > 0U )
2748+
{
2749+
pIterator->iov_base = pPropertyBuilder->pBuffer;
2750+
pIterator->iov_len = pPropertyBuilder->currentIndex;
2751+
totalPacketLength += pIterator->iov_len;
2752+
pIterator++;
2753+
ioVectorLength++;
2754+
}
2755+
27132756
while( ( status == MQTTSuccess ) && ( unsubscriptionsSent < subscriptionCount ) )
27142757
{
27152758
/* Reset the index for next iteration. */
27162759
topicFieldLengthIndex = 0;
27172760

2718-
/* Check whether the subscription topic will fit in the given vector. */
2761+
/* Check whether the subscription topic (with QoS) will fit in the
2762+
* given vector. */
27192763
while( ( ioVectorLength <= ( MQTT_SUB_UNSUB_MAX_VECTORS - CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ) ) &&
27202764
( unsubscriptionsSent < subscriptionCount ) )
27212765
{
2722-
/* The topic filter gets sent next. */
2766+
/* The topic filter and the filter length gets sent next. (filter length - 2 bytes , topic filter - utf8 ) */
27232767
vectorsAdded = addEncodedStringToVector( serializedTopicFieldLength[ topicFieldLengthIndex ],
27242768
pSubscriptionList[ unsubscriptionsSent ].pTopicFilter,
27252769
pSubscriptionList[ unsubscriptionsSent ].topicFilterLength,
27262770
pIterator,
27272771
&totalPacketLength );
27282772

2729-
/* Update the iterator to point to the next empty location. */
2773+
/* Update the pointer after the above operation. */
27302774
pIterator = &pIterator[ vectorsAdded ];
27312775
/* Update the total count based on how many vectors were added. */
27322776
ioVectorLength += vectorsAdded;
@@ -2739,6 +2783,7 @@ static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,
27392783

27402784
if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalPacketLength )
27412785
{
2786+
LogError( ( "Error in sending UNSUBSCRIBE packet" ) );
27422787
status = MQTTSendFailed;
27432788
}
27442789

@@ -3613,8 +3658,8 @@ static void addSubscriptionOptions( const MQTTSubscribeInfo_t subscriptionInfo,
36133658

36143659
/*-----------------------------------------------------------*/
36153660

3616-
static MQTTStatus_t handleSuback( MQTTContext_t * pContext,
3617-
MQTTPacketInfo_t * pIncomingPacket )
3661+
static MQTTStatus_t handleSubUnsubAck( MQTTContext_t * pContext,
3662+
MQTTPacketInfo_t * pIncomingPacket )
36183663
{
36193664
MQTTStatus_t status = MQTTSuccess;
36203665
uint16_t packetIdentifier;
@@ -4383,29 +4428,37 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext )
43834428
MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
43844429
const MQTTSubscribeInfo_t * pSubscriptionList,
43854430
size_t subscriptionCount,
4386-
uint16_t packetId )
4431+
uint16_t packetId,
4432+
const MQTTPropBuilder_t * pPropertyBuilder )
43874433
{
43884434
MQTTConnectionStatus_t connectStatus;
43894435
size_t remainingLength = 0UL, packetSize = 0UL;
4436+
MQTTStatus_t status = MQTTSuccess;
43904437

4391-
/* TODO, add last param back. Will be fixed with MQTT_Unsubscribe API. */
43924438
/* Validate arguments. */
4393-
MQTTStatus_t status = validateSubscribeUnsubscribeParams( pContext,
4394-
pSubscriptionList,
4395-
subscriptionCount,
4396-
packetId,
4397-
0 /* TODO This one. */ );
4439+
status = validateSubscribeUnsubscribeParams( pContext,
4440+
pSubscriptionList,
4441+
subscriptionCount,
4442+
packetId,
4443+
MQTT_TYPE_UNSUBSCRIBE );
4444+
4445+
if( ( status == MQTTSuccess ) && ( pPropertyBuilder != NULL ) && ( pPropertyBuilder->pBuffer != NULL ) )
4446+
{
4447+
status = MQTT_ValidateUnsubscribeProperties( pPropertyBuilder );
4448+
}
43984449

43994450
if( status == MQTTSuccess )
44004451
{
44014452
/* Get the remaining length and packet size.*/
44024453
status = MQTT_GetUnsubscribePacketSize( pSubscriptionList,
44034454
subscriptionCount,
4455+
pPropertyBuilder,
44044456
&remainingLength,
4405-
&packetSize );
4406-
LogDebug( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.",
4407-
( unsigned long ) packetSize,
4408-
( unsigned long ) remainingLength ) );
4457+
&packetSize,
4458+
pContext->connectionProperties.serverMaxPacketSize );
4459+
LogInfo( ( "UNSUBSCRIBE packet size is %lu and remaining length is %lu.",
4460+
( unsigned long ) packetSize,
4461+
( unsigned long ) remainingLength ) );
44094462
}
44104463

44114464
if( status == MQTTSuccess )
@@ -4426,7 +4479,8 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext,
44264479
pSubscriptionList,
44274480
subscriptionCount,
44284481
packetId,
4429-
remainingLength );
4482+
remainingLength,
4483+
pPropertyBuilder );
44304484
}
44314485

44324486
MQTT_POST_STATE_UPDATE_HOOK( pContext );

0 commit comments

Comments
 (0)