@@ -17,15 +17,13 @@ PubSubClient::PubSubClient()
1717 this ->stream = NULL ;
1818 setCallback (NULL );
1919}
20-
2120// cppcheck-suppress uninitMemberVar
2221PubSubClient::PubSubClient (Client& client)
2322{
2423 this ->_state = MQTT_DISCONNECTED;
2524 setClient (client);
2625 this ->stream = NULL ;
2726}
28-
2927// cppcheck-suppress uninitMemberVar
3028PubSubClient::PubSubClient (IPAddress addr, uint16_t port, Client& client)
3129{
@@ -63,7 +61,6 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
6361 setClient (client);
6462 setStream (stream);
6563}
66-
6764// cppcheck-suppress uninitMemberVar
6865PubSubClient::PubSubClient (uint8_t *ip, uint16_t port, Client& client)
6966{
@@ -101,7 +98,6 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
10198 setClient (client);
10299 setStream (stream);
103100}
104-
105101// cppcheck-suppress uninitMemberVar
106102PubSubClient::PubSubClient (const char * domain, uint16_t port, Client& client)
107103{
@@ -143,22 +139,29 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
143139
144140bool PubSubClient::connect (const char *id)
145141{
146- return connect (id,NULL ,NULL ,0 ,0 ,0 ,0 );
142+ return connect (id,NULL ,NULL ,0 ,0 ,0 ,0 , 1 );
147143}
148144
149145bool PubSubClient::connect (const char *id, const char *user, const char *pass)
150146{
151- return connect (id,user,pass,0 ,0 ,0 ,0 );
147+ return connect (id,user,pass,0 ,0 ,0 ,0 , 1 );
152148}
153149
154150bool PubSubClient::connect (const char *id, const char * willTopic, uint8_t willQos,
155151 bool willRetain, const char * willMessage)
156152{
157- return connect (id,NULL ,NULL ,willTopic,willQos,willRetain,willMessage);
153+ return connect (id,NULL ,NULL ,willTopic,willQos,willRetain,willMessage, 1 );
158154}
159155
160156bool PubSubClient::connect (const char *id, const char *user, const char *pass,
161157 const char * willTopic, uint8_t willQos, bool willRetain, const char * willMessage)
158+ {
159+ return connect (id,user,pass,willTopic,willQos,willRetain,willMessage,1 );
160+ }
161+
162+ bool PubSubClient::connect (const char *id, const char *user, const char *pass,
163+ const char * willTopic, uint8_t willQos, bool willRetain, const char * willMessage,
164+ bool cleanSession)
162165{
163166 if (!connected ()) {
164167 int result = 0 ;
@@ -171,7 +174,7 @@ bool PubSubClient::connect(const char *id, const char *user, const char *pass,
171174 if (result == 1 ) {
172175 nextMsgId = 1 ;
173176 // Leave room in the buffer for header and variable length field
174- uint16_t length = 5 ;
177+ uint16_t length = MQTT_MAX_HEADER_SIZE ;
175178 unsigned int j;
176179
177180#if MQTT_VERSION == MQTT_VERSION_3_1
@@ -187,9 +190,12 @@ bool PubSubClient::connect(const char *id, const char *user, const char *pass,
187190
188191 uint8_t v;
189192 if (willTopic) {
190- v = 0x06 |(willQos<<3 )|(willRetain<<5 );
193+ v = 0x04 |(willQos<<3 )|(willRetain<<5 );
191194 } else {
192- v = 0x02 ;
195+ v = 0x00 ;
196+ }
197+ if (cleanSession) {
198+ v = v|0x02 ;
193199 }
194200
195201 if (user != NULL ) {
@@ -204,20 +210,26 @@ bool PubSubClient::connect(const char *id, const char *user, const char *pass,
204210
205211 buffer[length++] = ((MQTT_KEEPALIVE) >> 8 );
206212 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF );
213+
214+ CHECK_STRING_LENGTH (length,id)
207215 length = writeString (id,buffer,length);
208216 if (willTopic) {
217+ CHECK_STRING_LENGTH (length,willTopic)
209218 length = writeString (willTopic,buffer,length);
219+ CHECK_STRING_LENGTH (length,willMessage)
210220 length = writeString (willMessage,buffer,length);
211221 }
212222
213223 if (user != NULL ) {
224+ CHECK_STRING_LENGTH (length,user)
214225 length = writeString (user,buffer,length);
215226 if (pass != NULL ) {
227+ CHECK_STRING_LENGTH (length,pass)
216228 length = writeString (pass,buffer,length);
217229 }
218230 }
219231
220- write (MQTTCONNECT,buffer,length-5 );
232+ write (MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE );
221233
222234 lastInActivity = lastOutActivity = millis ();
223235
@@ -256,6 +268,7 @@ bool PubSubClient::readByte(uint8_t * result)
256268{
257269 uint32_t previousMillis = millis ();
258270 while (!_client->available ()) {
271+ yield ();
259272 uint32_t currentMillis = millis ();
260273 if (currentMillis - previousMillis >= ((int32_t ) MQTT_SOCKET_TIMEOUT * 1000 )) {
261274 return false ;
@@ -291,6 +304,12 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength)
291304 uint8_t start = 0 ;
292305
293306 do {
307+ if (len == 5 ) {
308+ // Invalid remaining length encoding - kill the connection
309+ _state = MQTT_DISCONNECTED;
310+ _client->stop ();
311+ return 0 ;
312+ }
294313 if (!readByte (&digit)) {
295314 return 0 ;
296315 }
@@ -364,16 +383,15 @@ bool PubSubClient::loop()
364383 uint8_t type = buffer[0 ]&0xF0 ;
365384 if (type == MQTTPUBLISH) {
366385 if (callback) {
367- uint16_t tl = (buffer[llen+1 ]<<8 )+buffer[llen+2 ];
386+ uint16_t tl = (buffer[llen+1 ]<<8 )+buffer[llen+2 ]; /* topic length in bytes */
387+ memmove (buffer+llen+2 ,buffer+llen+3 ,tl); /* move topic inside buffer 1 byte to front */
388+ buffer[llen+2 +tl] = 0 ; /* end the topic as a 'C' string with \x00 */
389+ char *topic = (char *) buffer+llen+2 ;
368390 uint8_t *payload;
369- char topic[tl+1 ];
370- for (uint16_t i=0 ; i<tl; i++) {
371- topic[i] = buffer[llen+3 +i];
372- }
373- topic[tl] = 0 ;
374391 // msgId only present for QOS>0
375392 if ((buffer[0 ]&0x06 ) == MQTTQOS1) {
376- uint16_t msgId = (buffer[llen+3 +tl]<<8 )+buffer[llen+3 +tl+1 ];
393+ uint16_t msgId = 0 ;
394+ msgId = (buffer[llen+3 +tl]<<8 )+buffer[llen+3 +tl+1 ];
377395 payload = buffer+llen+3 +tl+2 ;
378396 callback (topic,payload,len-llen-3 -tl-2 );
379397
@@ -396,6 +414,9 @@ bool PubSubClient::loop()
396414 } else if (type == MQTTPINGRESP) {
397415 pingOutstanding = false ;
398416 }
417+ } else if (!connected ()) {
418+ // readPacket has closed the connection
419+ return false ;
399420 }
400421 }
401422 return true ;
@@ -422,12 +443,12 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned i
422443 bool retained)
423444{
424445 if (connected ()) {
425- if (MQTT_MAX_PACKET_SIZE < 5 + 2 +strlen (topic) + plength) {
446+ if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2 +strlen (topic) + plength) {
426447 // Too long
427448 return false ;
428449 }
429450 // Leave room in the buffer for header and variable length field
430- uint16_t length = 5 ;
451+ uint16_t length = MQTT_MAX_HEADER_SIZE ;
431452 length = writeString (topic,buffer,length);
432453 uint16_t i;
433454 for (i=0 ; i<plength; i++) {
@@ -437,11 +458,16 @@ bool PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned i
437458 if (retained) {
438459 header |= 1 ;
439460 }
440- return write (header,buffer,length-5 );
461+ return write (header,buffer,length-MQTT_MAX_HEADER_SIZE );
441462 }
442463 return false ;
443464}
444465
466+ bool PubSubClient::publish_P (const char * topic, const char * payload, bool retained)
467+ {
468+ return publish_P (topic, (const uint8_t *)payload, strlen (payload), retained);
469+ }
470+
445471bool PubSubClient::publish_P (const char * topic, const uint8_t * payload, unsigned int plength,
446472 bool retained)
447473{
@@ -487,12 +513,46 @@ bool PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned
487513 return rc == tlen + 4 + plength;
488514}
489515
490- bool PubSubClient::write (uint8_t header, uint8_t * buf, uint16_t length)
516+ bool PubSubClient::beginPublish (const char * topic, unsigned int plength, bool retained)
517+ {
518+ if (connected ()) {
519+ // Send the header and variable length field
520+ uint16_t length = MQTT_MAX_HEADER_SIZE;
521+ length = writeString (topic,buffer,length);
522+ uint8_t header = MQTTPUBLISH;
523+ if (retained) {
524+ header |= 1 ;
525+ }
526+ size_t hlen = buildHeader (header, buffer, plength+length-MQTT_MAX_HEADER_SIZE);
527+ uint16_t rc = _client->write (buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
528+ lastOutActivity = millis ();
529+ return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
530+ }
531+ return false ;
532+ }
533+
534+ int PubSubClient::endPublish ()
535+ {
536+ return 1 ;
537+ }
538+
539+ size_t PubSubClient::write (uint8_t data)
540+ {
541+ lastOutActivity = millis ();
542+ return _client->write (data);
543+ }
544+
545+ size_t PubSubClient::write (const uint8_t *buffer, size_t size)
546+ {
547+ lastOutActivity = millis ();
548+ return _client->write (buffer,size);
549+ }
550+
551+ size_t PubSubClient::buildHeader (uint8_t header, uint8_t * buf, uint16_t length)
491552{
492553 uint8_t lenBuf[4 ];
493554 uint8_t llen = 0 ;
494555 uint8_t pos = 0 ;
495- uint16_t rc;
496556 uint16_t len = length;
497557 do {
498558 uint8_t digit;
@@ -507,12 +567,19 @@ bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length)
507567
508568 buf[4 -llen] = header;
509569 for (int i=0 ; i<llen; i++) {
510- buf[5 -llen+i] = lenBuf[i];
570+ buf[MQTT_MAX_HEADER_SIZE -llen+i] = lenBuf[i];
511571 }
572+ return llen+1 ; // Full header size is variable length bit plus the 1-byte fixed header
573+ }
574+
575+ bool PubSubClient::write (uint8_t header, uint8_t * buf, uint16_t length)
576+ {
577+ uint16_t rc;
578+ uint8_t hlen = buildHeader (header, buf, length);
512579
513580#ifdef MQTT_MAX_TRANSFER_SIZE
514- uint8_t * writeBuf = buf+(4 -llen );
515- uint16_t bytesRemaining = length+1 +llen ; // Match the length type
581+ uint8_t * writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen );
582+ uint16_t bytesRemaining = length+hlen ; // Match the length type
516583 uint8_t bytesToWrite;
517584 bool result = true ;
518585 while ((bytesRemaining > 0 ) && result) {
@@ -524,9 +591,9 @@ bool PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length)
524591 }
525592 return result;
526593#else
527- rc = _client->write (buf+(4 -llen ),length+1 +llen );
594+ rc = _client->write (buf+(MQTT_MAX_HEADER_SIZE-hlen ),length+hlen );
528595 lastOutActivity = millis ();
529- return (rc == 1 +llen +length);
596+ return (rc == hlen +length);
530597#endif
531598}
532599
@@ -537,7 +604,6 @@ bool PubSubClient::subscribe(const char* topic)
537604
538605bool PubSubClient::subscribe (const char * topic, uint8_t qos)
539606{
540- // original: if (qos < 0 || qos > 1) { (qos is uint8_t, hence qos < 0 impossible, tekka)
541607 if (qos > 1 ) {
542608 return false ;
543609 }
@@ -547,7 +613,7 @@ bool PubSubClient::subscribe(const char* topic, uint8_t qos)
547613 }
548614 if (connected ()) {
549615 // Leave room in the buffer for header and variable length field
550- uint16_t length = 5 ;
616+ uint16_t length = MQTT_MAX_HEADER_SIZE ;
551617 nextMsgId++;
552618 if (nextMsgId == 0 ) {
553619 nextMsgId = 1 ;
@@ -556,7 +622,7 @@ bool PubSubClient::subscribe(const char* topic, uint8_t qos)
556622 buffer[length++] = (nextMsgId & 0xFF );
557623 length = writeString ((char *)topic, buffer,length);
558624 buffer[length++] = qos;
559- return write (MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5 );
625+ return write (MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE );
560626 }
561627 return false ;
562628}
@@ -568,15 +634,15 @@ bool PubSubClient::unsubscribe(const char* topic)
568634 return false ;
569635 }
570636 if (connected ()) {
571- uint16_t length = 5 ;
637+ uint16_t length = MQTT_MAX_HEADER_SIZE ;
572638 nextMsgId++;
573639 if (nextMsgId == 0 ) {
574640 nextMsgId = 1 ;
575641 }
576642 buffer[length++] = (nextMsgId >> 8 );
577643 buffer[length++] = (nextMsgId & 0xFF );
578644 length = writeString (topic, buffer,length);
579- return write (MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5 );
645+ return write (MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE );
580646 }
581647 return false ;
582648}
@@ -587,6 +653,7 @@ void PubSubClient::disconnect()
587653 buffer[1 ] = 0 ;
588654 _client->write (buffer,2 );
589655 _state = MQTT_DISCONNECTED;
656+ _client->flush ();
590657 _client->stop ();
591658 lastInActivity = lastOutActivity = millis ();
592659}
@@ -644,7 +711,6 @@ PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port)
644711 this ->port = port;
645712 return *this ;
646713}
647-
648714// cppcheck-suppress passedByValue
649715PubSubClient& PubSubClient::setCallback (MQTT_CALLBACK_SIGNATURE)
650716{
0 commit comments