|
| 1 | +/* |
| 2 | +The MySensors library adds a new layer on top of the RF24 library. |
| 3 | +It handles radio network routing, relaying and ids. |
| 4 | +
|
| 5 | +Created by Henrik Ekblad <[email protected]> |
| 6 | +Modified by Daniel Wiegert |
| 7 | +This program is free software; you can redistribute it and/or |
| 8 | +modify it under the terms of the GNU General Public License |
| 9 | +version 2 as published by the Free Software Foundation. |
| 10 | +*/ |
| 11 | + |
| 12 | +#include "MyMQTT.h" |
| 13 | + |
| 14 | +char broker[] PROGMEM = MQTT_BROKER_PREFIX; |
| 15 | +char S_0[] PROGMEM = "Temperature"; |
| 16 | +char S_1[] PROGMEM = "Humidity"; |
| 17 | +char S_2[] PROGMEM = "Light"; |
| 18 | +char S_3[] PROGMEM = "Dimmer"; |
| 19 | +char S_4[] PROGMEM = "Pressure"; |
| 20 | +char S_5[] PROGMEM = "Forecast"; |
| 21 | +char S_6[] PROGMEM = "Rain"; |
| 22 | +char S_7[] PROGMEM = "Rainrate"; |
| 23 | +char S_8[] PROGMEM = "Wind"; |
| 24 | +char S_9[] PROGMEM = "Gust"; |
| 25 | +char S_10[] PROGMEM = "Direction"; |
| 26 | +char S_11[] PROGMEM = "UV"; |
| 27 | +char S_12[] PROGMEM = "Weight"; |
| 28 | +char S_13[] PROGMEM = "Distance"; |
| 29 | +char S_14[] PROGMEM = "Impedance"; |
| 30 | +char S_15[] PROGMEM = "Armed"; |
| 31 | +char S_16[] PROGMEM = "Tripped"; |
| 32 | +char S_17[] PROGMEM = "Watt"; |
| 33 | +char S_18[] PROGMEM = "KWH"; |
| 34 | +char S_19[] PROGMEM = "Scene_ON"; |
| 35 | +char S_20[] PROGMEM = "Scene_OFF"; |
| 36 | +char S_21[] PROGMEM = "Heater"; |
| 37 | +char S_22[] PROGMEM = "Heater_SW"; |
| 38 | +char S_23[] PROGMEM = "LightLVL"; |
| 39 | +char S_24[] PROGMEM = "Var1"; |
| 40 | +char S_25[] PROGMEM = "Var2"; |
| 41 | +char S_26[] PROGMEM = "Var3"; |
| 42 | +char S_27[] PROGMEM = "Var4"; |
| 43 | +char S_28[] PROGMEM = "Var5,"; |
| 44 | +char S_29[] PROGMEM = "Up"; |
| 45 | +char S_30[] PROGMEM = "Down"; |
| 46 | +char S_31[] PROGMEM = "Stop"; |
| 47 | +char S_32[] PROGMEM = "IR_Send"; |
| 48 | +char S_33[] PROGMEM = "IR_Receive"; |
| 49 | +char S_34[] PROGMEM = "Flow"; |
| 50 | +char S_35[] PROGMEM = "Volume"; |
| 51 | +char S_36[] PROGMEM = "Lock_Status"; |
| 52 | +char S_37[] PROGMEM = "Volt_Batt"; // Does not exist in mysensors lib! |
| 53 | +char S_38[] PROGMEM = ""; |
| 54 | +char S_39[] PROGMEM = ""; |
| 55 | +char S_40[] PROGMEM = ""; |
| 56 | +char S_41[] PROGMEM = ""; |
| 57 | +char S_42[] PROGMEM = ""; |
| 58 | +char S_43[] PROGMEM = ""; |
| 59 | +char S_44[] PROGMEM = ""; |
| 60 | +char S_45[] PROGMEM = ""; |
| 61 | +char S_46[] PROGMEM = ""; |
| 62 | +char S_47[] PROGMEM = ""; |
| 63 | +char S_48[] PROGMEM = ""; |
| 64 | +char S_49[] PROGMEM = "Sketch_name"; // Does not exist in mysensors lib! |
| 65 | +char S_50[] PROGMEM = "Sketch_version"; // Does not exist in mysensors lib! |
| 66 | + |
| 67 | + |
| 68 | +PROGMEM const char *sType[] = |
| 69 | +{ S_0, S_1, S_2, S_3, S_4, S_5, S_6, S_7, S_8, S_9, S_10, |
| 70 | + S_11, S_12, S_13, S_14, S_15, S_16, S_17, S_18, S_19, S_20, |
| 71 | + S_21, S_22, S_23, S_24, S_25, S_26, S_27, S_28, S_29, S_30, |
| 72 | + S_31, S_32, S_33, S_34, S_35, S_36, S_37, S_38, S_39, S_40, |
| 73 | + S_41, S_42, S_43, S_44, S_45, S_46, S_47, S_48, S_49, S_50}; |
| 74 | + |
| 75 | + |
| 76 | +MyMQTT::MyMQTT(uint8_t _cepin, uint8_t _cspin) : |
| 77 | +MySensor(_cepin, _cspin) { |
| 78 | +} |
| 79 | + |
| 80 | +void MyMQTT::begin(rf24_pa_dbm_e paLevel, uint8_t channel, rf24_datarate_e dataRate, void (*inDataCallback)(char *, int *)) { |
| 81 | + Serial.begin(BAUD_RATE); |
| 82 | + repeaterMode = true; |
| 83 | + isGateway = true; |
| 84 | + MQTTClient = false; |
| 85 | + setupRepeaterMode(); |
| 86 | + |
| 87 | + if (inDataCallback != NULL) { |
| 88 | + useWriteCallback = true; |
| 89 | + dataCallback = inDataCallback; |
| 90 | + } |
| 91 | + else { |
| 92 | + useWriteCallback = false; |
| 93 | + } |
| 94 | + |
| 95 | + nc.nodeId = 0; |
| 96 | + nc.distance = 0; |
| 97 | + |
| 98 | + // Start up the radio library |
| 99 | + setupRadio(paLevel, channel, dataRate); |
| 100 | + RF24::openReadingPipe(WRITE_PIPE, BASE_RADIO_ID); |
| 101 | + RF24::openReadingPipe(CURRENT_NODE_PIPE, BASE_RADIO_ID); |
| 102 | + RF24::startListening(); |
| 103 | + // Send startup log message on serial |
| 104 | + //Serial.print(PSTR("Started\n"));//TODO: progmem gives error..? error: sType causes a section type conflict with __c |
| 105 | + Serial.print("Started\n");//TODO: fix this... |
| 106 | +} |
| 107 | + |
| 108 | +void MyMQTT::processRadioMessage() { |
| 109 | + if (process()) { |
| 110 | + // A new message was received from one of the sensors |
| 111 | + MyMessage message = getLastMessage(); |
| 112 | + // Pass along the message from sensors to serial line |
| 113 | + SendMQTT(message); |
| 114 | + } |
| 115 | + |
| 116 | +} |
| 117 | + |
| 118 | +void MyMQTT::processMQTTMessage(char *inputString, int inputPos) { |
| 119 | + char *str, *p, *value=NULL; |
| 120 | + char i = 0; |
| 121 | + buffer[0]= 0; |
| 122 | + buffsize = 0; |
| 123 | +#ifdef TCPDUMP |
| 124 | + Serial.print("<<"); |
| 125 | + char buf[4]; |
| 126 | + for (int a=0; a<inputPos; a++) { sprintf(buf,"%02X ",(byte)inputString[a]); Serial.print(buf); } Serial.println(""); |
| 127 | +#endif |
| 128 | + if ((byte)inputString[0] >> 4 == MQTTCONNECT) { |
| 129 | + buffer[buffsize++] = MQTTCONNACK << 4; |
| 130 | + buffer[buffsize++] = 0x02; // Remaining length |
| 131 | + buffer[buffsize++] = 0x00; // Connection accepted |
| 132 | + buffer[buffsize++] = 0x00; // Reserved |
| 133 | + MQTTClient=true; |
| 134 | + } |
| 135 | + if ((byte)inputString[0] >> 4 == MQTTPINGREQ) { |
| 136 | + buffer[buffsize++] = MQTTPINGRESP << 4; |
| 137 | + buffer[buffsize++] = 0x00; |
| 138 | + } |
| 139 | + if ((byte)inputString[0] >> 4 == MQTTSUBSCRIBE) { |
| 140 | + buffer[buffsize++] = MQTTSUBACK << 4; // Just ack everything, we actually dont really care! |
| 141 | + buffer[buffsize++] = 0x03; // Remaining length |
| 142 | + buffer[buffsize++] = (byte)inputString[2]; // Message ID MSB |
| 143 | + buffer[buffsize++] = (byte)inputString[3]; // Message ID LSB |
| 144 | + buffer[buffsize++] = MQTTQOS0; // QOS level |
| 145 | + } |
| 146 | + if ((byte)inputString[0] >> 4 == MQTTUNSUBSCRIBE) { |
| 147 | + buffer[buffsize++] = MQTTUNSUBACK << 4; |
| 148 | + buffer[buffsize++] = 0x02; // Remaining length |
| 149 | + buffer[buffsize++] = (byte)inputString[2]; // Message ID MSB |
| 150 | + buffer[buffsize++] = (byte)inputString[3]; // Message ID LSB |
| 151 | + } |
| 152 | + if ((byte)inputString[0] >> 4 == MQTTDISCONNECT) { |
| 153 | + MQTTClient=false; |
| 154 | + } |
| 155 | + if (buffsize > 0) { |
| 156 | +#ifdef TCPDUMP |
| 157 | + Serial.print(">>"); |
| 158 | + char buf[4]; |
| 159 | + for (int a=0; a<buffsize; a++) { sprintf(buf,"%02X ",(byte)buffer[a]); Serial.print(buf); } Serial.println(""); |
| 160 | +#endif |
| 161 | + dataCallback(buffer,&buffsize); |
| 162 | + } |
| 163 | + // We publish everything we get, we dont care if its subscribed or not! |
| 164 | + if ((byte)inputString[0] >> 4 == MQTTPUBLISH || (MQTT_SEND_SUBSCRIPTION && (byte)inputString[0] >> 4 == MQTTSUBSCRIBE)) { |
| 165 | + buffer[0]= 0; |
| 166 | + buffsize = 0; |
| 167 | + if ((byte)inputString[0] >> 4 == MQTTSUBSCRIBE) { |
| 168 | + strncat(buffer,inputString+6,inputString[5]); |
| 169 | + } else { |
| 170 | + strncat(buffer,inputString+4,inputString[3]); |
| 171 | + } |
| 172 | + msg.sender = GATEWAY_ADDRESS; |
| 173 | + for (str = strtok_r(buffer, "/", &p); // split using semicolon |
| 174 | + str && i < 4; str = strtok_r(NULL, "/", &p) //get subsequent tokens (?) |
| 175 | + ) { |
| 176 | + switch (i) { |
| 177 | + case 0: // Radioid (destination) |
| 178 | + if (strcmp_P(str,broker)!=0) { |
| 179 | + return; //Message not for us or malformatted! |
| 180 | + } |
| 181 | + break; |
| 182 | + case 1: |
| 183 | + msg.destination = atoi(str); |
| 184 | + break; |
| 185 | + case 2: |
| 186 | + msg.sensor = atoi(str); |
| 187 | + break; |
| 188 | + case 3: |
| 189 | + char match=0; |
| 190 | + for (int j=0; strcpy_P(convBuf, (char*)pgm_read_word(&(sType[j]))) ; j++) { |
| 191 | + if (strcmp(str,convBuf)==0) { |
| 192 | + match=j; |
| 193 | + break; |
| 194 | + } |
| 195 | + } |
| 196 | + msg.type = match; |
| 197 | + break; |
| 198 | + } |
| 199 | + i++; |
| 200 | + } |
| 201 | + if ((char)inputString[1] > (char)(inputString[3]+2) && !((byte)inputString[0] >> 4 == MQTTSUBSCRIBE)) { |
| 202 | + strcpy(convBuf,inputString+(inputString[3]+4)); |
| 203 | + msg.set(convBuf); |
| 204 | + } else { |
| 205 | + msg.set(""); |
| 206 | + } |
| 207 | + msg.sender = GATEWAY_ADDRESS; |
| 208 | + mSetCommand(msg,C_SET); |
| 209 | + mSetRequestAck(msg,false); |
| 210 | + mSetAck(msg,false); |
| 211 | + mSetVersion(msg, PROTOCOL_VERSION); |
| 212 | + sendRoute(msg); |
| 213 | + } |
| 214 | +} |
| 215 | + |
| 216 | + |
| 217 | +void MyMQTT::SendMQTT(MyMessage &msg) { |
| 218 | +//serial(PSTR("%d;%d;%d;%d;%s\n"),msg.sender, msg.sensor, mGetCommand(msg), msg.type, msg.getString(convBuf)); |
| 219 | + buffsize = 0; |
| 220 | + if (!MQTTClient) return; |
| 221 | + if (msg.isAck()) { |
| 222 | +Serial.println("msg is ack!"); |
| 223 | + if (msg.sender==255 && mGetCommand(msg)==C_INTERNAL && msg.type==I_ID_REQUEST) { |
| 224 | + // TODO: sending ACK request on id_response fucks node up. doesn't work. |
| 225 | + // The idea was to confirm id and save to EEPROM_LATEST_NODE_ADDRESS. |
| 226 | + } |
| 227 | + } else { |
| 228 | + // we have to check every message if its a newly assigned id or not. |
| 229 | + // Ack on I_ID_RESPONSE does not work, and checking on C_PRESENTATION isn't reliable. |
| 230 | + char newNodeID = loadState(EEPROM_LATEST_NODE_ADDRESS)+1; |
| 231 | + if (newNodeID <= MQTT_FIRST_SENSORID) newNodeID = MQTT_FIRST_SENSORID; |
| 232 | + if (msg.sender==newNodeID) { |
| 233 | + saveState(EEPROM_LATEST_NODE_ADDRESS,newNodeID); |
| 234 | + } |
| 235 | + if (mGetCommand(msg)==C_INTERNAL && msg.type==I_CONFIG) { // CONFIG |
| 236 | + // As for now there is only one 'config' request. |
| 237 | + // We force SI! Resistance is futile! |
| 238 | + // |
| 239 | + // Todo : Support for more config types, Maybe just read from own EEPROM? |
| 240 | + // Use internal EEPROM_CONTROLLER_CONFIG_ADDRESS and special MQTT address to write to |
| 241 | + // EEPROM_CONTROLLER_CONFIG_ADDRESS & EEPROM_LOCAL_CONFIG_ADDRESS |
| 242 | + msg.destination = msg.sender; //NodeID |
| 243 | + msg.sender = GATEWAY_ADDRESS; |
| 244 | + msg.sensor = 255; //SensorID |
| 245 | + msg.type = I_CONFIG; //msgType |
| 246 | + mSetCommand(msg,C_INTERNAL); //Subtype |
| 247 | + mSetRequestAck(msg,false); |
| 248 | + mSetAck(msg,false); |
| 249 | + msg.set("M"); |
| 250 | + mSetVersion(msg, PROTOCOL_VERSION); |
| 251 | + sendRoute(msg); |
| 252 | + } else if (mGetCommand(msg)==C_PRESENTATION && (msg.type==S_ARDUINO_NODE || msg.type==S_ARDUINO_REPEATER_NODE)) { |
| 253 | + //Doesnt work to check new sensorID here, this message does not always arrive.. See above. |
| 254 | + } else if (msg.sender==255 && mGetCommand(msg)==C_INTERNAL && msg.type==I_ID_REQUEST) { |
| 255 | + char newNodeID = loadState(EEPROM_LATEST_NODE_ADDRESS)+1; |
| 256 | + if (newNodeID <= MQTT_FIRST_SENSORID) newNodeID = MQTT_FIRST_SENSORID; |
| 257 | + if (newNodeID >= MQTT_LAST_SENSORID) return; // Sorry no more id's left :( |
| 258 | + msg.destination = msg.sender; //NodeID |
| 259 | + msg.sender = GATEWAY_ADDRESS; |
| 260 | + msg.sensor = 255; //SensorID |
| 261 | + msg.type = I_ID_RESPONSE; //MsgType |
| 262 | + mSetCommand(msg,C_INTERNAL); //Subtype |
| 263 | + mSetRequestAck(msg,false); //Request ack doesn't work, node/gateway gets stuck in enless loop. |
| 264 | + mSetAck(msg,false); |
| 265 | + msg.set((uint8_t)newNodeID); //Payload |
| 266 | + mSetVersion(msg, PROTOCOL_VERSION); |
| 267 | + sendRoute(msg); |
| 268 | +// if (sendRoute(msg)) saveState(EEPROM_LATEST_NODE_ADDRESS,newNodeID); // If send OK save to eeprom. DOES NOT ALWAYS RETURN 'OK'!? |
| 269 | + } else if (mGetCommand(msg)!=0) { |
| 270 | + if (mGetCommand(msg)==3) msg.type=msg.type+38; |
| 271 | + buffer[buffsize++] = MQTTPUBLISH << 4; // 0: |
| 272 | + buffer[buffsize++] = 0x09; // 1: Remaining length with no payload, we'll set this later to correct value, buffsize -2 |
| 273 | + buffer[buffsize++] = 0x00; // 2: Length MSB (Remaing length can never exceed ff,so MSB must be 0!) |
| 274 | + buffer[buffsize++] = 0x08; // 3: Length LSB (ADDR), We'll set this later |
| 275 | + strcpy_P(buffer+4, broker); |
| 276 | + buffsize+=strlen_P(broker); |
| 277 | + buffsize+=sprintf(&buffer[buffsize],"/%i/%i/",msg.sender,msg.sensor); |
| 278 | + buffsize+=strncpysType_retL(buffer,msg.type,buffsize); |
| 279 | + buffer[3]=buffsize-4; // Set correct address length on byte 4. |
| 280 | +#ifdef DEBUG |
| 281 | + Serial.println((char*)&buffer[4]); |
| 282 | +#endif |
| 283 | + msg.getString(convBuf); |
| 284 | + for (int a=0; a<strlen(convBuf); a++) { // Payload |
| 285 | + buffer[buffsize++] = convBuf[a]; |
| 286 | + } |
| 287 | + buffer[1]=buffsize-2; // Set correct Remaining length on byte 2. |
| 288 | + |
| 289 | +#ifdef TCPDUMP |
| 290 | + Serial.print(">>"); |
| 291 | + char buf[4]; |
| 292 | + for (int a=0; a<buffsize; a++) { sprintf(buf,"%02X ",(byte)buffer[a]); Serial.print(buf); } Serial.println(""); |
| 293 | +#endif |
| 294 | + dataCallback(buffer,&buffsize); |
| 295 | + } |
| 296 | + } |
| 297 | +} |
| 298 | + |
| 299 | +char MyMQTT::strncpysType_retL(char *str, char index, char start) { |
| 300 | + char c; |
| 301 | + char l; |
| 302 | + char *p = (char *)pgm_read_word(&(sType[index])); |
| 303 | + str+=start; |
| 304 | + while ((c = pgm_read_byte(p))) { |
| 305 | + *str=c; |
| 306 | + str++; |
| 307 | + p++; |
| 308 | + l++; |
| 309 | + } |
| 310 | + *str=0; |
| 311 | + return l; |
| 312 | +} |
0 commit comments