Skip to content

Commit 7c82728

Browse files
committed
Merge pull request arduino#80 from mapnull/hotfix-mqtt-read_mqtt_packets
MQTTGateway: fix garbage in MQTT payload
2 parents ba1ecd8 + 11a0032 commit 7c82728

File tree

1 file changed

+44
-29
lines changed

1 file changed

+44
-29
lines changed

libraries/MySensors/examples/MQTTGateway/MQTTGateway.ino

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ http://forum.mysensors.org/topic/303/mqtt-broker-gateway
6161
#include <MyMQTT.h>
6262
#include <Ethernet.h>
6363

64-
6564
// * Use this for IBOARD modded to use standard MISO/MOSI/SCK, see note *1 above!
6665
/*
6766
#define RADIO_CE_PIN 3 // radio chip enable
6867
#define RADIO_SPI_SS_PIN 8 // radio SPI serial select
6968
#define RADIO_ERROR_LED_PIN A2 // Error led pin
7069
#define RADIO_RX_LED_PIN A1 // Receive led pin
71-
#define RADIO_TX_LED_PIN A0 // the PCB, on board LED*/
70+
#define RADIO_TX_LED_PIN A0 // the PCB, on board LED
71+
*/
7272

7373
// * Use this for default configured pro mini / nano etc :
7474
///*
@@ -88,42 +88,57 @@ EthernetServer server = EthernetServer(TCP_PORT);
8888
MyMQTT gw(RADIO_CE_PIN, RADIO_SPI_SS_PIN);
8989

9090
void processEthernetMessages() {
91-
char inputString[MQTT_MAX_PACKET_SIZE] = "";
92-
uint8_t inputSize = 0;
93-
EthernetClient client = server.available();
94-
if (client) {
95-
while (client.available()) {
96-
char inChar = client.read();
97-
inputString[inputSize] = inChar;
98-
inputSize++;
99-
}
91+
char inputString[MQTT_MAX_PACKET_SIZE] = "";
92+
uint8_t inputSize = 0;
93+
uint8_t readCnt = 0;
94+
uint8_t length = 0;
95+
96+
EthernetClient client = server.available();
97+
if (client) {
98+
while (client.available()) {
99+
uint8_t inChar = client.read();
100+
readCnt++;
101+
102+
if (inputSize < MQTT_MAX_PACKET_SIZE-1) {
103+
inputString[inputSize] = (char)inChar;
104+
inputSize++;
105+
}
106+
107+
if (readCnt == 2) {
108+
length = (inChar & 127) * 1;
109+
}
110+
if (readCnt == (length+2)) {
111+
break;
112+
}
113+
}
114+
inputString[inputSize] = 0;
100115
#ifdef TCPDUMP
101-
Serial.print("<<");
102-
char buf[4];
103-
for (uint8_t a=0; a<inputSize; a++) { sprintf(buf,"%02X ", (uint8_t)inputString[a]); Serial.print(buf); } Serial.println("");
116+
Serial.print("<<");
117+
char buf[4];
118+
for (uint8_t a=0; a<inputSize; a++) { sprintf(buf, "%02X ", (uint8_t)inputString[a]); Serial.print(buf); } Serial.println();
104119
#endif
105-
gw.processMQTTMessage(inputString, inputSize);
106-
}
120+
gw.processMQTTMessage(inputString, inputSize);
121+
}
107122
}
108123

109124
void writeEthernet(const char *writeBuffer, uint8_t *writeSize) {
110125
#ifdef TCPDUMP
111-
Serial.print(">>");
112-
char buf[4];
113-
for (uint8_t a=0; a<*writeSize; a++) { sprintf(buf,"%02X ",(uint8_t)writeBuffer[a]); Serial.print(buf); } Serial.println("");
126+
Serial.print(">>");
127+
char buf[4];
128+
for (uint8_t a=0; a<*writeSize; a++) { sprintf(buf,"%02X ",(uint8_t)writeBuffer[a]); Serial.print(buf); } Serial.println();
114129
#endif
115-
server.write((const uint8_t *)writeBuffer, *writeSize);
130+
server.write((const uint8_t *)writeBuffer, *writeSize);
116131
}
117132

118133

119134
int main(void) {
120-
init();
121-
Ethernet.begin(TCP_MAC, TCP_IP);
122-
delay(1000); // Wait for Ethernet to get configured.
123-
gw.begin(RF24_PA_LEVEL_GW, RF24_CHANNEL, RF24_DATARATE, writeEthernet, RADIO_RX_LED_PIN, RADIO_TX_LED_PIN, RADIO_ERROR_LED_PIN);
124-
server.begin();
125-
while (1) {
126-
processEthernetMessages();
127-
gw.processRadioMessage();
128-
}
135+
init();
136+
Ethernet.begin(TCP_MAC, TCP_IP);
137+
delay(1000); // Wait for Ethernet to get configured.
138+
gw.begin(RF24_PA_LEVEL_GW, RF24_CHANNEL, RF24_DATARATE, writeEthernet, RADIO_RX_LED_PIN, RADIO_TX_LED_PIN, RADIO_ERROR_LED_PIN);
139+
server.begin();
140+
while (1) {
141+
processEthernetMessages();
142+
gw.processRadioMessage();
143+
}
129144
}

0 commit comments

Comments
 (0)