Skip to content

Commit 05b0f9f

Browse files
committed
pub-sub-demo: Add a new subscription topic
When device advisor test is active, subscribe to a topic `device_advisor_test` with QoS 1, so that the device advisor can publish the maximum TLS fragment (16023 bytes) to that topic. Signed-off-by: Devaraj Ranganna <[email protected]>
1 parent 90aed05 commit 05b0f9f

File tree

1 file changed

+70
-9
lines changed

1 file changed

+70
-9
lines changed

Projects/aws-iot-example/mqtt_demo_pub_sub.c

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,12 @@
100100
/**
101101
* @brief Size of statically allocated buffers for holding payloads.
102102
*/
103-
#define mqttexampleSTRING_BUFFER_LENGTH ( 100 )
103+
#if ( appCONFIG_DEVICE_ADVISOR_TEST_ACTIVE == 1 )
104+
#define mqttexampleSTRING_BUFFER_LENGTH ( 20480 )
105+
#else
106+
#define mqttexampleSTRING_BUFFER_LENGTH ( 512 )
107+
108+
#endif
104109

105110
/**
106111
* @brief Format of the loop-back topic.
@@ -116,36 +121,42 @@
116121
* this demo this can be a task number, when more than one tasks are publishing within a device.
117122
*
118123
*/
119-
#define mqttexampleLOOPBACK_TOPIC_FORMAT "pubsub/%s/task_%u"
124+
#define mqttexampleLOOPBACK_TOPIC_FORMAT "pubsub/%s/task_%u"
120125

121126
/**
122127
* @brief Format for the topic to which demo task sends PUBLISH messages to broker.
123128
* The topic is set by default as the loopback topic, so that device will receive the same message which is sent to the
124129
* broker.
125130
*/
126-
#define mqttexampleOUTPUT_TOPIC_FORMAT mqttexampleLOOPBACK_TOPIC_FORMAT
131+
#define mqttexampleOUTPUT_TOPIC_FORMAT mqttexampleLOOPBACK_TOPIC_FORMAT
127132

128133
/**
129134
* @brief Size of the static buffer to hold the output topic name.
130135
* The buffer should accommodate the topic format string, thing name and the task number which is a 32bit integer.
131136
*/
132-
#define mqttexampleOUTPUT_TOPIC_BUFFER_LENGTH ( sizeof( mqttexampleOUTPUT_TOPIC_FORMAT ) + mqttexampleTHING_NAME_MAX_LENGTH + 10U )
137+
#define mqttexampleOUTPUT_TOPIC_BUFFER_LENGTH ( sizeof( mqttexampleOUTPUT_TOPIC_FORMAT ) + mqttexampleTHING_NAME_MAX_LENGTH + 10U )
133138

134139
/**
135140
* @brief Format for the topic to receive incoming messages from the MQTT broker.
136141
* Topic is set by default as the loopback topic so that the device will receive the same message which is published to the
137142
* broker.
138143
*/
139-
#define mqttexampleINPUT_TOPIC_FORMAT mqttexampleLOOPBACK_TOPIC_FORMAT
144+
#define mqttexampleINPUT_TOPIC_FORMAT mqttexampleLOOPBACK_TOPIC_FORMAT
140145

141146
/**
142147
* @brief Size of the static buffer to hold the topic name.
143148
* The buffer should accommodate the topic format string, thing name and the task number which is a 32bit integer.
144149
*/
145-
#define mqttexampleINPUT_TOPIC_BUFFER_LENGTH ( sizeof( mqttexampleINPUT_TOPIC_FORMAT ) + mqttexampleTHING_NAME_MAX_LENGTH + 10U )
150+
#define mqttexampleINPUT_TOPIC_BUFFER_LENGTH ( sizeof( mqttexampleINPUT_TOPIC_FORMAT ) + mqttexampleTHING_NAME_MAX_LENGTH + 10U )
146151

147152
static char cTopicFilter[ appCONFIG_MQTT_NUM_PUBSUB_TASKS ][ mqttexampleINPUT_TOPIC_BUFFER_LENGTH ];
148153

154+
#if ( appCONFIG_DEVICE_ADVISOR_TEST_ACTIVE == 1 )
155+
#define mqttexampleDEVICE_ADVISOR_TOPIC_FORMAT "device_advisor_test"
156+
#define mqttexampleDEVICE_ADVISOR_TOPIC_BUFFER_LENGTH ( strlen( mqttexampleDEVICE_ADVISOR_TOPIC_FORMAT ) )
157+
static char cDeviceAdvisorTopicFilter[] = mqttexampleDEVICE_ADVISOR_TOPIC_FORMAT;
158+
#endif
159+
149160
/*-----------------------------------------------------------*/
150161

151162
/**
@@ -306,9 +317,8 @@ static void prvRegisterSubscribeCallback( const char * pTopicFilter,
306317
cTopicFilter[ usIndex ],
307318
xTopicFilterLen,
308319
&isMatch );
309-
assert( mqttStatus == MQTTSuccess );
310320

311-
if( isMatch )
321+
if( ( mqttStatus == MQTTSuccess ) && isMatch )
312322
{
313323
/* Add subscription so that incoming publishes are routed to the application callback. */
314324
subscriptionAdded = addSubscription( pTopicFilter,
@@ -323,6 +333,30 @@ static void prvRegisterSubscribeCallback( const char * pTopicFilter,
323333
topicFilterLength ) );
324334
}
325335
}
336+
337+
#if ( appCONFIG_DEVICE_ADVISOR_TEST_ACTIVE == 1 )
338+
mqttStatus = MQTT_MatchTopic( pTopicFilter,
339+
topicFilterLength,
340+
cDeviceAdvisorTopicFilter,
341+
mqttexampleDEVICE_ADVISOR_TOPIC_BUFFER_LENGTH,
342+
&isMatch );
343+
344+
if( ( mqttStatus == MQTTSuccess ) && isMatch )
345+
{
346+
/* Add subscription so that incoming publishes are routed to the application callback. */
347+
subscriptionAdded = addSubscription( pTopicFilter,
348+
topicFilterLength,
349+
prvIncomingPublishCallback,
350+
NULL );
351+
352+
if( subscriptionAdded == false )
353+
{
354+
LogError( ( "Failed to register a publish callback for topic %.*s.",
355+
pTopicFilter,
356+
topicFilterLength ) );
357+
}
358+
}
359+
#endif /* if ( appCONFIG_DEVICE_ADVISOR_TEST_ACTIVE == 1 ) */
326360
}
327361
}
328362

@@ -578,7 +612,34 @@ void vSimpleSubscribePublishTask( void * pvParameters )
578612
}
579613
}
580614

581-
if( xStatus == pdTRUE )
615+
#if ( appCONFIG_DEVICE_ADVISOR_TEST_ACTIVE == 1 )
616+
if( xStatus == pdPASS )
617+
{
618+
/* Subscribe to the same topic to which this task will publish. That will
619+
* result in each published message being published from the server back to
620+
* the target. */
621+
622+
LogDebug( ( "Sending subscribe request to agent for topic filter: %.*s\n", mqttexampleDEVICE_ADVISOR_TOPIC_BUFFER_LENGTH, cDeviceAdvisorTopicFilter ) );
623+
624+
xMQTTStatus = prvSubscribeToTopic( MQTTQoS1, cDeviceAdvisorTopicFilter, mqttexampleDEVICE_ADVISOR_TOPIC_BUFFER_LENGTH );
625+
626+
if( xMQTTStatus != MQTTSuccess )
627+
{
628+
LogError( ( "Failed to subscribe to topic: %.*s\n",
629+
mqttexampleDEVICE_ADVISOR_TOPIC_BUFFER_LENGTH,
630+
cDeviceAdvisorTopicFilter ) );
631+
xStatus = pdFALSE;
632+
}
633+
else
634+
{
635+
LogInfo( ( "Successfully subscribed to topic: %.*s\n",
636+
mqttexampleDEVICE_ADVISOR_TOPIC_BUFFER_LENGTH,
637+
cDeviceAdvisorTopicFilter ) );
638+
}
639+
}
640+
#endif /* if ( appCONFIG_DEVICE_ADVISOR_TEST_ACTIVE == 1 ) */
641+
642+
if( xStatus == pdPASS )
582643
{
583644
/* Create a topic name for this task to publish to. */
584645
xOutTopicLength = snprintf( cOutTopicBuf,

0 commit comments

Comments
 (0)