Skip to content

Commit d3e3c4e

Browse files
Merge branch 'bugfix/consolidate_err_messages' into 'master'
client: Report failure on timeout in mid-message timeout (GitHub PR) See merge request espressif/esp-mqtt!165
2 parents 7894dd0 + ddde502 commit d3e3c4e

File tree

1 file changed

+44
-18
lines changed

1 file changed

+44
-18
lines changed

mqtt_client.c

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,21 +43,36 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
4343
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client);
4444
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client);
4545

46-
static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client)
46+
/**
47+
* @brief Processes error reported from transport layer (considering the message read status)
48+
*
49+
* @param err: Error reported from TCP transport
50+
* @param client: MQTT client handle
51+
* @param mid_message: True if the error occured when reading incomplete message
52+
*
53+
* @return - 0 on Timeout
54+
* - -1 on Timeout with incomplete message
55+
* - -2 on Error or EOF
56+
*/
57+
static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client, bool mid_message)
4758
{
48-
if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
49-
ESP_LOGD(TAG, "%s: transport_read(): EOF", __func__);
50-
return 0;
51-
}
52-
5359
if (err == ERR_TCP_TRANSPORT_CONNECTION_TIMEOUT) {
60+
if (mid_message) {
61+
// No error message, because we could've read with timeout 0 (caller will decide)
62+
return -1;
63+
}
64+
// Not an error, continue
5465
ESP_LOGD(TAG, "%s: transport_read(): call timed out before data was ready!", __func__);
5566
return 0;
5667
}
5768

69+
if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
70+
ESP_LOGE(TAG, "%s: transport_read(): EOF", __func__);
71+
}
72+
5873
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
5974
esp_mqtt_client_dispatch_transport_error(client);
60-
return -1;
75+
return -2;
6176
}
6277

6378
#if MQTT_ENABLE_SSL
@@ -1094,7 +1109,7 @@ static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
10941109
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
10951110
client->config->network_timeout_ms);
10961111
if (ret <= 0) {
1097-
return esp_mqtt_handle_transport_read_error(ret, client) == 0 ? ESP_OK : ESP_FAIL;
1112+
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
10981113
}
10991114

11001115
msg_data_len = ret;
@@ -1178,11 +1193,13 @@ static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_
11781193

11791194
/*
11801195
* Returns:
1181-
* -1 in case of failure
1196+
* -2 in case of failure or EOF (clean connection closure)
1197+
* -1 timeout while in-the-middle of the messge
11821198
* 0 if no message has been received
11831199
* 1 if a message has been received and placed to client->mqtt_state:
11841200
* message length: client->mqtt_state.message_length
11851201
* message content: client->mqtt_state.in_buffer
1202+
*
11861203
*/
11871204
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
11881205
{
@@ -1198,7 +1215,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
11981215
*/
11991216
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
12001217
if (read_len <= 0) {
1201-
return esp_mqtt_handle_transport_read_error(read_len, client);
1218+
return esp_mqtt_handle_transport_read_error(read_len, client, false);
12021219
}
12031220
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
12041221
/*
@@ -1224,7 +1241,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
12241241
*/
12251242
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
12261243
if (read_len <= 0) {
1227-
return esp_mqtt_handle_transport_read_error(read_len, client);
1244+
return esp_mqtt_handle_transport_read_error(read_len, client, true);
12281245
}
12291246
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
12301247
buf++;
@@ -1245,7 +1262,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
12451262
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
12461263
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
12471264
if (read_len <= 0) {
1248-
return esp_mqtt_handle_transport_read_error(read_len, client);
1265+
return esp_mqtt_handle_transport_read_error(read_len, client, true);
12491266
}
12501267
client->mqtt_state.in_buffer_read_len += read_len;
12511268
buf += read_len;
@@ -1276,7 +1293,7 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
12761293
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
12771294
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
12781295
if (read_len <= 0) {
1279-
return esp_mqtt_handle_transport_read_error(read_len, client);
1296+
return esp_mqtt_handle_transport_read_error(read_len, client, true);
12801297
}
12811298
client->mqtt_state.in_buffer_read_len += read_len;
12821299
if (client->mqtt_state.in_buffer_read_len < total_len) {
@@ -1290,23 +1307,32 @@ static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_t
12901307
return 1;
12911308
err:
12921309
esp_mqtt_client_dispatch_transport_error(client);
1293-
return -1;
1310+
return -2;
12941311
}
12951312

12961313
static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
12971314
{
12981315
uint8_t msg_type = 0, msg_qos = 0;
12991316
uint16_t msg_id = 0;
1317+
size_t previous_in_buffer_read_len = client->mqtt_state.in_buffer_read_len;
13001318

13011319
/* non-blocking receive in order not to block other tasks */
13021320
int recv = mqtt_message_receive(client, 0);
1303-
if (recv < 0) {
1321+
if (recv == 0) { // Timeout
1322+
return ESP_OK;
1323+
}
1324+
if (recv == -1) { // Mid-message timeout
1325+
if (previous_in_buffer_read_len == client->mqtt_state.in_buffer_read_len) {
1326+
// Report error only if didn't receive anything since previous iteration
1327+
ESP_LOGE(TAG, "%s: Network timeout while reading MQTT message", __func__);
1328+
return ESP_FAIL;
1329+
}
1330+
return ESP_OK; // Treat as standard timeout (keep reading the message)
1331+
}
1332+
if (recv < 0) { // Other error
13041333
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
13051334
return ESP_FAIL;
13061335
}
1307-
if (recv == 0) {
1308-
return ESP_OK;
1309-
}
13101336
int read_len = client->mqtt_state.message_length;
13111337

13121338
// If the message was valid, get the type, quality of service and id of the message

0 commit comments

Comments
 (0)