Skip to content

Commit 00d256e

Browse files
committed
[nrf fromtree] net: mqtt: Add MQTT 5.0 support for PUBLISH
Add support for PUBLISH message specified in MQTT 5.0. The message encoder and decoder were updated to support MQTT properties. Signed-off-by: Robert Lubos <[email protected]> (cherry picked from commit 15ad90a)
1 parent cb3591a commit 00d256e

File tree

8 files changed

+332
-24
lines changed

8 files changed

+332
-24
lines changed

include/zephyr/net/mqtt.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,55 @@ struct mqtt_publish_param {
393393
* by the broker.
394394
*/
395395
uint8_t retain_flag : 1;
396+
397+
#if defined(CONFIG_MQTT_VERSION_5_0)
398+
/** MQTT 5.0 properties. */
399+
struct {
400+
/** MQTT 5.0, chapter 3.3.2.3.7 User Property. */
401+
struct mqtt_utf8_pair user_prop[CONFIG_MQTT_USER_PROPERTIES_MAX];
402+
403+
/** MQTT 5.0, chapter 3.3.2.3.5 Response Topic. */
404+
struct mqtt_utf8 response_topic;
405+
406+
/** MQTT 5.0, chapter 3.3.2.3.6 Correlation Data. */
407+
struct mqtt_binstr correlation_data;
408+
409+
/** MQTT 5.0, chapter 3.3.2.3.9 Content Type. */
410+
struct mqtt_utf8 content_type;
411+
412+
/** MQTT 5.0, chapter 3.3.2.3.8 Subscription Identifier. */
413+
uint32_t subscription_identifier[CONFIG_MQTT_SUBSCRIPTION_ID_PROPERTIES_MAX];
414+
415+
/** MQTT 5.0, chapter 3.3.2.3.3 Message Expiry Interval. */
416+
uint32_t message_expiry_interval;
417+
418+
/** MQTT 5.0, chapter 3.3.2.3.4 Topic Alias. */
419+
uint16_t topic_alias;
420+
421+
/** MQTT 5.0, chapter 3.3.2.3.2 Payload Format Indicator. */
422+
uint8_t payload_format_indicator;
423+
424+
/** Flags indicating whether given property was present in received packet. */
425+
struct {
426+
/** Payload Format Indicator property was present. */
427+
bool has_payload_format_indicator;
428+
/** Message Expiry Interval property was present. */
429+
bool has_message_expiry_interval;
430+
/** Topic Alias property was present. */
431+
bool has_topic_alias;
432+
/** Response Topic property was present. */
433+
bool has_response_topic;
434+
/** Correlation Data property was present. */
435+
bool has_correlation_data;
436+
/** User Property property was present. */
437+
bool has_user_prop;
438+
/** Subscription Identifier property was present. */
439+
bool has_subscription_identifier;
440+
/** Content Type property was present. */
441+
bool has_content_type;
442+
} rx;
443+
} prop;
444+
#endif /* CONFIG_MQTT_VERSION_5_0 */
396445
};
397446

398447
/** @brief List of topics in a subscription request. */

subsys/net/lib/mqtt/Kconfig

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ config MQTT_USER_PROPERTIES_MAX
9191
Maximum number of user properties that the client can include in a
9292
packet or parse on input.
9393

94+
config MQTT_SUBSCRIPTION_ID_PROPERTIES_MAX
95+
int "Maximum number of Subscription ID properties in a Publish packet"
96+
default 1
97+
range 1 32
98+
help
99+
Maximum number of Subscription ID properties that the client can
100+
parse when processing Publish message.
101+
94102
config MQTT_TOPIC_ALIAS_MAX
95103
int "Maximum number of supported topic aliases"
96104
default 5

subsys/net/lib/mqtt/mqtt.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ int mqtt_publish(struct mqtt_client *client,
262262
goto error;
263263
}
264264

265-
err_code = publish_encode(param, &packet);
265+
err_code = publish_encode(client, param, &packet);
266266
if (err_code < 0) {
267267
goto error;
268268
}

subsys/net/lib/mqtt/mqtt_decoder.c

Lines changed: 120 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -155,19 +155,7 @@ static int unpack_raw_data(uint32_t length, struct buf_ctx *buf,
155155
return 0;
156156
}
157157

158-
/**
159-
* @brief Unpacks variable length integer from the buffer from the offset
160-
* requested.
161-
*
162-
* @param[inout] buf A pointer to the buf_ctx structure containing current
163-
* buffer position.
164-
* @param[out] val Memory where the value is to be unpacked.
165-
*
166-
* @retval Number of bytes parsed if the procedure is successful.
167-
* @retval -EINVAL if the length decoding would use more that 4 bytes.
168-
* @retval -EAGAIN if the buffer would be exceeded during the read.
169-
*/
170-
static int unpack_variable_int(struct buf_ctx *buf, uint32_t *val)
158+
int unpack_variable_int(struct buf_ctx *buf, uint32_t *val)
171159
{
172160
uint8_t shift = 0U;
173161
int bytes = 0;
@@ -435,6 +423,43 @@ int decode_user_property(struct property_decoder *prop,
435423
return 0;
436424
}
437425

426+
int decode_sub_id_property(struct property_decoder *prop,
427+
uint32_t *remaining_len,
428+
struct buf_ctx *buf)
429+
{
430+
uint32_t *sub_id_array = prop->data;
431+
uint32_t *chosen = NULL;
432+
uint32_t value;
433+
int bytes;
434+
435+
bytes = unpack_variable_int(buf, &value);
436+
if (bytes < 0) {
437+
return -EINVAL;
438+
}
439+
440+
if (*remaining_len < bytes) {
441+
return -EINVAL;
442+
}
443+
444+
*remaining_len -= bytes;
445+
*prop->found = true;
446+
447+
for (int i = 0; i < CONFIG_MQTT_SUBSCRIPTION_ID_PROPERTIES_MAX; i++) {
448+
if (sub_id_array[i] == 0) {
449+
chosen = &sub_id_array[i];
450+
break;
451+
}
452+
}
453+
454+
if (chosen == NULL) {
455+
NET_DBG("Cannot parse all subscription id properties, ignore excess");
456+
} else {
457+
*chosen = value;
458+
}
459+
460+
return 0;
461+
}
462+
438463
static int properties_decode(struct property_decoder *prop, uint8_t cnt,
439464
struct buf_ctx *buf)
440465
{
@@ -479,12 +504,14 @@ static int properties_decode(struct property_decoder *prop, uint8_t cnt,
479504
switch (type) {
480505
case MQTT_PROP_SESSION_EXPIRY_INTERVAL:
481506
case MQTT_PROP_MAXIMUM_PACKET_SIZE:
507+
case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL:
482508
err = decode_uint32_property(current_prop,
483509
&properties_len, buf);
484510
break;
485511
case MQTT_PROP_RECEIVE_MAXIMUM:
486512
case MQTT_PROP_TOPIC_ALIAS_MAXIMUM:
487513
case MQTT_PROP_SERVER_KEEP_ALIVE:
514+
case MQTT_PROP_TOPIC_ALIAS:
488515
err = decode_uint16_property(current_prop,
489516
&properties_len, buf);
490517
break;
@@ -493,6 +520,7 @@ static int properties_decode(struct property_decoder *prop, uint8_t cnt,
493520
case MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE:
494521
case MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE:
495522
case MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE:
523+
case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR:
496524
err = decode_uint8_property(current_prop,
497525
&properties_len, buf);
498526
break;
@@ -501,6 +529,8 @@ static int properties_decode(struct property_decoder *prop, uint8_t cnt,
501529
case MQTT_PROP_RESPONSE_INFORMATION:
502530
case MQTT_PROP_SERVER_REFERENCE:
503531
case MQTT_PROP_AUTHENTICATION_METHOD:
532+
case MQTT_PROP_RESPONSE_TOPIC:
533+
case MQTT_PROP_CONTENT_TYPE:
504534
err = decode_string_property(current_prop,
505535
&properties_len, buf);
506536
break;
@@ -509,9 +539,14 @@ static int properties_decode(struct property_decoder *prop, uint8_t cnt,
509539
&properties_len, buf);
510540
break;
511541
case MQTT_PROP_AUTHENTICATION_DATA:
542+
case MQTT_PROP_CORRELATION_DATA:
512543
err = decode_binary_property(current_prop,
513544
&properties_len, buf);
514545
break;
546+
case MQTT_PROP_SUBSCRIPTION_IDENTIFIER:
547+
err = decode_sub_id_property(current_prop,
548+
&properties_len, buf);
549+
break;
515550
default:
516551
err = -ENOTSUP;
517552
}
@@ -665,7 +700,68 @@ int connect_ack_decode(const struct mqtt_client *client, struct buf_ctx *buf,
665700
return 0;
666701
}
667702

668-
int publish_decode(uint8_t flags, uint32_t var_length, struct buf_ctx *buf,
703+
#if defined(CONFIG_MQTT_VERSION_5_0)
704+
static int publish_properties_decode(struct buf_ctx *buf,
705+
struct mqtt_publish_param *param)
706+
{
707+
struct property_decoder prop[] = {
708+
{
709+
&param->prop.payload_format_indicator,
710+
&param->prop.rx.has_payload_format_indicator,
711+
MQTT_PROP_PAYLOAD_FORMAT_INDICATOR
712+
},
713+
{
714+
&param->prop.message_expiry_interval,
715+
&param->prop.rx.has_message_expiry_interval,
716+
MQTT_PROP_MESSAGE_EXPIRY_INTERVAL
717+
},
718+
{
719+
&param->prop.topic_alias,
720+
&param->prop.rx.has_topic_alias,
721+
MQTT_PROP_TOPIC_ALIAS
722+
},
723+
{
724+
&param->prop.response_topic,
725+
&param->prop.rx.has_response_topic,
726+
MQTT_PROP_RESPONSE_TOPIC
727+
},
728+
{
729+
&param->prop.correlation_data,
730+
&param->prop.rx.has_correlation_data,
731+
MQTT_PROP_CORRELATION_DATA
732+
},
733+
{
734+
&param->prop.user_prop,
735+
&param->prop.rx.has_user_prop,
736+
MQTT_PROP_USER_PROPERTY
737+
},
738+
{
739+
&param->prop.subscription_identifier,
740+
&param->prop.rx.has_subscription_identifier,
741+
MQTT_PROP_SUBSCRIPTION_IDENTIFIER
742+
},
743+
{
744+
&param->prop.content_type,
745+
&param->prop.rx.has_content_type,
746+
MQTT_PROP_CONTENT_TYPE
747+
}
748+
};
749+
750+
return properties_decode(prop, ARRAY_SIZE(prop), buf);
751+
}
752+
#else
753+
static int publish_properties_decode(struct buf_ctx *buf,
754+
struct mqtt_publish_param *param)
755+
{
756+
ARG_UNUSED(param);
757+
ARG_UNUSED(buf);
758+
759+
return -ENOTSUP;
760+
}
761+
#endif /* CONFIG_MQTT_VERSION_5_0 */
762+
763+
int publish_decode(const struct mqtt_client *client, uint8_t flags,
764+
uint32_t var_length, struct buf_ctx *buf,
669765
struct mqtt_publish_param *param)
670766
{
671767
int err_code;
@@ -691,6 +787,16 @@ int publish_decode(uint8_t flags, uint32_t var_length, struct buf_ctx *buf,
691787
var_header_length += sizeof(uint16_t);
692788
}
693789

790+
if (mqtt_is_version_5_0(client)) {
791+
err_code = publish_properties_decode(buf, param);
792+
if (err_code < 0) {
793+
return err_code;
794+
}
795+
796+
/* Add parsed properties length */
797+
var_header_length += err_code;
798+
}
799+
694800
if (var_length < var_header_length) {
695801
NET_ERR("Corrupted PUBLISH message, header length (%u) larger "
696802
"than total length (%u)", var_header_length,

subsys/net/lib/mqtt/mqtt_encoder.c

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,92 @@ int connect_request_encode(const struct mqtt_client *client,
862862
return mqtt_encode_fixed_header(message_type, start, buf);
863863
}
864864

865-
int publish_encode(const struct mqtt_publish_param *param, struct buf_ctx *buf)
865+
#if defined(CONFIG_MQTT_VERSION_5_0)
866+
static uint32_t publish_properties_length(const struct mqtt_publish_param *param)
867+
{
868+
return uint8_property_length(MQTT_PROP_PAYLOAD_FORMAT_INDICATOR,
869+
param->prop.payload_format_indicator) +
870+
uint32_property_length(param->prop.message_expiry_interval) +
871+
uint16_property_length(param->prop.topic_alias) +
872+
string_property_length(&param->prop.response_topic) +
873+
binary_property_length(&param->prop.correlation_data) +
874+
user_properties_length(param->prop.user_prop) +
875+
/* Client does not include Subscription Identifier in any case. */
876+
string_property_length(&param->prop.content_type);
877+
}
878+
879+
static int publish_properties_encode(const struct mqtt_publish_param *param,
880+
struct buf_ctx *buf)
881+
{
882+
uint32_t properties_len;
883+
int err;
884+
885+
/* Precalculate total properties length */
886+
properties_len = publish_properties_length(param);
887+
err = pack_variable_int(properties_len, buf);
888+
if (err < 0) {
889+
return err;
890+
}
891+
892+
err = encode_uint8_property(MQTT_PROP_PAYLOAD_FORMAT_INDICATOR,
893+
param->prop.payload_format_indicator, buf);
894+
if (err < 0) {
895+
return err;
896+
}
897+
898+
err = encode_uint32_property(MQTT_PROP_MESSAGE_EXPIRY_INTERVAL,
899+
param->prop.message_expiry_interval, buf);
900+
if (err < 0) {
901+
return err;
902+
}
903+
904+
err = encode_uint16_property(MQTT_PROP_TOPIC_ALIAS,
905+
param->prop.topic_alias, buf);
906+
if (err < 0) {
907+
return err;
908+
}
909+
910+
err = encode_string_property(MQTT_PROP_RESPONSE_TOPIC,
911+
&param->prop.response_topic, buf);
912+
if (err < 0) {
913+
return err;
914+
}
915+
916+
err = encode_binary_property(MQTT_PROP_CORRELATION_DATA,
917+
&param->prop.correlation_data, buf);
918+
if (err < 0) {
919+
return err;
920+
}
921+
922+
err = encode_user_properties(param->prop.user_prop, buf);
923+
if (err < 0) {
924+
return err;
925+
}
926+
927+
/* Client does not include Subscription Identifier in any case. */
928+
929+
err = encode_string_property(MQTT_PROP_CONTENT_TYPE,
930+
&param->prop.content_type, buf);
931+
if (err < 0) {
932+
return err;
933+
}
934+
935+
return 0;
936+
}
937+
#else
938+
static int publish_properties_encode(const struct mqtt_publish_param *param,
939+
struct buf_ctx *buf)
940+
{
941+
ARG_UNUSED(param);
942+
ARG_UNUSED(buf);
943+
944+
return -ENOTSUP;
945+
}
946+
#endif /* CONFIG_MQTT_VERSION_5_0 */
947+
948+
int publish_encode(const struct mqtt_client *client,
949+
const struct mqtt_publish_param *param,
950+
struct buf_ctx *buf)
866951
{
867952
const uint8_t message_type = MQTT_MESSAGES_OPTIONS(
868953
MQTT_PKT_TYPE_PUBLISH, param->dup_flag,
@@ -891,6 +976,13 @@ int publish_encode(const struct mqtt_publish_param *param, struct buf_ctx *buf)
891976
}
892977
}
893978

979+
if (mqtt_is_version_5_0(client)) {
980+
err_code = publish_properties_encode(param, buf);
981+
if (err_code != 0) {
982+
return err_code;
983+
}
984+
}
985+
894986
/* Do not copy payload. We move the buffer pointer to ensure that
895987
* message length in fixed header is encoded correctly.
896988
*/

0 commit comments

Comments
 (0)