2
2
* @Author: jiejie
3
3
* @Github: https://github.com/jiejieTop
4
4
* @Date: 2019-12-09 21:31:25
5
- * @LastEditTime: 2021-01-14 10:00:12
5
+ * @LastEditTime : 2022-06-12 17:39:43
6
6
* @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
7
7
*/
8
8
#include "mqttclient.h"
@@ -154,6 +154,7 @@ static int mqtt_read_packet(mqtt_client_t* c, int* packet_type, platform_timer_t
154
154
if (NULL == packet_type )
155
155
RETURN_ERROR (MQTT_NULL_VALUE_ERROR );
156
156
157
+ platform_timer_init (timer );
157
158
platform_timer_cutdown (timer , c -> mqtt_cmd_timeout );
158
159
159
160
/* 1. read the header byte. This has the packet type in it */
@@ -192,6 +193,7 @@ static int mqtt_send_packet(mqtt_client_t* c, int length, platform_timer_t* time
192
193
int len = 0 ;
193
194
int sent = 0 ;
194
195
196
+ platform_timer_init (timer );
195
197
platform_timer_cutdown (timer , c -> mqtt_cmd_timeout );
196
198
197
199
/* send mqtt packet in a blocking manner or exit when it timer is expired */
@@ -302,7 +304,7 @@ static int mqtt_deliver_message(mqtt_client_t* c, MQTTString* topic_name, mqtt_m
302
304
rc = MQTT_SUCCESS_ERROR ;
303
305
}
304
306
305
- memset (message -> payload , 0 , strlen (( const char * ) message -> payload ));
307
+ memset (message -> payload , 0 , strlen (message -> payload ));
306
308
memset (topic_name -> lenstring .data , 0 , topic_name -> lenstring .len );
307
309
308
310
RETURN_ERROR (rc );
@@ -317,7 +319,7 @@ static ack_handlers_t *mqtt_ack_handler_create(mqtt_client_t* c, int type, uint1
317
319
return NULL ;
318
320
319
321
mqtt_list_init (& ack_handler -> list );
320
-
322
+ platform_timer_init ( & ack_handler -> timer );
321
323
platform_timer_cutdown (& ack_handler -> timer , c -> mqtt_cmd_timeout ); /* No response within timeout will be destroyed or resent */
322
324
323
325
ack_handler -> type = type ;
@@ -341,7 +343,7 @@ static void mqtt_ack_handler_destroy(ack_handlers_t* ack_handler)
341
343
static void mqtt_ack_handler_resend (mqtt_client_t * c , ack_handlers_t * ack_handler )
342
344
{
343
345
platform_timer_t timer ;
344
-
346
+ platform_timer_init ( & timer );
345
347
platform_timer_cutdown (& timer , c -> mqtt_cmd_timeout );
346
348
platform_timer_cutdown (& ack_handler -> timer , c -> mqtt_cmd_timeout ); /* timeout, recutdown */
347
349
@@ -351,7 +353,6 @@ static void mqtt_ack_handler_resend(mqtt_client_t* c, ack_handlers_t* ack_handle
351
353
mqtt_send_packet (c , ack_handler -> payload_len , & timer ); /* resend data */
352
354
platform_mutex_unlock (& c -> mqtt_write_lock );
353
355
MQTT_LOG_W ("%s:%d %s()... resend %d package, packet_id is %d " , __FILE__ , __LINE__ , __FUNCTION__ , ack_handler -> type , ack_handler -> packet_id );
354
-
355
356
}
356
357
357
358
static int mqtt_ack_list_node_is_exist (mqtt_client_t * c , int type , uint16_t packet_id )
@@ -515,6 +516,7 @@ static void mqtt_clean_session(mqtt_client_t* c)
515
516
msg_handler -> topic_filter = NULL ;
516
517
platform_memory_free (msg_handler );
517
518
}
519
+ // MQTT_LOG_D("%s:%d %s() mqtt_msg_handler_list delete", __FILE__, __LINE__, __FUNCTION__);
518
520
mqtt_list_del_init (& c -> mqtt_msg_handler_list );
519
521
}
520
522
@@ -568,8 +570,10 @@ static int mqtt_try_resubscribe(mqtt_client_t* c)
568
570
569
571
MQTT_LOG_W ("%s:%d %s()... mqtt try resubscribe ..." , __FILE__ , __LINE__ , __FUNCTION__ );
570
572
571
- if (mqtt_list_is_empty (& c -> mqtt_msg_handler_list ))
573
+ if (mqtt_list_is_empty (& c -> mqtt_msg_handler_list )) {
574
+ // MQTT_LOG_D("%s:%d %s() mqtt_msg_handler_list is empty", __FILE__, __LINE__, __FUNCTION__);
572
575
RETURN_ERROR (MQTT_SUCCESS_ERROR );
576
+ }
573
577
574
578
LIST_FOR_EACH_SAFE (curr , next , & c -> mqtt_msg_handler_list ) {
575
579
msg_handler = LIST_ENTRY (curr , message_handlers_t , list );
@@ -626,7 +630,9 @@ static int mqtt_publish_ack_packet(mqtt_client_t *c, uint16_t packet_id, int pac
626
630
int len = 0 ;
627
631
int rc = MQTT_SUCCESS_ERROR ;
628
632
platform_timer_t timer ;
629
-
633
+ platform_timer_init (& timer );
634
+ platform_timer_cutdown (& timer , c -> mqtt_cmd_timeout );
635
+
630
636
platform_mutex_lock (& c -> mqtt_write_lock );
631
637
632
638
switch (packet_type ) {
@@ -851,7 +857,7 @@ static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
851
857
break ;
852
858
853
859
default :
854
- break ;
860
+ goto exit ;
855
861
}
856
862
857
863
rc = mqtt_keep_alive (c );
@@ -888,7 +894,7 @@ static int mqtt_yield(mqtt_client_t* c, int timeout_ms)
888
894
if (0 == timeout_ms )
889
895
timeout_ms = c -> mqtt_cmd_timeout ;
890
896
891
-
897
+ platform_timer_init ( & timer );
892
898
platform_timer_cutdown (& timer , timeout_ms );
893
899
894
900
while (!platform_timer_is_expired (& timer )) {
@@ -966,8 +972,6 @@ static int mqtt_connect_with_results(mqtt_client_t* c)
966
972
if (CLIENT_STATE_CONNECTED == mqtt_get_client_state (c ))
967
973
RETURN_ERROR (MQTT_SUCCESS_ERROR );
968
974
969
-
970
-
971
975
#ifndef MQTT_NETWORK_TYPE_NO_TLS
972
976
rc = network_init (c -> mqtt_network , c -> mqtt_host , c -> mqtt_port , c -> mqtt_ca );
973
977
#else
@@ -1036,7 +1040,7 @@ static int mqtt_connect_with_results(mqtt_client_t* c)
1036
1040
/*creat the thread fail and disconnect the mqtt socket connect*/
1037
1041
network_release (c -> mqtt_network );
1038
1042
rc = MQTT_CONNECT_FAILED_ERROR ;
1039
- MQTT_LOG_W ("%s:%d %s()... mqtt yield thread creat faile ..." , __FILE__ , __LINE__ , __FUNCTION__ );
1043
+ MQTT_LOG_W ("%s:%d %s()... mqtt yield thread creat failed ..." , __FILE__ , __LINE__ , __FUNCTION__ );
1040
1044
}
1041
1045
} else {
1042
1046
mqtt_set_client_state (c , CLIENT_STATE_CONNECTED ); /* reconnect, mqtt thread is already exists */
@@ -1196,8 +1200,9 @@ int mqtt_keep_alive(mqtt_client_t* c)
1196
1200
} else {
1197
1201
platform_timer_t timer ;
1198
1202
int len = MQTTSerialize_pingreq (c -> mqtt_write_buf , c -> mqtt_write_buf_size );
1199
- if (len > 0 && (rc = mqtt_send_packet (c , len , & timer )) == MQTT_SUCCESS_ERROR ) // send the ping packet
1200
- c -> mqtt_ping_outstanding ++ ;
1203
+ if (len > 0 )
1204
+ rc = mqtt_send_packet (c , len , & timer ); // 100ask, send the ping packet
1205
+ c -> mqtt_ping_outstanding ++ ;
1201
1206
}
1202
1207
}
1203
1208
@@ -1229,7 +1234,7 @@ int mqtt_release(mqtt_client_t* c)
1229
1234
if (NULL == c )
1230
1235
RETURN_ERROR (MQTT_NULL_VALUE_ERROR );
1231
1236
1232
-
1237
+ platform_timer_init ( & timer );
1233
1238
platform_timer_cutdown (& timer , c -> mqtt_cmd_timeout );
1234
1239
1235
1240
/* wait for the clean session to complete */
@@ -1276,7 +1281,7 @@ int mqtt_disconnect(mqtt_client_t* c)
1276
1281
platform_timer_t timer ;
1277
1282
int len = 0 ;
1278
1283
1279
-
1284
+ platform_timer_init ( & timer );
1280
1285
platform_timer_cutdown (& timer , c -> mqtt_cmd_timeout );
1281
1286
1282
1287
platform_mutex_lock (& c -> mqtt_write_lock );
@@ -1306,10 +1311,10 @@ int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, m
1306
1311
if (CLIENT_STATE_CONNECTED != mqtt_get_client_state (c ))
1307
1312
RETURN_ERROR (MQTT_NOT_CONNECT_ERROR );
1308
1313
1309
- packet_id = mqtt_get_next_packet_id (c );
1310
-
1311
1314
platform_mutex_lock (& c -> mqtt_write_lock );
1312
1315
1316
+ packet_id = mqtt_get_next_packet_id (c );
1317
+
1313
1318
/* serialize subscribe packet and send it */
1314
1319
len = MQTTSerialize_subscribe (c -> mqtt_write_buf , c -> mqtt_write_buf_size , 0 , packet_id , 1 , & topic , (int * )& qos );
1315
1320
if (len <= 0 )
@@ -1350,10 +1355,10 @@ int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter)
1350
1355
if (CLIENT_STATE_CONNECTED != mqtt_get_client_state (c ))
1351
1356
RETURN_ERROR (MQTT_NOT_CONNECT_ERROR );
1352
1357
1358
+ platform_mutex_lock (& c -> mqtt_write_lock );
1353
1359
1354
1360
packet_id = mqtt_get_next_packet_id (c );
1355
1361
1356
- platform_mutex_lock (& c -> mqtt_write_lock );
1357
1362
/* serialize unsubscribe packet and send it */
1358
1363
if ((len = MQTTSerialize_unsubscribe (c -> mqtt_write_buf , c -> mqtt_write_buf_size , 0 , packet_id , 1 , & topic )) <= 0 )
1359
1364
goto exit ;
@@ -1385,8 +1390,10 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg
1385
1390
topic .cstring = (char * )topic_filter ;
1386
1391
1387
1392
if (CLIENT_STATE_CONNECTED != mqtt_get_client_state (c )) {
1393
+ msg -> payloadlen = 0 ; // clear
1388
1394
rc = MQTT_NOT_CONNECT_ERROR ;
1389
- goto exit ;
1395
+ RETURN_ERROR (rc );
1396
+ // goto exit; /* 100ask */
1390
1397
}
1391
1398
1392
1399
if ((NULL != msg -> payload ) && (0 == msg -> payloadlen ))
@@ -1417,7 +1424,7 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg
1417
1424
goto exit ;
1418
1425
1419
1426
if (QOS0 != msg -> qos ) {
1420
- mqtt_set_publish_dup (c ,1 ); /* may resend this data, set the dup flag in advance */
1427
+ mqtt_set_publish_dup (c , 1 ); /* may resend this data, set the dup flag in advance */
1421
1428
1422
1429
if (QOS1 == msg -> qos ) {
1423
1430
/* expect to receive PUBACK, otherwise data will be resent */
@@ -1431,7 +1438,7 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg
1431
1438
1432
1439
exit :
1433
1440
msg -> payloadlen = 0 ; // clear
1434
-
1441
+
1435
1442
platform_mutex_unlock (& c -> mqtt_write_lock );
1436
1443
1437
1444
if ((MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR == rc ) || (MQTT_MEM_NOT_ENOUGH_ERROR == rc )) {
0 commit comments