Skip to content

Commit 9c76b70

Browse files
fix: Multiple event data
Fix a regression introduced when adding the presence of topic in all event data.
1 parent 2d2060d commit 9c76b70

File tree

2 files changed

+49
-42
lines changed

2 files changed

+49
-42
lines changed

Kconfig

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,4 +177,14 @@ menu "ESP-MQTT Configurations"
177177
help
178178
Messages which stays in the outbox longer than this value before being published will be discarded.
179179

180+
config MQTT_TOPIC_PRESENT_ALL_DATA_EVENTS
181+
bool "Enable publish topic in all data events"
182+
default n
183+
depends on MQTT_USE_CUSTOM_CONFIG
184+
help
185+
Set to true to have publish topic in all data events. This changes the behaviour
186+
when the message is bigger than the receive buffer size. The first event of the sequence
187+
always have the topic.
188+
Note: This will allocate memory to store the topic only in case of messge bigger than the buffer size.
189+
180190
endmenu

mqtt_client.c

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <stdint.h>
22
#include <stdlib.h>
3+
#include <time.h>
34
#include "esp_err.h"
45
#include "esp_log.h"
56
#include "esp_heap_caps.h"
@@ -1071,9 +1072,13 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10711072
uint8_t *msg_buf = client->mqtt_state.in_buffer;
10721073
size_t msg_read_len = client->mqtt_state.in_buffer_read_len;
10731074
size_t msg_total_len = client->mqtt_state.message_length;
1074-
size_t msg_topic_len = msg_read_len, msg_data_len = msg_read_len;
1075+
size_t msg_topic_len = msg_read_len;
1076+
size_t msg_data_len = msg_read_len;
10751077
size_t msg_data_offset = 0;
1076-
char *msg_topic = NULL, *msg_data = NULL;
1078+
size_t saved_msg_topic_len = 0;
1079+
char *saved_msg_topic = NULL;
1080+
char *msg_topic = NULL;
1081+
char *msg_data = NULL;
10771082

10781083
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
10791084
#ifdef MQTT_PROTOCOL_5
@@ -1083,7 +1088,6 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10831088
}
10841089
#endif
10851090
} else {
1086-
// get and save topic
10871091
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
10881092
if (msg_topic == NULL) {
10891093
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
@@ -1098,11 +1102,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10981102
return ESP_FAIL;
10991103
}
11001104
}
1101-
char *saved_msg_topic = strndup(msg_topic, msg_topic_len);
1102-
ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM);
1103-
size_t saved_msg_topic_len = msg_topic_len;
11041105

1105-
// post data event
11061106
client->event.retain = mqtt_get_retain(msg_buf);
11071107
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
11081108
#ifdef MQTT_PROTOCOL_5
@@ -1115,48 +1115,45 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
11151115
client->event.dup = mqtt_get_dup(msg_buf);
11161116
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
11171117

1118-
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
1119-
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
1120-
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
1121-
client->event.event_id = MQTT_EVENT_DATA;
1122-
client->event.data = msg_data_len > 0 ? msg_data : NULL;
1123-
client->event.data_len = msg_data_len;
1124-
client->event.current_data_offset = msg_data_offset;
1125-
client->event.topic = msg_topic;
1126-
client->event.topic_len = msg_topic_len;
1127-
esp_mqtt_dispatch_event(client);
1128-
1129-
client->event.topic = saved_msg_topic;
1130-
client->event.topic_len = saved_msg_topic_len;
1131-
while(msg_read_len < msg_total_len) {
1132-
size_t buf_len = client->mqtt_state.in_buffer_length;
1133-
msg_data = (char *)client->mqtt_state.in_buffer;
1134-
msg_data_offset += msg_data_len;
1135-
1136-
size_t read_len;
1137-
if(msg_total_len - msg_read_len > buf_len - saved_msg_topic_len) {
1138-
read_len = buf_len - saved_msg_topic_len;
1139-
} else {
1140-
read_len = msg_total_len - msg_read_len;
1141-
}
1142-
1143-
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, read_len, client->config->network_timeout_ms);
1144-
if (ret <= 0) {
1145-
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
1146-
}
1147-
1148-
msg_data_len = ret;
1149-
msg_read_len += msg_data_len;
1150-
1118+
bool send_event = true;
1119+
while (send_event) {
11511120
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
11521121
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
11531122
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
1154-
1123+
client->event.event_id = MQTT_EVENT_DATA;
1124+
client->event.data = msg_data_len > 0 ? msg_data : NULL;
11551125
client->event.data_len = msg_data_len;
11561126
client->event.current_data_offset = msg_data_offset;
1157-
1127+
client->event.topic = msg_topic;
1128+
client->event.topic_len = msg_topic_len;
11581129
esp_mqtt_dispatch_event(client);
1130+
send_event = false;
1131+
1132+
if (msg_read_len < msg_total_len) {
1133+
send_event = true;
1134+
#ifdef CONFIG_MQTT_TOPIC_PRESENT_ALL_DATA_EVENTS
1135+
if (!saved_msg_topic) {
1136+
saved_msg_topic = strndup(msg_topic, msg_topic_len);
1137+
ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM);
1138+
saved_msg_topic_len = msg_topic_len;
1139+
}
1140+
#endif
1141+
size_t buf_len = client->mqtt_state.in_buffer_length;
1142+
1143+
msg_data = (char *)client->mqtt_state.in_buffer;
1144+
msg_topic = saved_msg_topic;
1145+
msg_topic_len = saved_msg_topic_len;
1146+
msg_data_offset += msg_data_len;
1147+
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
1148+
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
1149+
client->config->network_timeout_ms);
1150+
if (ret <= 0) {
1151+
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
1152+
}
11591153

1154+
msg_data_len = ret;
1155+
msg_read_len += msg_data_len;
1156+
}
11601157
}
11611158
free(saved_msg_topic);
11621159
return ESP_OK;

0 commit comments

Comments
 (0)