Skip to content

Commit 6148ab5

Browse files
committed
Update new_mqtt.c
1 parent 8d83b41 commit 6148ab5

File tree

1 file changed

+178
-13
lines changed

1 file changed

+178
-13
lines changed

src/mqtt/new_mqtt.c

Lines changed: 178 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,171 @@
1919
#include <math.h>
2020
#ifndef WINDOWS
2121
#include <lwip/dns.h>
22+
23+
#if defined(PLATFORM_W600) && (LWIP_TCPIP_CORE_LOCKING == 0)
24+
#include "lwip/priv/tcpip_priv.h"
25+
26+
/*
27+
* OpenW600 lwIP is built with LWIP_TCPIP_CORE_LOCKING=0 (no core mutex).
28+
* The lwIP DNS + MQTT APIs used by OpenBeken here are RAW API and must run on the tcpip thread.
29+
* Calling them from the OpenBeken main thread can corrupt lwIP state and reset the chip with no trace.
30+
*
31+
* For W600 only, marshal selected calls onto the tcpip thread via tcpip_api_call() (synchronous).
32+
*/
33+
34+
struct obk_w600_dns_call {
35+
struct tcpip_api_call_data call;
36+
const char* hostname;
37+
ip_addr_t* addr;
38+
dns_found_callback found;
39+
void* arg;
40+
u8_t addrtype;
41+
};
42+
static err_t obk_w600_dns_api(struct tcpip_api_call_data* c) {
43+
struct obk_w600_dns_call* d = (struct obk_w600_dns_call*)c;
44+
return dns_gethostbyname_addrtype(d->hostname, d->addr, d->found, d->arg, d->addrtype);
45+
}
46+
static err_t obk_w600_dns_gethostbyname_addrtype(const char* hostname, ip_addr_t* addr,
47+
dns_found_callback found, void* arg, u8_t addrtype)
48+
{
49+
struct obk_w600_dns_call d;
50+
memset(&d, 0, sizeof(d));
51+
d.hostname = hostname;
52+
d.addr = addr;
53+
d.found = found;
54+
d.arg = arg;
55+
d.addrtype = addrtype;
56+
return tcpip_api_call(obk_w600_dns_api, &d.call);
57+
}
58+
59+
struct obk_w600_mqtt_connect_call {
60+
struct tcpip_api_call_data call;
61+
mqtt_client_t* client;
62+
const ip_addr_t* ipaddr;
63+
u16_t port;
64+
mqtt_connection_cb_t cb;
65+
void* arg;
66+
const struct mqtt_connect_client_info_t* info;
67+
};
68+
static err_t obk_w600_mqtt_client_connect_api(struct tcpip_api_call_data* c) {
69+
struct obk_w600_mqtt_connect_call* d = (struct obk_w600_mqtt_connect_call*)c;
70+
return mqtt_client_connect(d->client, d->ipaddr, d->port, d->cb, d->arg, d->info);
71+
}
72+
static err_t obk_w600_mqtt_client_connect(mqtt_client_t* client, const ip_addr_t* ipaddr, u16_t port,
73+
mqtt_connection_cb_t cb, void* arg, const struct mqtt_connect_client_info_t* info)
74+
{
75+
struct obk_w600_mqtt_connect_call d;
76+
memset(&d, 0, sizeof(d));
77+
d.client = client;
78+
d.ipaddr = ipaddr;
79+
d.port = port;
80+
d.cb = cb;
81+
d.arg = arg;
82+
d.info = info;
83+
return tcpip_api_call(obk_w600_mqtt_client_connect_api, &d.call);
84+
}
85+
86+
struct obk_w600_mqtt_publish_call {
87+
struct tcpip_api_call_data call;
88+
mqtt_client_t* client;
89+
const char* topic;
90+
const void* payload;
91+
u16_t payload_len;
92+
u8_t qos;
93+
u8_t retain;
94+
mqtt_request_cb_t cb;
95+
void* arg;
96+
};
97+
static err_t obk_w600_mqtt_publish_api(struct tcpip_api_call_data* c) {
98+
struct obk_w600_mqtt_publish_call* d = (struct obk_w600_mqtt_publish_call*)c;
99+
return mqtt_publish(d->client, d->topic, d->payload, d->payload_len, d->qos, d->retain, d->cb, d->arg);
100+
}
101+
static err_t obk_w600_mqtt_publish(mqtt_client_t* client, const char* topic, const void* payload, u16_t payload_len,
102+
u8_t qos, u8_t retain, mqtt_request_cb_t cb, void* arg)
103+
{
104+
struct obk_w600_mqtt_publish_call d;
105+
memset(&d, 0, sizeof(d));
106+
d.client = client;
107+
d.topic = topic;
108+
d.payload = payload;
109+
d.payload_len = payload_len;
110+
d.qos = qos;
111+
d.retain = retain;
112+
d.cb = cb;
113+
d.arg = arg;
114+
return tcpip_api_call(obk_w600_mqtt_publish_api, &d.call);
115+
}
116+
117+
struct obk_w600_mqtt_subunsub_call {
118+
struct tcpip_api_call_data call;
119+
mqtt_client_t* client;
120+
const char* topic;
121+
u8_t qos;
122+
mqtt_request_cb_t cb;
123+
void* arg;
124+
u8_t sub;
125+
};
126+
static err_t obk_w600_mqtt_sub_unsub_api(struct tcpip_api_call_data* c) {
127+
struct obk_w600_mqtt_subunsub_call* d = (struct obk_w600_mqtt_subunsub_call*)c;
128+
return mqtt_sub_unsub(d->client, d->topic, d->qos, d->cb, d->arg, d->sub);
129+
}
130+
static err_t obk_w600_mqtt_sub_unsub(mqtt_client_t* client, const char* topic, u8_t qos,
131+
mqtt_request_cb_t cb, void* arg, u8_t sub)
132+
{
133+
struct obk_w600_mqtt_subunsub_call d;
134+
memset(&d, 0, sizeof(d));
135+
d.client = client;
136+
d.topic = topic;
137+
d.qos = qos;
138+
d.cb = cb;
139+
d.arg = arg;
140+
d.sub = sub;
141+
return tcpip_api_call(obk_w600_mqtt_sub_unsub_api, &d.call);
142+
}
143+
144+
struct obk_w600_mqtt_disconnect_call {
145+
struct tcpip_api_call_data call;
146+
mqtt_client_t* client;
147+
};
148+
static err_t obk_w600_mqtt_disconnect_api(struct tcpip_api_call_data* c) {
149+
struct obk_w600_mqtt_disconnect_call* d = (struct obk_w600_mqtt_disconnect_call*)c;
150+
mqtt_disconnect(d->client);
151+
return ERR_OK;
152+
}
153+
static void obk_w600_mqtt_disconnect(mqtt_client_t* client)
154+
{
155+
struct obk_w600_mqtt_disconnect_call d;
156+
memset(&d, 0, sizeof(d));
157+
d.client = client;
158+
(void)tcpip_api_call(obk_w600_mqtt_disconnect_api, &d.call);
159+
}
160+
161+
#define OBK_LWIP_DNS_GETHOSTBYNAME_ADDRTYPE(hostname, addr, found, arg, addrtype) \
162+
obk_w600_dns_gethostbyname_addrtype((hostname), (addr), (found), (arg), (addrtype))
163+
#define OBK_LWIP_MQTT_CLIENT_CONNECT(client, ipaddr, port, cb, arg, info) \
164+
obk_w600_mqtt_client_connect((client), (ipaddr), (port), (cb), (arg), (info))
165+
#define OBK_LWIP_MQTT_PUBLISH(client, topic, payload, payload_len, qos, retain, cb, arg) \
166+
obk_w600_mqtt_publish((client), (topic), (payload), (payload_len), (qos), (retain), (cb), (arg))
167+
#define OBK_LWIP_MQTT_SUB_UNSUB(client, topic, qos, cb, arg, sub) \
168+
obk_w600_mqtt_sub_unsub((client), (topic), (qos), (cb), (arg), (sub))
169+
#define OBK_LWIP_MQTT_DISCONNECT(client) \
170+
obk_w600_mqtt_disconnect((client))
171+
172+
#else
173+
174+
#define OBK_LWIP_DNS_GETHOSTBYNAME_ADDRTYPE(hostname, addr, found, arg, addrtype) \
175+
dns_gethostbyname_addrtype((hostname), (addr), (found), (arg), (addrtype))
176+
#define OBK_LWIP_MQTT_CLIENT_CONNECT(client, ipaddr, port, cb, arg, info) \
177+
mqtt_client_connect((client), (ipaddr), (port), (cb), (arg), (info))
178+
#define OBK_LWIP_MQTT_PUBLISH(client, topic, payload, payload_len, qos, retain, cb, arg) \
179+
mqtt_publish((client), (topic), (payload), (payload_len), (qos), (retain), (cb), (arg))
180+
#define OBK_LWIP_MQTT_SUB_UNSUB(client, topic, qos, cb, arg, sub) \
181+
mqtt_sub_unsub((client), (topic), (qos), (cb), (arg), (sub))
182+
#define OBK_LWIP_MQTT_DISCONNECT(client) \
183+
mqtt_disconnect((client))
184+
185+
#endif
186+
22187
#endif
23188

24189
#define BUILD_AND_VERSION_FOR_MQTT "Open" PLATFORM_MCU_NAME " " USER_SW_VER " " __DATE__ " " __TIME__
@@ -838,7 +1003,7 @@ static void MQTT_disconnect(mqtt_client_t* client)
8381003
return;
8391004
// this is what it was renamed to. why?
8401005
LOCK_TCPIP_CORE();
841-
mqtt_disconnect(client);
1006+
OBK_LWIP_MQTT_DISCONNECT(client);
8421007
UNLOCK_TCPIP_CORE();
8431008

8441009
}
@@ -937,7 +1102,7 @@ static OBK_Publish_Result MQTT_PublishTopicToClient(mqtt_client_t* client, const
9371102

9381103

9391104
LOCK_TCPIP_CORE();
940-
err = mqtt_publish(client, pub_topic, sVal, strlen(sVal), qos, retain, mqtt_pub_request_cb, 0);
1105+
err = OBK_LWIP_MQTT_PUBLISH(client, pub_topic, sVal, strlen(sVal), qos, retain, mqtt_pub_request_cb, 0);
9411106
UNLOCK_TCPIP_CORE();
9421107
os_free(pub_topic);
9431108

@@ -1152,7 +1317,7 @@ static void mqtt_connection_cb(mqtt_client_t* client, void* arg, mqtt_connection
11521317
for (i = 0; i < numCallbacks; i++) {
11531318
if (callbacks[i]) {
11541319
if (callbacks[i]->subscriptionTopic && callbacks[i]->subscriptionTopic[0]) {
1155-
err = mqtt_sub_unsub(client,
1320+
err = OBK_LWIP_MQTT_SUB_UNSUB(client,
11561321
callbacks[i]->subscriptionTopic, 1,
11571322
mqtt_request_cb, LWIP_CONST_CAST(void*, client_info),
11581323
1);
@@ -1170,7 +1335,7 @@ static void mqtt_connection_cb(mqtt_client_t* client, void* arg, mqtt_connection
11701335

11711336
snprintf(tmp, sizeof(tmp), "%s/connected", clientId);
11721337
//LOCK_TCPIP_CORE();
1173-
err = mqtt_publish(client, tmp, "online", strlen("online"), 2, true, mqtt_pub_request_cb, 0);
1338+
err = OBK_LWIP_MQTT_PUBLISH(client, tmp, "online", strlen("online"), 2, true, mqtt_pub_request_cb, 0);
11741339
//UNLOCK_TCPIP_CORE();
11751340
if (err != ERR_OK) {
11761341
addLogAdv(LOG_ERROR, LOG_FEATURE_MQTT, "Publish err: %d\n", err);
@@ -1181,11 +1346,11 @@ static void mqtt_connection_cb(mqtt_client_t* client, void* arg, mqtt_connection
11811346

11821347
g_just_connected = 1;
11831348

1184-
//mqtt_sub_unsub(client,
1349+
//OBK_LWIP_MQTT_SUB_UNSUB(client,
11851350
// "topic_qos1", 1,
11861351
// mqtt_request_cb, LWIP_CONST_CAST(void*, client_info),
11871352
// 1);
1188-
//mqtt_sub_unsub(client,
1353+
//OBK_LWIP_MQTT_SUB_UNSUB(client,
11891354
// "topic_qos0", 0,
11901355
// mqtt_request_cb, LWIP_CONST_CAST(void*, client_info),
11911356
// 1);
@@ -1303,7 +1468,7 @@ static int MQTT_do_connect(mqtt_client_t* client)
13031468
#ifdef PLATFORM_XR809
13041469
res = dns_gethostbyname(mqtt_host, &mqtt_ip_resolved, dnsFound, NULL);
13051470
#else
1306-
res = dns_gethostbyname_addrtype(mqtt_host, &mqtt_ip_resolved, dnsFound, NULL, LWIP_DNS_ADDRTYPE_IPV4);
1471+
res = OBK_LWIP_DNS_GETHOSTBYNAME_ADDRTYPE(mqtt_host, &mqtt_ip_resolved, dnsFound, NULL, LWIP_DNS_ADDRTYPE_IPV4);
13071472
#endif
13081473
if (ERR_OK == res)
13091474
{
@@ -1392,7 +1557,7 @@ static int MQTT_do_connect(mqtt_client_t* client)
13921557
For now MQTT version 3.1.1 is always used */
13931558

13941559
LOCK_TCPIP_CORE();
1395-
res = mqtt_client_connect(mqtt_client,
1560+
res = OBK_LWIP_MQTT_CLIENT_CONNECT(mqtt_client,
13961561
&mqtt_ip, mqtt_port,
13971562
mqtt_connection_cb, LWIP_CONST_CAST(void*, &mqtt_client_info),
13981563
&mqtt_client_info);
@@ -1404,7 +1569,7 @@ static int MQTT_do_connect(mqtt_client_t* client)
14041569
snprintf(mqtt_status_message, sizeof(mqtt_status_message), "mqtt_client_connect connect failed");
14051570
if (res == ERR_ISCONN)
14061571
{
1407-
mqtt_disconnect(mqtt_client);
1572+
OBK_LWIP_MQTT_DISCONNECT(mqtt_client);
14081573
}
14091574
}
14101575
else {
@@ -1695,7 +1860,7 @@ void MQTT_Test_Tick(void* param)
16951860
sprintf(info->value, "TestMSG: %li/%li Time: %i s, Rate: %i msg/s", info->msg_cnt, info->msg_num,
16961861
(int)info->bench_time, (int)info->bench_rate);
16971862
LOCK_TCPIP_CORE();
1698-
err = mqtt_publish(mqtt_client, info->topic, info->value, strlen(info->value), qos, retain, mqtt_pub_request_cb, 0);
1863+
err = OBK_LWIP_MQTT_PUBLISH(mqtt_client, info->topic, info->value, strlen(info->value), qos, retain, mqtt_pub_request_cb, 0);
16991864
UNLOCK_TCPIP_CORE();
17001865
if (err == ERR_OK)
17011866
{
@@ -1725,7 +1890,7 @@ void MQTT_Test_Tick(void* param)
17251890
sprintf(info->value, "Benchmark completed. %li msg published. Total Time: %i s MsgRate: %i msg/s",
17261891
info->msg_cnt, (int)info->bench_time, (int)info->bench_rate);
17271892
LOCK_TCPIP_CORE();
1728-
err = mqtt_publish(mqtt_client, info->topic, info->value, strlen(info->value), qos, retain, mqtt_pub_request_cb, 0);
1893+
err = OBK_LWIP_MQTT_PUBLISH(mqtt_client, info->topic, info->value, strlen(info->value), qos, retain, mqtt_pub_request_cb, 0);
17291894
UNLOCK_TCPIP_CORE();
17301895
if (err == ERR_OK)
17311896
{
@@ -2255,7 +2420,7 @@ int MQTT_RunEverySecondUpdate()
22552420
else
22562421
{
22572422
LOCK_TCPIP_CORE();
2258-
mqtt_disconnect(mqtt_client);
2423+
OBK_LWIP_MQTT_DISCONNECT(mqtt_client);
22592424
#if defined(MQTT_CLIENT_CLEANUP)
22602425
mqtt_client_cleanup(mqtt_client);
22612426
#endif
@@ -2301,7 +2466,7 @@ int MQTT_RunEverySecondUpdate()
23012466
{
23022467
addLogAdv(LOG_INFO, LOG_FEATURE_MQTT, "OTA started MQTT will be closed\n");
23032468
LOCK_TCPIP_CORE();
2304-
mqtt_disconnect(mqtt_client);
2469+
OBK_LWIP_MQTT_DISCONNECT(mqtt_client);
23052470
UNLOCK_TCPIP_CORE();
23062471
return 1;
23072472
}

0 commit comments

Comments
 (0)