@@ -318,11 +318,41 @@ IoT_Error_t aws_iot_mqtt_internal_send_packet(AWS_IoT_Client *pClient, size_t le
318318 FUNC_EXIT_RC (rc )
319319}
320320
321- static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len (AWS_IoT_Client * pClient ,
321+ static IoT_Error_t _aws_iot_mqtt_internal_readWrapper ( AWS_IoT_Client * pClient , size_t offset , size_t size , Timer * pTimer , size_t * read_len ) {
322+ IoT_Error_t rc ;
323+ int byteToRead ;
324+ size_t byteRead = 0 ;
325+
326+ byteToRead = ( offset + size ) - pClient -> clientData .readBufIndex ;
327+
328+ if ( byteToRead > 0 )
329+ {
330+ rc = pClient -> networkStack .read ( & ( pClient -> networkStack ),
331+ pClient -> clientData .readBuf + pClient -> clientData .readBufIndex ,
332+ (size_t )byteToRead ,
333+ pTimer ,
334+ & byteRead );
335+ pClient -> clientData .readBufIndex += byteRead ;
336+
337+ /* refresh byte to read */
338+ byteToRead = ( offset + size ) - ((int )pClient -> clientData .readBufIndex );
339+ * read_len = size - (size_t )byteToRead ;
340+ }
341+ else
342+ {
343+ * read_len = size ;
344+ rc = SUCCESS ;
345+ }
346+
347+
348+
349+ return rc ;
350+ }
351+ static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len (AWS_IoT_Client * pClient , size_t * offset ,
322352 size_t * rem_len , Timer * pTimer ) {
323- unsigned char encodedByte ;
324353 size_t multiplier , len ;
325354 IoT_Error_t rc ;
355+ size_t read_len ;
326356
327357 FUNC_ENTRY ;
328358
@@ -335,22 +365,23 @@ static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len(AWS_IoT_Cl
335365 /* bad data */
336366 FUNC_EXIT_RC (MQTT_DECODE_REMAINING_LENGTH_ERROR );
337367 }
368+ rc = _aws_iot_mqtt_internal_readWrapper ( pClient , len , 1 , pTimer , & read_len );
338369
339- rc = pClient -> networkStack .read (& (pClient -> networkStack ), & encodedByte , 1 , pTimer , & len );
340370 if (SUCCESS != rc ) {
341371 FUNC_EXIT_RC (rc );
342372 }
343373
344- * rem_len += ((encodedByte & 127 ) * multiplier );
374+ * rem_len += (( pClient -> clientData . readBuf [ len ] & 127 ) * multiplier );
345375 multiplier *= 128 ;
346- } while ((encodedByte & 128 ) != 0 );
347-
376+ } while (( pClient -> clientData . readBuf [ len ] & 128 ) != 0 );
377+ * offset = len + 1 ;
348378 FUNC_EXIT_RC (rc );
349379}
350380
351381static IoT_Error_t _aws_iot_mqtt_internal_read_packet (AWS_IoT_Client * pClient , Timer * pTimer , uint8_t * pPacketType ) {
352- size_t len , rem_len , total_bytes_read , bytes_to_be_read , read_len ;
382+ size_t rem_len , total_bytes_read , bytes_to_be_read , read_len ;
353383 IoT_Error_t rc ;
384+ size_t offset = 0 ;
354385 MQTTHeader header = {0 };
355386 Timer packetTimer ;
356387 init_timer (& packetTimer );
@@ -361,30 +392,22 @@ static IoT_Error_t _aws_iot_mqtt_internal_read_packet(AWS_IoT_Client *pClient, T
361392 bytes_to_be_read = 0 ;
362393 read_len = 0 ;
363394
364- rc = pClient -> networkStack . read ( & ( pClient -> networkStack ), pClient -> clientData . readBuf , 1 , pTimer , & read_len );
395+ rc = _aws_iot_mqtt_internal_readWrapper ( pClient , offset , 1 , pTimer , & read_len );
365396 /* 1. read the header byte. This has the packet type in it */
366397 if (NETWORK_SSL_NOTHING_TO_READ == rc ) {
367398 return MQTT_NOTHING_TO_READ ;
368399 } else if (SUCCESS != rc ) {
369400 return rc ;
370401 }
371402
372- len = 1 ;
373-
374- /* Use the constant packet receive timeout, instead of the variable (remaining) pTimer time, to
375- * determine packet receiving timeout. This is done so we don't prematurely time out packet receiving
376- * if the remaining time in pTimer is too short.
377- */
378- pTimer = & packetTimer ;
379-
380403 /* 2. read the remaining length. This is variable in itself */
381- rc = _aws_iot_mqtt_internal_decode_packet_remaining_len (pClient , & rem_len , pTimer );
404+ rc = _aws_iot_mqtt_internal_decode_packet_remaining_len (pClient , & offset , & rem_len , pTimer );
382405 if (SUCCESS != rc ) {
383406 return rc ;
384- }
385-
407+ }
408+
386409 /* if the buffer is too short then the message will be dropped silently */
387- if (rem_len >= pClient -> clientData .readBufSize ) {
410+ if (( rem_len + offset ) >= pClient -> clientData .readBufSize ) {
388411 bytes_to_be_read = pClient -> clientData .readBufSize ;
389412 do {
390413 rc = pClient -> networkStack .read (& (pClient -> networkStack ), pClient -> clientData .readBuf , bytes_to_be_read ,
@@ -398,21 +421,29 @@ static IoT_Error_t _aws_iot_mqtt_internal_read_packet(AWS_IoT_Client *pClient, T
398421 }
399422 }
400423 } while (total_bytes_read < rem_len && SUCCESS == rc );
401- return MQTT_RX_BUFFER_TOO_SHORT_ERROR ;
402- }
403424
404- /* put the original remaining length into the read buffer */
405- len += aws_iot_mqtt_internal_write_len_to_buffer (pClient -> clientData .readBuf + 1 , (uint32_t ) rem_len );
425+ /* Check buffer was correctly emptied, otherwise, return error message. */
426+ if ( total_bytes_read == rem_len )
427+ {
428+ aws_iot_mqtt_internal_flushBuffers ( pClient );
429+ return MQTT_RX_BUFFER_TOO_SHORT_ERROR ;
430+ }
431+ else
432+ {
433+ return rc ;
434+ }
435+ }
406436
407437 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
408438 if (rem_len > 0 ) {
409- rc = pClient -> networkStack .read (& (pClient -> networkStack ), pClient -> clientData .readBuf + len , rem_len , pTimer ,
410- & read_len );
439+ rc = _aws_iot_mqtt_internal_readWrapper ( pClient , offset , rem_len , pTimer , & read_len );
411440 if (SUCCESS != rc || read_len != rem_len ) {
412441 return FAILURE ;
413442 }
414443 }
415444
445+ /* Pack has been received, we can flush the buffers for next call. */
446+ aws_iot_mqtt_internal_flushBuffers ( pClient );
416447 header .byte = pClient -> clientData .readBuf [0 ];
417448 * pPacketType = MQTT_HEADER_FIELD_TYPE (header .byte );
418449
@@ -613,6 +644,10 @@ IoT_Error_t aws_iot_mqtt_internal_cycle_read(AWS_IoT_Client *pClient, Timer *pTi
613644 return rc ;
614645}
615646
647+ IoT_Error_t aws_iot_mqtt_internal_flushBuffers ( AWS_IoT_Client * pClient ) {
648+ pClient -> clientData .readBufIndex = 0 ;
649+ }
650+
616651/* only used in single-threaded mode where one command at a time is in process */
617652IoT_Error_t aws_iot_mqtt_internal_wait_for_read (AWS_IoT_Client * pClient , uint8_t packetType , Timer * pTimer ) {
618653 IoT_Error_t rc ;
0 commit comments