Skip to content

Commit 79b8ab5

Browse files
committed
fix(mosq): Split the functionality into peer & mqtt
1 parent 04e897b commit 79b8ab5

File tree

5 files changed

+329
-256
lines changed

5 files changed

+329
-256
lines changed

components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ if(CONFIG_EXAMPLE_PEER_LIB_ESP_PEER)
22
set(PEER_BACKEND_SRC "webrtc.c")
33
endif()
44

5-
idf_component_register(SRCS "serverless_mqtt.c"
5+
idf_component_register(SRCS "serverless_mqtt.c" "peer_impl.c"
66
"wifi_connect.c"
77
"${PEER_BACKEND_SRC}"
88
INCLUDE_DIRS "."
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/*
2+
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
3+
*
4+
* SPDX-License-Identifier: Unlicense OR CC0-1.0
5+
*/
6+
#include <stdio.h>
7+
#include "freertos/FreeRTOS.h"
8+
#include "freertos/event_groups.h"
9+
#include "mqtt_client.h"
10+
#include "esp_wifi.h"
11+
#include "esp_log.h"
12+
#include "esp_check.h"
13+
#include "juice/juice.h"
14+
#include "cJSON.h"
15+
#include "peer_impl.h"
16+
17+
#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1)
18+
#define OUR_PEER "1"
19+
#define THEIR_PEER "2"
20+
#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2)
21+
#define OUR_PEER "2"
22+
#define THEIR_PEER "1"
23+
#endif
24+
25+
#define PEER_SYNC0 BIT(0)
26+
#define PEER_SYNC1 BIT(1)
27+
#define PEER_SYNC2 BIT(2)
28+
#define PEER_FAIL BIT(3)
29+
#define PEER_GATHER_DONE BIT(4)
30+
#define PEER_DESC_PUBLISHED BIT(5)
31+
#define PEER_CONNECTED BIT(6)
32+
33+
#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL)
34+
35+
#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER
36+
#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER
37+
#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN
38+
39+
static const char *TAG = "serverless_mqtt" OUR_PEER;
40+
static char s_buffer[MAX_BUFFER_SIZE];
41+
static EventGroupHandle_t s_state = NULL;
42+
static juice_agent_t *s_agent = NULL;
43+
static cJSON *s_peer_desc_json = NULL;
44+
static char *s_peer_desc = NULL;
45+
static esp_mqtt_client_handle_t s_local_mqtt = NULL;
46+
static on_peer_recv_t s_on_recv = NULL;
47+
48+
char *wifi_get_ipv4(wifi_interface_t interface);
49+
static esp_err_t sync_peers(void);
50+
static esp_err_t create_candidates(void);
51+
52+
void peer_get_buffer(char ** buffer, size_t *buffer_len)
53+
{
54+
if (buffer && buffer_len) {
55+
*buffer = s_buffer;
56+
*buffer_len = MAX_BUFFER_SIZE;
57+
}
58+
}
59+
60+
void peer_send(char* data, size_t size)
61+
{
62+
juice_send(s_agent, data, size);
63+
}
64+
65+
esp_err_t peer_init(on_peer_recv_t cb)
66+
{
67+
esp_err_t ret = ESP_FAIL;
68+
ESP_GOTO_ON_FALSE(cb, ESP_ERR_INVALID_ARG, err, TAG, "Invalid peer receive callback");
69+
ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates");
70+
ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer");
71+
EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000));
72+
if (bits & PEER_CONNECTED) {
73+
ESP_LOGI(TAG, "Peer is connected!");
74+
return ESP_OK;
75+
}
76+
err:
77+
ESP_LOGE(TAG, "Failed to init peer");
78+
return ret;
79+
}
80+
81+
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
82+
{
83+
esp_mqtt_event_handle_t event = event_data;
84+
esp_mqtt_client_handle_t client = event->client;
85+
switch ((esp_mqtt_event_id_t)event_id) {
86+
case MQTT_EVENT_CONNECTED:
87+
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
88+
if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) {
89+
ESP_LOGE(TAG, "Failed to subscribe to the sync topic");
90+
}
91+
xEventGroupSetBits(s_state, PEER_SYNC0);
92+
break;
93+
case MQTT_EVENT_DISCONNECTED:
94+
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
95+
xEventGroupSetBits(s_state, PEER_FAIL);
96+
break;
97+
98+
case MQTT_EVENT_DATA:
99+
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
100+
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
101+
printf("DATA=%.*s\r\n", event->data_len, event->data);
102+
if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) {
103+
break;
104+
}
105+
EventBits_t bits = xEventGroupGetBits(s_state);
106+
if (event->data_len > 1 && s_agent) {
107+
cJSON *root = cJSON_Parse(event->data);
108+
if (root == NULL) {
109+
break;
110+
}
111+
cJSON *desc = cJSON_GetObjectItem(root, "desc");
112+
if (desc == NULL) {
113+
cJSON_Delete(root);
114+
break;
115+
}
116+
printf("desc->valuestring:%s\n", desc->valuestring);
117+
juice_set_remote_description(s_agent, desc->valuestring);
118+
char cand_name[] = "cand0";
119+
while (true) {
120+
cJSON *cand = cJSON_GetObjectItem(root, cand_name);
121+
if (cand == NULL) {
122+
break;
123+
}
124+
printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring);
125+
juice_add_remote_candidate(s_agent, cand->valuestring);
126+
cand_name[4]++;
127+
}
128+
cJSON_Delete(root);
129+
xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process
130+
// and destroy the mqtt client
131+
}
132+
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
133+
if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) {
134+
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) {
135+
xEventGroupSetBits(s_state, PEER_SYNC2);
136+
} else {
137+
xEventGroupSetBits(s_state, PEER_FAIL);
138+
}
139+
}
140+
#else
141+
if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) {
142+
if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) {
143+
xEventGroupSetBits(s_state, PEER_SYNC1);
144+
} else {
145+
xEventGroupSetBits(s_state, PEER_FAIL);
146+
}
147+
} else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) {
148+
xEventGroupSetBits(s_state, PEER_SYNC2);
149+
}
150+
#endif
151+
break;
152+
case MQTT_EVENT_ERROR:
153+
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
154+
xEventGroupSetBits(s_state, PEER_FAIL);
155+
break;
156+
default:
157+
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
158+
break;
159+
}
160+
}
161+
162+
static esp_err_t sync_peers(void)
163+
{
164+
esp_err_t ret = ESP_OK;
165+
esp_mqtt_client_config_t mqtt_cfg = {
166+
.broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI,
167+
.task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE,
168+
};
169+
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
170+
ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client");
171+
ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL),
172+
err, TAG, "Failed to register mqtt event handler");
173+
ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client");
174+
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
175+
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
176+
ESP_LOGI(TAG, "Waiting for the other peer...");
177+
const int max_sync_retry = 60;
178+
int retry = 0;
179+
while (true) {
180+
EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000));
181+
if (bits & PEER_SYNC2) {
182+
break;
183+
}
184+
if (bits & PEER_SYNC1) {
185+
continue;
186+
}
187+
ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer");
188+
ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry);
189+
#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1
190+
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0,
191+
ESP_FAIL, TAG, "Failed to publish mqtt message");
192+
#endif
193+
}
194+
ESP_LOGI(TAG, "Sync done");
195+
ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0,
196+
ESP_FAIL, TAG, "Failed to publish peer's description");
197+
ESP_LOGI(TAG, "Waiting for the other peer description and candidates...");
198+
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)),
199+
ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates");
200+
err:
201+
free(s_peer_desc);
202+
esp_mqtt_client_destroy(client);
203+
return ret;
204+
}
205+
206+
static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr)
207+
{
208+
ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state));
209+
if (state == JUICE_STATE_CONNECTED) {
210+
xEventGroupSetBits(s_state, PEER_CONNECTED);
211+
} else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) {
212+
esp_restart();
213+
}
214+
}
215+
216+
static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr)
217+
{
218+
static uint8_t cand_nr = 0;
219+
if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates
220+
char cand_name[] = "cand0";
221+
cand_name[4] += cand_nr++;
222+
cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp);
223+
}
224+
}
225+
226+
static void juice_gathering_done(juice_agent_t *agent, void *user_ptr)
227+
{
228+
ESP_LOGI(TAG, "Gathering done");
229+
if (s_state) {
230+
xEventGroupSetBits(s_state, PEER_GATHER_DONE);
231+
}
232+
}
233+
234+
235+
236+
static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr)
237+
{
238+
if (s_local_mqtt) {
239+
s_on_recv(data, size);
240+
}
241+
}
242+
243+
static esp_err_t create_candidates(void)
244+
{
245+
ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group");
246+
s_peer_desc_json = cJSON_CreateObject();
247+
esp_err_t ret = ESP_OK;
248+
juice_set_log_level(JUICE_LOG_LEVEL_INFO);
249+
juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER,
250+
.bind_address = wifi_get_ipv4(WIFI_IF_STA),
251+
.stun_server_port = 19302,
252+
.cb_state_changed = juice_state,
253+
.cb_candidate = juice_candidate,
254+
.cb_gathering_done = juice_gathering_done,
255+
.cb_recv = juice_recv,
256+
};
257+
258+
s_agent = juice_create(&config);
259+
ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent");
260+
ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS,
261+
ESP_FAIL, err, TAG, "Failed to get local description");
262+
ESP_LOGI(TAG, "desc: %s", s_buffer);
263+
cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer);
264+
265+
ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS,
266+
ESP_FAIL, err, TAG, "Failed to start gathering candidates");
267+
ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)),
268+
ESP_FAIL, err, TAG, "Failed to connect to the sync broker");
269+
s_peer_desc = cJSON_Print(s_peer_desc_json);
270+
ESP_LOGI(TAG, "desc: %s", s_peer_desc);
271+
cJSON_Delete(s_peer_desc_json);
272+
return ESP_OK;
273+
274+
err:
275+
juice_destroy(s_agent);
276+
s_agent = NULL;
277+
cJSON_Delete(s_peer_desc_json);
278+
s_peer_desc_json = NULL;
279+
return ret;
280+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD
3+
*
4+
* SPDX-License-Identifier: Unlicense OR CC0-1.0
5+
*/
6+
7+
#include <stdio.h>
8+
#include "esp_random.h"
9+
#include "esp_sleep.h"
10+
#include "mosq_broker.h"
11+
12+
typedef void (*on_peer_recv_t)(const char *data, size_t size);
13+
14+
esp_err_t peer_init(on_peer_recv_t cb);
15+
16+
void peer_get_buffer(char ** buffer, size_t *buffer_len);
17+
18+
void peer_send(char* data, size_t size);

0 commit comments

Comments
 (0)