Skip to content

Commit 82017e9

Browse files
feat: Include message topic in all data events for big messages.
When message is larger than the buffer, and must produce several events include topic where it came from in each of those events
1 parent b5b8033 commit 82017e9

File tree

1 file changed

+30
-11
lines changed

1 file changed

+30
-11
lines changed

mqtt_client.c

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_cl
521521
} else {
522522
client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS;
523523
}
524-
524+
525525
client->config->transport = config->network.transport;
526526

527527
if (config->network.if_name) {
@@ -1062,7 +1062,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10621062
}
10631063
#endif
10641064
} else {
1065-
// get topic
1065+
// get and save topic
10661066
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
10671067
if (msg_topic == NULL) {
10681068
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
@@ -1077,6 +1077,10 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10771077
return ESP_FAIL;
10781078
}
10791079
}
1080+
char *saved_msg_topic = strndup(msg_topic, msg_topic_len);
1081+
ESP_MEM_CHECK(TAG, saved_msg_topic, return ESP_ERR_NO_MEM);
1082+
size_t saved_msg_topic_len = msg_topic_len;
1083+
10801084
// post data event
10811085
client->event.retain = mqtt_get_retain(msg_buf);
10821086
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
@@ -1089,7 +1093,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10891093
client->event.qos = mqtt_get_qos(msg_buf);
10901094
client->event.dup = mqtt_get_dup(msg_buf);
10911095
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
1092-
post_data_event:
1096+
10931097
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
10941098
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
10951099
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
@@ -1101,24 +1105,39 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
11011105
client->event.topic_len = msg_topic_len;
11021106
esp_mqtt_dispatch_event(client);
11031107

1104-
if (msg_read_len < msg_total_len) {
1108+
client->event.topic = saved_msg_topic;
1109+
client->event.topic_len = saved_msg_topic_len;
1110+
while(msg_read_len < msg_total_len) {
11051111
size_t buf_len = client->mqtt_state.in_buffer_length;
1106-
11071112
msg_data = (char *)client->mqtt_state.in_buffer;
1108-
msg_topic = NULL;
1109-
msg_topic_len = 0;
11101113
msg_data_offset += msg_data_len;
1111-
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
1112-
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
1113-
client->config->network_timeout_ms);
1114+
1115+
size_t read_len;
1116+
if(msg_total_len - msg_read_len > buf_len - saved_msg_topic_len) {
1117+
read_len = buf_len - saved_msg_topic_len;
1118+
} else {
1119+
read_len = msg_total_len - msg_read_len;
1120+
}
1121+
1122+
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, read_len, client->config->network_timeout_ms);
11141123
if (ret <= 0) {
11151124
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
11161125
}
11171126

11181127
msg_data_len = ret;
11191128
msg_read_len += msg_data_len;
1120-
goto post_data_event;
1129+
1130+
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
1131+
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
1132+
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
1133+
1134+
client->event.data_len = msg_data_len;
1135+
client->event.current_data_offset = msg_data_offset;
1136+
1137+
esp_mqtt_dispatch_event(client);
1138+
11211139
}
1140+
free(saved_msg_topic);
11221141
return ESP_OK;
11231142
}
11241143

0 commit comments

Comments
 (0)