Skip to content

Commit c65c99f

Browse files
Deomid Ryabkovcesantabot
authored andcommitted
Azure IoT Hub support; MQTT refactoring
Also includes MQTT lib refactoring to support runtime reconfiguration. CL: Azure IoT Hub support; MQTT refactoring PUBLISHED_FROM=ed1f7b6ebb269d1d46e4384a8d79620e6fe9aeb1
1 parent 0ff92f3 commit c65c99f

File tree

2 files changed

+110
-43
lines changed

2 files changed

+110
-43
lines changed

include/mgos_mqtt.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "mgos_features.h"
3131
#include "mgos_init.h"
3232
#include "mgos_mongoose.h"
33+
#include "mgos_sys_config.h"
3334

3435
#ifdef __cplusplus
3536
extern "C" {
@@ -116,6 +117,11 @@ uint16_t mgos_mqtt_get_packet_id(void);
116117
*/
117118
void mgos_mqtt_set_max_qos(int qos);
118119

120+
/*
121+
* (Re)configure MQTT.
122+
*/
123+
bool mgos_mqtt_set_config(const struct mgos_config_mqtt *cfg);
124+
119125
#ifdef __cplusplus
120126
}
121127
#endif /* __cplusplus */

src/mgos_mqtt.c

Lines changed: 104 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -61,21 +61,21 @@ static struct mg_connection *s_conn = NULL;
6161
static bool s_connected = false;
6262
static mgos_mqtt_connect_fn_t s_connect_fn = NULL;
6363
static void *s_connect_fn_arg = NULL;
64-
static int s_max_qos = 2;
64+
static struct mgos_config_mqtt *s_cfg = NULL;
6565

6666
SLIST_HEAD(topic_handlers, topic_handler) s_topic_handlers;
6767
SLIST_HEAD(global_handlers, global_handler) s_global_handlers;
6868

6969
static void mqtt_global_reconnect(void);
7070

7171
void mgos_mqtt_set_max_qos(int qos) {
72-
if (s_max_qos == qos) return;
72+
if (s_cfg == NULL || s_cfg->max_qos == qos) return;
7373
LOG(LL_INFO, ("Setting max MQTT QOS to %d", qos));
74-
s_max_qos = qos;
74+
s_cfg->max_qos = qos;
7575
}
7676

7777
static int adjust_qos(int qos) {
78-
return s_max_qos < qos ? s_max_qos : qos;
78+
return s_cfg != NULL && s_cfg->max_qos < qos ? s_cfg->max_qos : qos;
7979
}
8080

8181
uint16_t mgos_mqtt_get_packet_id(void) {
@@ -134,18 +134,17 @@ static void mgos_mqtt_ev(struct mg_connection *nc, int ev, void *ev_data,
134134
if (status != 0) break;
135135
struct mg_send_mqtt_handshake_opts opts;
136136
memset(&opts, 0, sizeof(opts));
137-
// char *cb_client_id = NULL, *cb_user = NULL, *cb_pass = NULL;
138-
opts.user_name = mgos_sys_config_get_mqtt_user();
139-
opts.password = mgos_sys_config_get_mqtt_pass();
140-
if (mgos_sys_config_get_mqtt_clean_session()) {
137+
opts.user_name = s_cfg->user;
138+
opts.password = s_cfg->pass;
139+
if (s_cfg->clean_session) {
141140
opts.flags |= MG_MQTT_CLEAN_SESSION;
142141
}
143-
opts.keep_alive = mgos_sys_config_get_mqtt_keep_alive();
144-
opts.will_topic = mgos_sys_config_get_mqtt_will_topic();
145-
opts.will_message = mgos_sys_config_get_mqtt_will_message();
146-
const char *client_id = (mgos_sys_config_get_mqtt_client_id() != NULL
147-
? mgos_sys_config_get_mqtt_client_id()
148-
: mgos_sys_config_get_device_id());
142+
opts.keep_alive = s_cfg->keep_alive;
143+
opts.will_topic = s_cfg->will_topic;
144+
opts.will_message = s_cfg->will_message;
145+
const char *client_id =
146+
(s_cfg->client_id != NULL ? s_cfg->client_id
147+
: mgos_sys_config_get_device_id());
149148
if (s_connect_fn != NULL) {
150149
s_connect_fn(nc, client_id, &opts, s_connect_fn_arg);
151150
} else {
@@ -265,6 +264,78 @@ static void s_debug_write_cb(int ev, void *ev_data, void *userdata) {
265264
(void) userdata;
266265
}
267266

267+
static void mgos_mqtt_free_config(struct mgos_config_mqtt *cfg) {
268+
if (cfg == NULL) return;
269+
free(cfg->server);
270+
free(cfg->client_id);
271+
free(cfg->user);
272+
free(cfg->pass);
273+
free(cfg->ssl_cert);
274+
free(cfg->ssl_key);
275+
free(cfg->ssl_ca_cert);
276+
free(cfg->ssl_cipher_suites);
277+
free(cfg->will_topic);
278+
free(cfg->will_message);
279+
memset(cfg, 0, sizeof(*cfg));
280+
}
281+
282+
bool mgos_mqtt_set_config(const struct mgos_config_mqtt *cfg) {
283+
bool ret = false;
284+
struct mgos_config_mqtt *new_cfg = NULL;
285+
if (!cfg->enable) {
286+
ret = true;
287+
goto out;
288+
}
289+
if (cfg->server == NULL) {
290+
LOG(LL_ERROR, ("MQTT requires server name"));
291+
goto out;
292+
}
293+
new_cfg = (struct mgos_config_mqtt *) calloc(1, sizeof(*new_cfg));
294+
if (new_cfg == NULL) goto out;
295+
new_cfg->enable = cfg->enable;
296+
if (strchr(cfg->server, ':') == NULL) {
297+
int port = (cfg->ssl_ca_cert != NULL ? 8883 : 1883);
298+
mg_asprintf(&new_cfg->server, 0, "%s:%d", cfg->server, port);
299+
if (new_cfg->server == NULL) goto out;
300+
} else {
301+
new_cfg->server = strdup(cfg->server);
302+
}
303+
if (cfg->client_id) new_cfg->client_id = strdup(cfg->client_id);
304+
if (cfg->user) new_cfg->user = strdup(cfg->user);
305+
if (cfg->pass) new_cfg->pass = strdup(cfg->pass);
306+
new_cfg->reconnect_timeout_min = cfg->reconnect_timeout_min;
307+
new_cfg->reconnect_timeout_max = cfg->reconnect_timeout_max;
308+
if (cfg->ssl_cert) new_cfg->ssl_cert = strdup(cfg->ssl_cert);
309+
if (cfg->ssl_key) new_cfg->ssl_key = strdup(cfg->ssl_key);
310+
if (cfg->ssl_ca_cert) new_cfg->ssl_ca_cert = strdup(cfg->ssl_ca_cert);
311+
if (cfg->ssl_cipher_suites)
312+
new_cfg->ssl_cipher_suites = strdup(cfg->ssl_cipher_suites);
313+
if (cfg->ssl_psk_identity)
314+
new_cfg->ssl_psk_identity = strdup(cfg->ssl_psk_identity);
315+
if (cfg->ssl_psk_key) new_cfg->ssl_psk_key = strdup(cfg->ssl_psk_key);
316+
new_cfg->clean_session = cfg->clean_session;
317+
new_cfg->keep_alive = cfg->keep_alive;
318+
if (cfg->will_topic) new_cfg->will_topic = strdup(cfg->will_topic);
319+
if (cfg->will_message) new_cfg->will_message = strdup(cfg->will_message);
320+
new_cfg->max_qos = cfg->max_qos;
321+
new_cfg->recv_mbuf_limit = cfg->recv_mbuf_limit;
322+
323+
ret = true;
324+
325+
out:
326+
if (ret) {
327+
s_cfg = new_cfg;
328+
if (s_conn != NULL) {
329+
s_conn->flags |= MG_F_CLOSE_IMMEDIATELY;
330+
s_conn = NULL;
331+
}
332+
} else {
333+
mgos_mqtt_free_config(new_cfg);
334+
free(new_cfg);
335+
}
336+
return ret;
337+
}
338+
268339
bool mgos_mqtt_init(void) {
269340
if (mgos_sys_config_get_debug_stdout_topic() != NULL) {
270341
char *stdout_topic = strdup(mgos_sys_config_get_debug_stdout_topic());
@@ -279,49 +350,38 @@ bool mgos_mqtt_init(void) {
279350
free(stderr_topic);
280351
}
281352

282-
if (!mgos_sys_config_get_mqtt_enable()) return true;
283-
if (mgos_sys_config_get_mqtt_server() == NULL) {
284-
LOG(LL_ERROR, ("MQTT requires server name"));
285-
return false;
286-
}
287353
mgos_event_add_group_handler(MGOS_EVENT_GRP_NET, mgos_mqtt_net_ev, NULL);
288-
289-
mgos_mqtt_set_max_qos(mgos_sys_config_get_mqtt_max_qos());
290-
291354
mgos_event_add_handler(MGOS_EVENT_LOG, s_debug_write_cb, NULL);
292355

293-
return true;
356+
return mgos_mqtt_set_config(mgos_sys_config_get_mqtt());
294357
}
295358

296359
bool mgos_mqtt_global_connect(void) {
297360
bool ret = true;
298361
struct mg_mgr *mgr = mgos_get_mgr();
299362
struct mg_connect_opts opts;
300363

301-
/* If we're already connected, do nothing */
302-
if (s_conn != NULL) return ret;
364+
if (s_cfg == NULL || !s_cfg->enable) return false;
303365

304-
if (!mgos_sys_config_get_mqtt_enable()) {
305-
return false;
306-
}
366+
/* If we're already connected, do nothing */
367+
if (s_conn != NULL) return true;
307368

308-
LOG(LL_INFO, ("MQTT connecting to %s", mgos_sys_config_get_mqtt_server()));
309369
memset(&opts, 0, sizeof(opts));
310370
#if MG_ENABLE_SSL
311-
opts.ssl_cert = mgos_sys_config_get_mqtt_ssl_cert();
312-
opts.ssl_key = mgos_sys_config_get_mqtt_ssl_key();
313-
opts.ssl_ca_cert = mgos_sys_config_get_mqtt_ssl_ca_cert();
314-
opts.ssl_cipher_suites = mgos_sys_config_get_mqtt_ssl_cipher_suites();
315-
opts.ssl_psk_identity = mgos_sys_config_get_mqtt_ssl_psk_identity();
316-
opts.ssl_psk_key = mgos_sys_config_get_mqtt_ssl_psk_key();
371+
opts.ssl_cert = s_cfg->ssl_cert;
372+
opts.ssl_key = s_cfg->ssl_key;
373+
opts.ssl_ca_cert = s_cfg->ssl_ca_cert;
374+
opts.ssl_cipher_suites = s_cfg->ssl_cipher_suites;
375+
opts.ssl_psk_identity = s_cfg->ssl_psk_identity;
376+
opts.ssl_psk_key = s_cfg->ssl_psk_key;
317377
#endif
378+
LOG(LL_INFO, ("MQTT connecting to %s", s_cfg->server));
318379

319380
s_connected = false;
320-
s_conn = mg_connect_opt(mgr, mgos_sys_config_get_mqtt_server(), mgos_mqtt_ev,
321-
NULL, opts);
381+
s_conn = mg_connect_opt(mgr, s_cfg->server, mgos_mqtt_ev, NULL, opts);
322382
if (s_conn != NULL) {
323383
mg_set_protocol_mqtt(s_conn);
324-
s_conn->recv_mbuf_limit = mgos_sys_config_get_mqtt_recv_mbuf_limit();
384+
s_conn->recv_mbuf_limit = s_cfg->recv_mbuf_limit;
325385
} else {
326386
ret = false;
327387
}
@@ -338,15 +398,16 @@ static void reconnect_timer_cb(void *user_data) {
338398

339399
static void mqtt_global_reconnect(void) {
340400
int rt_ms;
401+
if (s_cfg == NULL || s_cfg->server == NULL) return;
402+
341403
if (s_reconnect_timeout_ms <= 0) s_reconnect_timeout_ms = 1;
342404
rt_ms = s_reconnect_timeout_ms * 2;
343-
if (mgos_sys_config_get_mqtt_server() == NULL) return;
344405

345-
if (rt_ms < mgos_sys_config_get_mqtt_reconnect_timeout_min() * 1000) {
346-
rt_ms = mgos_sys_config_get_mqtt_reconnect_timeout_min() * 1000;
406+
if (rt_ms < s_cfg->reconnect_timeout_min * 1000) {
407+
rt_ms = s_cfg->reconnect_timeout_min * 1000;
347408
}
348-
if (rt_ms > mgos_sys_config_get_mqtt_reconnect_timeout_max() * 1000) {
349-
rt_ms = mgos_sys_config_get_mqtt_reconnect_timeout_max() * 1000;
409+
if (rt_ms > s_cfg->reconnect_timeout_max * 1000) {
410+
rt_ms = s_cfg->reconnect_timeout_max * 1000;
350411
}
351412
/* Fuzz the time a little. */
352413
rt_ms = (int) mgos_rand_range(rt_ms * 0.9, rt_ms * 1.1);

0 commit comments

Comments
 (0)