Skip to content

Commit f07f0d2

Browse files
M1chakartben
authored andcommitted
net: mqtt_sn: Implement updating will topic and message
Since the protocol doesn't have message IDs in the responses to these update messages, there's no reliable way to know, if an update succeeded or not. I use that fact to simplify the implementation by: - Not providing success/failure callbacks. - Not handling updating the variables in the client struct while an update is in progess. In addition to adding some tests, I tested this with the emqx server. Signed-off-by: Michael Zimmermann <[email protected]>
1 parent d4fddc2 commit f07f0d2

File tree

4 files changed

+336
-2
lines changed

4 files changed

+336
-2
lines changed

doc/connectivity/networking/api/mqtt_sn.rst

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ Deviations from the standard
149149
Certain parts of the protocol are not yet supported in the library.
150150

151151
* QoS -1 - it's most useful with predefined topics
152-
* Setting the will topic and message after the initial connect
153152
* Forwarder Encapsulation
154153

155154
.. _mqtt_sn_api_reference:

include/zephyr/net/mqtt_sn.h

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,20 @@ int mqtt_sn_transport_udp_init(struct mqtt_sn_transport_udp *udp, struct sockadd
247247
socklen_t addrlen);
248248
#endif
249249

250+
/**
251+
* Structure for storing will update state.
252+
*/
253+
struct mqtt_sn_will_update {
254+
/** An update message needs to be send */
255+
bool in_progress;
256+
257+
/** Number of retries for failed update attempts */
258+
uint8_t retries;
259+
260+
/** Timestamp of the last update attempt */
261+
int64_t last_attempt;
262+
};
263+
250264
/**
251265
* Structure describing an MQTT-SN client.
252266
*/
@@ -315,6 +329,12 @@ struct mqtt_sn_client {
315329
/** Radius of the next GWINFO transmission */
316330
int64_t radius_gwinfo;
317331

332+
/** State for will topic updates */
333+
struct mqtt_sn_will_update will_topic_update;
334+
335+
/** State for will message updates */
336+
struct mqtt_sn_will_update will_message_update;
337+
318338
/** Delayable work structure for processing MQTT-SN events */
319339
struct k_work_delayable process_work;
320340
};
@@ -481,6 +501,40 @@ int mqtt_sn_get_topic_name(struct mqtt_sn_client *client, uint16_t id,
481501
int mqtt_sn_predefine_topic(struct mqtt_sn_client *client, uint16_t topic_id,
482502
struct mqtt_sn_data *topic_name);
483503

504+
/**
505+
* @brief Send a will topic update to the server.
506+
*
507+
* Call this to send the will topic stored in client->will_topic to the server.
508+
* Should be used if you changed the value after connecting. The variables
509+
* client->will_retain and client->will_qos will also be sent as part of the
510+
* update.
511+
*
512+
* @warning Since there is no message ID in the will topic update message,
513+
* this can't be done without race conditions. Contribute to newer
514+
* versions of the specification if you want this to be changed.
515+
*
516+
* @param[in] client The MQTT-SN client to use for sending the update.
517+
*
518+
* @return 0 or a negative error code (errno.h) indicating reason of failure.
519+
*/
520+
int mqtt_sn_update_will_topic(struct mqtt_sn_client *client);
521+
522+
/**
523+
* @brief Send a will message update to the server.
524+
*
525+
* Call this to send the will message stored in client->will_msg to the server.
526+
* Should be used if you changed the value after connecting.
527+
*
528+
* @warning Since there is no message ID in the will message update message,
529+
* this can't be done without race conditions. Contribute to newer
530+
* versions of the specification if you want this to be changed.
531+
*
532+
* @param[in] client The MQTT-SN client to use for sending the update.
533+
*
534+
* @return 0 or a negative error code (errno.h) indicating reason of failure.
535+
*/
536+
int mqtt_sn_update_will_message(struct mqtt_sn_client *client);
537+
484538
#ifdef __cplusplus
485539
}
486540
#endif

subsys/net/lib/mqtt_sn/mqtt_sn.c

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,128 @@ static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
643643
}
644644
}
645645

646+
static void mqtt_sn_do_will_topic_update(struct mqtt_sn_client *client)
647+
{
648+
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_WILLTOPICUPD};
649+
650+
if (client == NULL) {
651+
return;
652+
}
653+
654+
if (client->state != MQTT_SN_CLIENT_ACTIVE) {
655+
LOG_ERR("Cannot update will topic: not connected");
656+
return;
657+
}
658+
659+
LOG_INF("Updating will topic");
660+
661+
p.params.willtopicupd.topic.data = client->will_topic.data;
662+
p.params.willtopicupd.topic.size = client->will_topic.size;
663+
p.params.willtopicupd.retain = client->will_retain;
664+
p.params.willtopicupd.qos = client->will_qos;
665+
666+
encode_and_send(client, &p, 0);
667+
}
668+
669+
static void mqtt_sn_do_will_message_update(struct mqtt_sn_client *client)
670+
{
671+
struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_WILLMSGUPD};
672+
673+
if (client == NULL) {
674+
return;
675+
}
676+
677+
if (client->state != MQTT_SN_CLIENT_ACTIVE) {
678+
LOG_ERR("Cannot update will message: not connected");
679+
return;
680+
}
681+
682+
LOG_INF("Updating will message");
683+
684+
p.params.willmsgupd.msg.data = client->will_msg.data;
685+
p.params.willmsgupd.msg.size = client->will_msg.size;
686+
687+
encode_and_send(client, &p, 0);
688+
}
689+
690+
static int process_will_topic_update(struct mqtt_sn_client *client, int64_t *next_cycle)
691+
{
692+
const int64_t now = k_uptime_get();
693+
int64_t next_attempt;
694+
695+
if (!client->will_topic_update.in_progress) {
696+
return 0;
697+
}
698+
699+
if (now == 0) {
700+
next_attempt = 1;
701+
} else if (client->will_topic_update.last_attempt == 0) {
702+
next_attempt = 0;
703+
} else {
704+
next_attempt = client->will_topic_update.last_attempt + T_RETRY_MSEC;
705+
}
706+
707+
if (next_attempt <= now) {
708+
if (client->will_topic_update.retries-- == 0) {
709+
LOG_WRN("Will topic update ran out of retries");
710+
client->will_topic_update.in_progress = false;
711+
mqtt_sn_disconnect_internal(client);
712+
return -ETIMEDOUT;
713+
}
714+
715+
LOG_DBG("Sending WILLTOPICUPD");
716+
mqtt_sn_do_will_topic_update(client);
717+
client->will_topic_update.last_attempt = now;
718+
next_attempt = now + T_RETRY_MSEC;
719+
}
720+
721+
if (*next_cycle == 0 || next_attempt < *next_cycle) {
722+
*next_cycle = next_attempt;
723+
}
724+
LOG_DBG("next_cycle: %lld", *next_cycle);
725+
726+
return 0;
727+
}
728+
729+
static int process_will_message_update(struct mqtt_sn_client *client, int64_t *next_cycle)
730+
{
731+
const int64_t now = k_uptime_get();
732+
int64_t next_attempt;
733+
734+
if (!client->will_message_update.in_progress) {
735+
return 0;
736+
}
737+
738+
if (now == 0) {
739+
next_attempt = 1;
740+
} else if (client->will_message_update.last_attempt == 0) {
741+
next_attempt = 0;
742+
} else {
743+
next_attempt = client->will_message_update.last_attempt + T_RETRY_MSEC;
744+
}
745+
746+
if (next_attempt <= now) {
747+
if (client->will_message_update.retries-- == 0) {
748+
LOG_WRN("Will message update ran out of retries");
749+
client->will_message_update.in_progress = false;
750+
mqtt_sn_disconnect_internal(client);
751+
return -ETIMEDOUT;
752+
}
753+
754+
LOG_DBG("Sending WILLMSGUPD");
755+
mqtt_sn_do_will_message_update(client);
756+
client->will_message_update.last_attempt = now;
757+
next_attempt = now + T_RETRY_MSEC;
758+
}
759+
760+
if (*next_cycle == 0 || next_attempt < *next_cycle) {
761+
*next_cycle = next_attempt;
762+
}
763+
LOG_DBG("next_cycle: %lld", *next_cycle);
764+
765+
return 0;
766+
}
767+
646768
/**
647769
* @brief Process all publish tasks in the queue.
648770
*
@@ -997,6 +1119,16 @@ static void process_work(struct k_work *wrk)
9971119
}
9981120

9991121
if (client->state == MQTT_SN_CLIENT_ACTIVE) {
1122+
err = process_will_topic_update(client, &next_cycle);
1123+
if (err) {
1124+
return;
1125+
}
1126+
1127+
err = process_will_message_update(client, &next_cycle);
1128+
if (err) {
1129+
return;
1130+
}
1131+
10001132
err = process_topics(client, &next_cycle);
10011133
if (err) {
10021134
return;
@@ -1599,6 +1731,35 @@ static void handle_disconnect(struct mqtt_sn_client *client, struct mqtt_sn_para
15991731
mqtt_sn_disconnect_internal(client);
16001732
}
16011733

1734+
static void handle_willtopicresp(struct mqtt_sn_client *client,
1735+
struct mqtt_sn_param_willtopicresp *p)
1736+
{
1737+
if (!client->will_topic_update.in_progress) {
1738+
LOG_ERR("There's no will topic update in progress");
1739+
return;
1740+
}
1741+
1742+
if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1743+
client->will_topic_update.in_progress = false;
1744+
} else {
1745+
LOG_WRN("WILLTOPICRESP with ret code %d", p->ret_code);
1746+
}
1747+
}
1748+
1749+
static void handle_willmsgresp(struct mqtt_sn_client *client, struct mqtt_sn_param_willmsgresp *p)
1750+
{
1751+
if (!client->will_message_update.in_progress) {
1752+
LOG_ERR("There's no will message update in progress");
1753+
return;
1754+
}
1755+
1756+
if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1757+
client->will_message_update.in_progress = false;
1758+
} else {
1759+
LOG_WRN("WILLMSGRESP with ret code %d", p->ret_code);
1760+
}
1761+
}
1762+
16021763
static int handle_msg(struct mqtt_sn_client *client, struct mqtt_sn_data rx_addr)
16031764
{
16041765
int err;
@@ -1667,8 +1828,10 @@ static int handle_msg(struct mqtt_sn_client *client, struct mqtt_sn_data rx_addr
16671828
handle_disconnect(client, &p.params.disconnect);
16681829
break;
16691830
case MQTT_SN_MSG_TYPE_WILLTOPICRESP:
1831+
handle_willtopicresp(client, &p.params.willtopicresp);
16701832
break;
16711833
case MQTT_SN_MSG_TYPE_WILLMSGRESP:
1834+
handle_willmsgresp(client, &p.params.willmsgresp);
16721835
break;
16731836
default:
16741837
LOG_ERR("Unexpected message type %d", p.type);
@@ -1775,3 +1938,37 @@ int mqtt_sn_predefine_topic(struct mqtt_sn_client *client, uint16_t topic_id,
17751938
return 0;
17761939

17771940
}
1941+
1942+
static int attempt_will_update(struct mqtt_sn_client *client, struct mqtt_sn_will_update *state)
1943+
{
1944+
int err;
1945+
1946+
if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1947+
return -ENOTCONN;
1948+
}
1949+
1950+
if (state->in_progress) {
1951+
return -EALREADY;
1952+
}
1953+
1954+
state->retries = N_RETRY;
1955+
state->last_attempt = 0;
1956+
state->in_progress = true;
1957+
1958+
err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1959+
if (err < 0) {
1960+
return err;
1961+
}
1962+
1963+
return 0;
1964+
}
1965+
1966+
int mqtt_sn_update_will_topic(struct mqtt_sn_client *client)
1967+
{
1968+
return attempt_will_update(client, &client->will_topic_update);
1969+
}
1970+
1971+
int mqtt_sn_update_will_message(struct mqtt_sn_client *client)
1972+
{
1973+
return attempt_will_update(client, &client->will_message_update);
1974+
}

0 commit comments

Comments
 (0)