11#include <stdint.h>
22#include <stdlib.h>
3+ #include <time.h>
34#include "esp_err.h"
45#include "esp_log.h"
56#include "esp_heap_caps.h"
@@ -1071,9 +1072,13 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10711072 uint8_t * msg_buf = client -> mqtt_state .in_buffer ;
10721073 size_t msg_read_len = client -> mqtt_state .in_buffer_read_len ;
10731074 size_t msg_total_len = client -> mqtt_state .message_length ;
1074- size_t msg_topic_len = msg_read_len , msg_data_len = msg_read_len ;
1075+ size_t msg_topic_len = msg_read_len ;
1076+ size_t msg_data_len = msg_read_len ;
10751077 size_t msg_data_offset = 0 ;
1076- char * msg_topic = NULL , * msg_data = NULL ;
1078+ size_t saved_msg_topic_len = 0 ;
1079+ char * saved_msg_topic = NULL ;
1080+ char * msg_topic = NULL ;
1081+ char * msg_data = NULL ;
10771082
10781083 if (client -> mqtt_state .connection .information .protocol_ver == MQTT_PROTOCOL_V_5 ) {
10791084#ifdef MQTT_PROTOCOL_5
@@ -1083,7 +1088,6 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10831088 }
10841089#endif
10851090 } else {
1086- // get and save topic
10871091 msg_topic = mqtt_get_publish_topic (msg_buf , & msg_topic_len );
10881092 if (msg_topic == NULL ) {
10891093 ESP_LOGE (TAG , "%s: mqtt_get_publish_topic() failed" , __func__ );
@@ -1098,11 +1102,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10981102 return ESP_FAIL ;
10991103 }
11001104 }
1101- char * saved_msg_topic = strndup (msg_topic , msg_topic_len );
1102- ESP_MEM_CHECK (TAG , saved_msg_topic , return ESP_ERR_NO_MEM );
1103- size_t saved_msg_topic_len = msg_topic_len ;
11041105
1105- // post data event
11061106 client -> event .retain = mqtt_get_retain (msg_buf );
11071107 if (client -> mqtt_state .connection .information .protocol_ver == MQTT_PROTOCOL_V_5 ) {
11081108#ifdef MQTT_PROTOCOL_5
@@ -1115,48 +1115,45 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
11151115 client -> event .dup = mqtt_get_dup (msg_buf );
11161116 client -> event .total_data_len = msg_data_len + msg_total_len - msg_read_len ;
11171117
1118- ESP_LOGD (TAG , "Get data len= %" NEWLIB_NANO_COMPAT_FORMAT ", topic len=%" NEWLIB_NANO_COMPAT_FORMAT ", total_data: %d offset: %" NEWLIB_NANO_COMPAT_FORMAT ,
1119- NEWLIB_NANO_COMPAT_CAST (msg_data_len ), NEWLIB_NANO_COMPAT_CAST (msg_topic_len ),
1120- client -> event .total_data_len , NEWLIB_NANO_COMPAT_CAST (msg_data_offset ));
1121- client -> event .event_id = MQTT_EVENT_DATA ;
1122- client -> event .data = msg_data_len > 0 ? msg_data : NULL ;
1123- client -> event .data_len = msg_data_len ;
1124- client -> event .current_data_offset = msg_data_offset ;
1125- client -> event .topic = msg_topic ;
1126- client -> event .topic_len = msg_topic_len ;
1127- esp_mqtt_dispatch_event (client );
1128-
1129- client -> event .topic = saved_msg_topic ;
1130- client -> event .topic_len = saved_msg_topic_len ;
1131- while (msg_read_len < msg_total_len ) {
1132- size_t buf_len = client -> mqtt_state .in_buffer_length ;
1133- msg_data = (char * )client -> mqtt_state .in_buffer ;
1134- msg_data_offset += msg_data_len ;
1135-
1136- size_t read_len ;
1137- if (msg_total_len - msg_read_len > buf_len - saved_msg_topic_len ) {
1138- read_len = buf_len - saved_msg_topic_len ;
1139- } else {
1140- read_len = msg_total_len - msg_read_len ;
1141- }
1142-
1143- int ret = esp_transport_read (client -> transport , (char * )client -> mqtt_state .in_buffer , read_len , client -> config -> network_timeout_ms );
1144- if (ret <= 0 ) {
1145- return esp_mqtt_handle_transport_read_error (ret , client , false) == 0 ? ESP_OK : ESP_FAIL ;
1146- }
1147-
1148- msg_data_len = ret ;
1149- msg_read_len += msg_data_len ;
1150-
1118+ bool send_event = true;
1119+ while (send_event ) {
11511120 ESP_LOGD (TAG , "Get data len= %" NEWLIB_NANO_COMPAT_FORMAT ", topic len=%" NEWLIB_NANO_COMPAT_FORMAT ", total_data: %d offset: %" NEWLIB_NANO_COMPAT_FORMAT ,
11521121 NEWLIB_NANO_COMPAT_CAST (msg_data_len ), NEWLIB_NANO_COMPAT_CAST (msg_topic_len ),
11531122 client -> event .total_data_len , NEWLIB_NANO_COMPAT_CAST (msg_data_offset ));
1154-
1123+ client -> event .event_id = MQTT_EVENT_DATA ;
1124+ client -> event .data = msg_data_len > 0 ? msg_data : NULL ;
11551125 client -> event .data_len = msg_data_len ;
11561126 client -> event .current_data_offset = msg_data_offset ;
1157-
1127+ client -> event .topic = msg_topic ;
1128+ client -> event .topic_len = msg_topic_len ;
11581129 esp_mqtt_dispatch_event (client );
1130+ send_event = false;
1131+
1132+ if (msg_read_len < msg_total_len ) {
1133+ send_event = true;
1134+ #ifdef CONFIG_MQTT_TOPIC_PRESENT_ALL_DATA_EVENTS
1135+ if (!saved_msg_topic ) {
1136+ saved_msg_topic = strndup (msg_topic , msg_topic_len );
1137+ ESP_MEM_CHECK (TAG , saved_msg_topic , return ESP_ERR_NO_MEM );
1138+ saved_msg_topic_len = msg_topic_len ;
1139+ }
1140+ #endif
1141+ size_t buf_len = client -> mqtt_state .in_buffer_length ;
1142+
1143+ msg_data = (char * )client -> mqtt_state .in_buffer ;
1144+ msg_topic = saved_msg_topic ;
1145+ msg_topic_len = saved_msg_topic_len ;
1146+ msg_data_offset += msg_data_len ;
1147+ int ret = esp_transport_read (client -> transport , (char * )client -> mqtt_state .in_buffer ,
1148+ msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len ,
1149+ client -> config -> network_timeout_ms );
1150+ if (ret <= 0 ) {
1151+ return esp_mqtt_handle_transport_read_error (ret , client , false) == 0 ? ESP_OK : ESP_FAIL ;
1152+ }
11591153
1154+ msg_data_len = ret ;
1155+ msg_read_len += msg_data_len ;
1156+ }
11601157 }
11611158 free (saved_msg_topic );
11621159 return ESP_OK ;
0 commit comments