Skip to content

Commit 9eca431

Browse files
committed
Create MyMQTT.cpp
MQTTGateway beta 0.1
1 parent b08e168 commit 9eca431

File tree

1 file changed

+312
-0
lines changed

1 file changed

+312
-0
lines changed

libraries/MySensors/MyMQTT.cpp

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
/*
2+
The MySensors library adds a new layer on top of the RF24 library.
3+
It handles radio network routing, relaying and ids.
4+
5+
Created by Henrik Ekblad <[email protected]>
6+
Modified by Daniel Wiegert
7+
This program is free software; you can redistribute it and/or
8+
modify it under the terms of the GNU General Public License
9+
version 2 as published by the Free Software Foundation.
10+
*/
11+
12+
#include "MyMQTT.h"
13+
14+
char broker[] PROGMEM = MQTT_BROKER_PREFIX;
15+
char S_0[] PROGMEM = "Temperature";
16+
char S_1[] PROGMEM = "Humidity";
17+
char S_2[] PROGMEM = "Light";
18+
char S_3[] PROGMEM = "Dimmer";
19+
char S_4[] PROGMEM = "Pressure";
20+
char S_5[] PROGMEM = "Forecast";
21+
char S_6[] PROGMEM = "Rain";
22+
char S_7[] PROGMEM = "Rainrate";
23+
char S_8[] PROGMEM = "Wind";
24+
char S_9[] PROGMEM = "Gust";
25+
char S_10[] PROGMEM = "Direction";
26+
char S_11[] PROGMEM = "UV";
27+
char S_12[] PROGMEM = "Weight";
28+
char S_13[] PROGMEM = "Distance";
29+
char S_14[] PROGMEM = "Impedance";
30+
char S_15[] PROGMEM = "Armed";
31+
char S_16[] PROGMEM = "Tripped";
32+
char S_17[] PROGMEM = "Watt";
33+
char S_18[] PROGMEM = "KWH";
34+
char S_19[] PROGMEM = "Scene_ON";
35+
char S_20[] PROGMEM = "Scene_OFF";
36+
char S_21[] PROGMEM = "Heater";
37+
char S_22[] PROGMEM = "Heater_SW";
38+
char S_23[] PROGMEM = "LightLVL";
39+
char S_24[] PROGMEM = "Var1";
40+
char S_25[] PROGMEM = "Var2";
41+
char S_26[] PROGMEM = "Var3";
42+
char S_27[] PROGMEM = "Var4";
43+
char S_28[] PROGMEM = "Var5,";
44+
char S_29[] PROGMEM = "Up";
45+
char S_30[] PROGMEM = "Down";
46+
char S_31[] PROGMEM = "Stop";
47+
char S_32[] PROGMEM = "IR_Send";
48+
char S_33[] PROGMEM = "IR_Receive";
49+
char S_34[] PROGMEM = "Flow";
50+
char S_35[] PROGMEM = "Volume";
51+
char S_36[] PROGMEM = "Lock_Status";
52+
char S_37[] PROGMEM = "Volt_Batt"; // Does not exist in mysensors lib!
53+
char S_38[] PROGMEM = "";
54+
char S_39[] PROGMEM = "";
55+
char S_40[] PROGMEM = "";
56+
char S_41[] PROGMEM = "";
57+
char S_42[] PROGMEM = "";
58+
char S_43[] PROGMEM = "";
59+
char S_44[] PROGMEM = "";
60+
char S_45[] PROGMEM = "";
61+
char S_46[] PROGMEM = "";
62+
char S_47[] PROGMEM = "";
63+
char S_48[] PROGMEM = "";
64+
char S_49[] PROGMEM = "Sketch_name"; // Does not exist in mysensors lib!
65+
char S_50[] PROGMEM = "Sketch_version"; // Does not exist in mysensors lib!
66+
67+
68+
PROGMEM const char *sType[] =
69+
{ S_0, S_1, S_2, S_3, S_4, S_5, S_6, S_7, S_8, S_9, S_10,
70+
S_11, S_12, S_13, S_14, S_15, S_16, S_17, S_18, S_19, S_20,
71+
S_21, S_22, S_23, S_24, S_25, S_26, S_27, S_28, S_29, S_30,
72+
S_31, S_32, S_33, S_34, S_35, S_36, S_37, S_38, S_39, S_40,
73+
S_41, S_42, S_43, S_44, S_45, S_46, S_47, S_48, S_49, S_50};
74+
75+
76+
MyMQTT::MyMQTT(uint8_t _cepin, uint8_t _cspin) :
77+
MySensor(_cepin, _cspin) {
78+
}
79+
80+
void MyMQTT::begin(rf24_pa_dbm_e paLevel, uint8_t channel, rf24_datarate_e dataRate, void (*inDataCallback)(char *, int *)) {
81+
Serial.begin(BAUD_RATE);
82+
repeaterMode = true;
83+
isGateway = true;
84+
MQTTClient = false;
85+
setupRepeaterMode();
86+
87+
if (inDataCallback != NULL) {
88+
useWriteCallback = true;
89+
dataCallback = inDataCallback;
90+
}
91+
else {
92+
useWriteCallback = false;
93+
}
94+
95+
nc.nodeId = 0;
96+
nc.distance = 0;
97+
98+
// Start up the radio library
99+
setupRadio(paLevel, channel, dataRate);
100+
RF24::openReadingPipe(WRITE_PIPE, BASE_RADIO_ID);
101+
RF24::openReadingPipe(CURRENT_NODE_PIPE, BASE_RADIO_ID);
102+
RF24::startListening();
103+
// Send startup log message on serial
104+
//Serial.print(PSTR("Started\n"));//TODO: progmem gives error..? error: sType causes a section type conflict with __c
105+
Serial.print("Started\n");//TODO: fix this...
106+
}
107+
108+
void MyMQTT::processRadioMessage() {
109+
if (process()) {
110+
// A new message was received from one of the sensors
111+
MyMessage message = getLastMessage();
112+
// Pass along the message from sensors to serial line
113+
SendMQTT(message);
114+
}
115+
116+
}
117+
118+
void MyMQTT::processMQTTMessage(char *inputString, int inputPos) {
119+
char *str, *p, *value=NULL;
120+
char i = 0;
121+
buffer[0]= 0;
122+
buffsize = 0;
123+
#ifdef TCPDUMP
124+
Serial.print("<<");
125+
char buf[4];
126+
for (int a=0; a<inputPos; a++) { sprintf(buf,"%02X ",(byte)inputString[a]); Serial.print(buf); } Serial.println("");
127+
#endif
128+
if ((byte)inputString[0] >> 4 == MQTTCONNECT) {
129+
buffer[buffsize++] = MQTTCONNACK << 4;
130+
buffer[buffsize++] = 0x02; // Remaining length
131+
buffer[buffsize++] = 0x00; // Connection accepted
132+
buffer[buffsize++] = 0x00; // Reserved
133+
MQTTClient=true;
134+
}
135+
if ((byte)inputString[0] >> 4 == MQTTPINGREQ) {
136+
buffer[buffsize++] = MQTTPINGRESP << 4;
137+
buffer[buffsize++] = 0x00;
138+
}
139+
if ((byte)inputString[0] >> 4 == MQTTSUBSCRIBE) {
140+
buffer[buffsize++] = MQTTSUBACK << 4; // Just ack everything, we actually dont really care!
141+
buffer[buffsize++] = 0x03; // Remaining length
142+
buffer[buffsize++] = (byte)inputString[2]; // Message ID MSB
143+
buffer[buffsize++] = (byte)inputString[3]; // Message ID LSB
144+
buffer[buffsize++] = MQTTQOS0; // QOS level
145+
}
146+
if ((byte)inputString[0] >> 4 == MQTTUNSUBSCRIBE) {
147+
buffer[buffsize++] = MQTTUNSUBACK << 4;
148+
buffer[buffsize++] = 0x02; // Remaining length
149+
buffer[buffsize++] = (byte)inputString[2]; // Message ID MSB
150+
buffer[buffsize++] = (byte)inputString[3]; // Message ID LSB
151+
}
152+
if ((byte)inputString[0] >> 4 == MQTTDISCONNECT) {
153+
MQTTClient=false;
154+
}
155+
if (buffsize > 0) {
156+
#ifdef TCPDUMP
157+
Serial.print(">>");
158+
char buf[4];
159+
for (int a=0; a<buffsize; a++) { sprintf(buf,"%02X ",(byte)buffer[a]); Serial.print(buf); } Serial.println("");
160+
#endif
161+
dataCallback(buffer,&buffsize);
162+
}
163+
// We publish everything we get, we dont care if its subscribed or not!
164+
if ((byte)inputString[0] >> 4 == MQTTPUBLISH || (MQTT_SEND_SUBSCRIPTION && (byte)inputString[0] >> 4 == MQTTSUBSCRIBE)) {
165+
buffer[0]= 0;
166+
buffsize = 0;
167+
if ((byte)inputString[0] >> 4 == MQTTSUBSCRIBE) {
168+
strncat(buffer,inputString+6,inputString[5]);
169+
} else {
170+
strncat(buffer,inputString+4,inputString[3]);
171+
}
172+
msg.sender = GATEWAY_ADDRESS;
173+
for (str = strtok_r(buffer, "/", &p); // split using semicolon
174+
str && i < 4; str = strtok_r(NULL, "/", &p) //get subsequent tokens (?)
175+
) {
176+
switch (i) {
177+
case 0: // Radioid (destination)
178+
if (strcmp_P(str,broker)!=0) {
179+
return; //Message not for us or malformatted!
180+
}
181+
break;
182+
case 1:
183+
msg.destination = atoi(str);
184+
break;
185+
case 2:
186+
msg.sensor = atoi(str);
187+
break;
188+
case 3:
189+
char match=0;
190+
for (int j=0; strcpy_P(convBuf, (char*)pgm_read_word(&(sType[j]))) ; j++) {
191+
if (strcmp(str,convBuf)==0) {
192+
match=j;
193+
break;
194+
}
195+
}
196+
msg.type = match;
197+
break;
198+
}
199+
i++;
200+
}
201+
if ((char)inputString[1] > (char)(inputString[3]+2) && !((byte)inputString[0] >> 4 == MQTTSUBSCRIBE)) {
202+
strcpy(convBuf,inputString+(inputString[3]+4));
203+
msg.set(convBuf);
204+
} else {
205+
msg.set("");
206+
}
207+
msg.sender = GATEWAY_ADDRESS;
208+
mSetCommand(msg,C_SET);
209+
mSetRequestAck(msg,false);
210+
mSetAck(msg,false);
211+
mSetVersion(msg, PROTOCOL_VERSION);
212+
sendRoute(msg);
213+
}
214+
}
215+
216+
217+
void MyMQTT::SendMQTT(MyMessage &msg) {
218+
//serial(PSTR("%d;%d;%d;%d;%s\n"),msg.sender, msg.sensor, mGetCommand(msg), msg.type, msg.getString(convBuf));
219+
buffsize = 0;
220+
if (!MQTTClient) return;
221+
if (msg.isAck()) {
222+
Serial.println("msg is ack!");
223+
if (msg.sender==255 && mGetCommand(msg)==C_INTERNAL && msg.type==I_ID_REQUEST) {
224+
// TODO: sending ACK request on id_response fucks node up. doesn't work.
225+
// The idea was to confirm id and save to EEPROM_LATEST_NODE_ADDRESS.
226+
}
227+
} else {
228+
// we have to check every message if its a newly assigned id or not.
229+
// Ack on I_ID_RESPONSE does not work, and checking on C_PRESENTATION isn't reliable.
230+
char newNodeID = loadState(EEPROM_LATEST_NODE_ADDRESS)+1;
231+
if (newNodeID <= MQTT_FIRST_SENSORID) newNodeID = MQTT_FIRST_SENSORID;
232+
if (msg.sender==newNodeID) {
233+
saveState(EEPROM_LATEST_NODE_ADDRESS,newNodeID);
234+
}
235+
if (mGetCommand(msg)==C_INTERNAL && msg.type==I_CONFIG) { // CONFIG
236+
// As for now there is only one 'config' request.
237+
// We force SI! Resistance is futile!
238+
//
239+
// Todo : Support for more config types, Maybe just read from own EEPROM?
240+
// Use internal EEPROM_CONTROLLER_CONFIG_ADDRESS and special MQTT address to write to
241+
// EEPROM_CONTROLLER_CONFIG_ADDRESS & EEPROM_LOCAL_CONFIG_ADDRESS
242+
msg.destination = msg.sender; //NodeID
243+
msg.sender = GATEWAY_ADDRESS;
244+
msg.sensor = 255; //SensorID
245+
msg.type = I_CONFIG; //msgType
246+
mSetCommand(msg,C_INTERNAL); //Subtype
247+
mSetRequestAck(msg,false);
248+
mSetAck(msg,false);
249+
msg.set("M");
250+
mSetVersion(msg, PROTOCOL_VERSION);
251+
sendRoute(msg);
252+
} else if (mGetCommand(msg)==C_PRESENTATION && (msg.type==S_ARDUINO_NODE || msg.type==S_ARDUINO_REPEATER_NODE)) {
253+
//Doesnt work to check new sensorID here, this message does not always arrive.. See above.
254+
} else if (msg.sender==255 && mGetCommand(msg)==C_INTERNAL && msg.type==I_ID_REQUEST) {
255+
char newNodeID = loadState(EEPROM_LATEST_NODE_ADDRESS)+1;
256+
if (newNodeID <= MQTT_FIRST_SENSORID) newNodeID = MQTT_FIRST_SENSORID;
257+
if (newNodeID >= MQTT_LAST_SENSORID) return; // Sorry no more id's left :(
258+
msg.destination = msg.sender; //NodeID
259+
msg.sender = GATEWAY_ADDRESS;
260+
msg.sensor = 255; //SensorID
261+
msg.type = I_ID_RESPONSE; //MsgType
262+
mSetCommand(msg,C_INTERNAL); //Subtype
263+
mSetRequestAck(msg,false); //Request ack doesn't work, node/gateway gets stuck in enless loop.
264+
mSetAck(msg,false);
265+
msg.set((uint8_t)newNodeID); //Payload
266+
mSetVersion(msg, PROTOCOL_VERSION);
267+
sendRoute(msg);
268+
// if (sendRoute(msg)) saveState(EEPROM_LATEST_NODE_ADDRESS,newNodeID); // If send OK save to eeprom. DOES NOT ALWAYS RETURN 'OK'!?
269+
} else if (mGetCommand(msg)!=0) {
270+
if (mGetCommand(msg)==3) msg.type=msg.type+38;
271+
buffer[buffsize++] = MQTTPUBLISH << 4; // 0:
272+
buffer[buffsize++] = 0x09; // 1: Remaining length with no payload, we'll set this later to correct value, buffsize -2
273+
buffer[buffsize++] = 0x00; // 2: Length MSB (Remaing length can never exceed ff,so MSB must be 0!)
274+
buffer[buffsize++] = 0x08; // 3: Length LSB (ADDR), We'll set this later
275+
strcpy_P(buffer+4, broker);
276+
buffsize+=strlen_P(broker);
277+
buffsize+=sprintf(&buffer[buffsize],"/%i/%i/",msg.sender,msg.sensor);
278+
buffsize+=strncpysType_retL(buffer,msg.type,buffsize);
279+
buffer[3]=buffsize-4; // Set correct address length on byte 4.
280+
#ifdef DEBUG
281+
Serial.println((char*)&buffer[4]);
282+
#endif
283+
msg.getString(convBuf);
284+
for (int a=0; a<strlen(convBuf); a++) { // Payload
285+
buffer[buffsize++] = convBuf[a];
286+
}
287+
buffer[1]=buffsize-2; // Set correct Remaining length on byte 2.
288+
289+
#ifdef TCPDUMP
290+
Serial.print(">>");
291+
char buf[4];
292+
for (int a=0; a<buffsize; a++) { sprintf(buf,"%02X ",(byte)buffer[a]); Serial.print(buf); } Serial.println("");
293+
#endif
294+
dataCallback(buffer,&buffsize);
295+
}
296+
}
297+
}
298+
299+
char MyMQTT::strncpysType_retL(char *str, char index, char start) {
300+
char c;
301+
char l;
302+
char *p = (char *)pgm_read_word(&(sType[index]));
303+
str+=start;
304+
while ((c = pgm_read_byte(p))) {
305+
*str=c;
306+
str++;
307+
p++;
308+
l++;
309+
}
310+
*str=0;
311+
return l;
312+
}

0 commit comments

Comments
 (0)