Skip to content

Commit 4a09a77

Browse files
authored
Merge pull request #8 from myDevicesIoT/feature/mqtt-support
Feature/mqtt support
2 parents 96eb53c + 1e77550 commit 4a09a77

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2401
-2041
lines changed

README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ Hardware Info
170170
Hardware info, including make, model, etc. is retrieved via ``myDevices.system.hardware.py``. This should be modified or overridden to provide the appropriate hardware info for your board.
171171

172172
Pin Mapping
173-
The mapping of the on-board pins is provided in ``myDevices.system.hardware.py`` with the ``MAPPING`` list. This list provides the available GPIO pin numbers as well as the voltage ("V33", "V50"), ground ("GND") and do-not-connect ("DNC") pins. This should be updated with the mapping for your board. However, the Cayenne dashboard is currently built to display the Raspberry Pi GPIO layout so if your board's pin layout is significantly different it may not display correctly in the GPIO tab.
173+
The mapping of the on-board pins is provided in ``myDevices.devices.digital.gpio.py`` with the ``MAPPING`` list. This list provides the available GPIO pin numbers as well as the voltage ("V33", "V50"), ground ("GND") and do-not-connect ("DNC") pins. This should be updated with the mapping for your board. However, the Cayenne dashboard is currently built to display the Raspberry Pi GPIO layout so if your board's pin layout is significantly different it may not display correctly in the GPIO tab.
174174

175175
Settings
176176
--------

myDevices-test.tar.gz

-77.8 KB
Binary file not shown.

myDevices/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
1-
1+
"""
2+
This package contains the Cayenne agent, which is a full featured client for the Cayenne IoT project builder: https://cayenne.mydevices.com. It sends system information as well as sensor and actuator data and responds to actuator messages initiated from the Cayenne dashboard and mobile apps.
3+
"""
4+
__version__ = '2.0.0'

myDevices/__main__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,20 @@ def setMemoryLimit(rsrc, megs=200):
2929
pidfile = '/var/run/myDevices/cayenne.pid'
3030
def signal_handler(signal, frame):
3131
"""Handle program interrupt so the agent can exit cleanly"""
32-
if client:
32+
if client and client.connected:
3333
if signal == SIGINT:
3434
info('Program interrupt received, client exiting')
3535
client.Destroy()
3636
remove(pidfile)
3737
else:
3838
client.Restart()
39+
elif signal == SIGINT:
40+
remove(pidfile)
41+
raise SystemExit
3942
signal(SIGUSR1, signal_handler)
4043
signal(SIGINT, signal_handler)
4144

45+
4246
def exceptionHook(exc_type, exc_value, exc_traceback):
4347
"""Make sure any uncaught exceptions are logged"""
4448
debug('Daemon::exceptionHook ')
@@ -89,8 +93,7 @@ def writePidToFile(pidfile):
8993
with open(pidfile, 'r') as file:
9094
pid = int(file.read())
9195
if ProcessInfo.IsRunning(pid) and pid != getpid():
92-
Daemon.Exit()
93-
return
96+
raise SystemExit
9497
pid = str(getpid())
9598
with open(pidfile, 'w') as file:
9699
file.write(pid)
@@ -122,11 +125,12 @@ def main(argv):
122125
writePidToFile(pidfile)
123126
logToFile(logfile)
124127
config = Config(configfile)
125-
HOST = config.get('CONFIG', 'ServerAddress', 'cloud.mydevices.com')
126-
PORT = config.getInt('CONFIG', 'ServerPort', 8181)
128+
HOST = config.get('CONFIG', 'ServerAddress', 'mqtt.mydevices.com')
129+
PORT = config.getInt('CONFIG', 'ServerPort', 8883)
127130
CayenneApiHost = config.get('CONFIG', 'CayenneApi', 'https://api.mydevices.com')
128131
global client
129132
client = CloudServerClient(HOST, PORT, CayenneApiHost)
133+
client.Start()
130134

131135
if __name__ == "__main__":
132136
try:

myDevices/cloud/apiclient.py

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
from concurrent.futures import ThreadPoolExecutor
33
import json
44
from myDevices.utils.logger import error, exception
5+
from myDevices.system.hardware import Hardware
6+
from myDevices.system.systeminfo import SystemInfo
7+
from myDevices.cloud import cayennemqtt
8+
from myDevices.devices.digital.gpio import NativeGPIO
59

610
class CayenneApiClient:
711
def __init__(self, host):
@@ -36,31 +40,55 @@ def sendRequest(self, method, uri, body=None):
3640
return None
3741
return response
3842
exception("No data received")
43+
44+
def getMessageBody(self, inviteCode):
45+
body = {'id': inviteCode}
46+
hardware = Hardware()
47+
if hardware.Serial and hardware.isRaspberryPi():
48+
body['type'] = 'rpi'
49+
body['hardware_id'] = hardware.Serial
50+
else:
51+
hardware_id = hardware.getMac()
52+
if hardware_id:
53+
body['type'] = 'mac'
54+
body['hardware_id'] = hardware_id
55+
try:
56+
system_data = []
57+
cayennemqtt.DataChannel.add(system_data, cayennemqtt.SYS_HARDWARE_MAKE, value=hardware.getManufacturer(), type='string', unit='utf8')
58+
cayennemqtt.DataChannel.add(system_data, cayennemqtt.SYS_HARDWARE_MODEL, value=hardware.getModel(), type='string', unit='utf8')
59+
system_info = SystemInfo()
60+
capacity_data = system_info.getMemoryInfo((cayennemqtt.CAPACITY,))
61+
capacity_data += system_info.getDiskInfo((cayennemqtt.CAPACITY,))
62+
for item in capacity_data:
63+
system_data.append(item)
64+
body['properties'] = {}
65+
body['properties']['pinmap'] = NativeGPIO().MAPPING
66+
if system_data:
67+
body['properties']['sysinfo'] = system_data
68+
except:
69+
exception('Error getting system info')
70+
return json.dumps(body)
3971

4072
def authenticate(self, inviteCode):
41-
body = json.dumps({'id': inviteCode})
73+
body = self.getMessageBody(inviteCode)
4274
url = '/things/key/authenticate'
4375
return self.sendRequest('POST', url, body)
4476

4577
def activate(self, inviteCode):
46-
body = json.dumps({'id': inviteCode})
78+
body = self.getMessageBody(inviteCode)
4779
url = '/things/key/activate'
4880
return self.sendRequest('POST', url, body)
4981

50-
def getId(self, content):
82+
def getCredentials(self, content):
5183
if content is None:
5284
return None
5385
body = content.decode("utf-8")
5486
if body is None or body is "":
5587
return None
56-
return json.loads(body)['id']
88+
return json.loads(body)
5789

5890
def loginDevice(self, inviteCode):
59-
response = self.authenticate(inviteCode)
91+
response = self.activate(inviteCode)
6092
if response and response.status_code == 200:
61-
return self.getId(response.content)
62-
if not response or response.status_code == 412:
63-
response = self.activate(inviteCode)
64-
if response and response.status_code == 200:
65-
return self.getId(response.content)
93+
return self.getCredentials(response.content)
6694
return None

myDevices/cloud/cayennemqtt.py

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
import time
2+
from json import loads, decoder
3+
from ssl import PROTOCOL_TLSv1_2
4+
import paho.mqtt.client as mqtt
5+
from myDevices.utils.logger import debug, error, exception, info, logJson, warn
6+
7+
# Topics
8+
DATA_TOPIC = 'data/json'
9+
COMMAND_TOPIC = 'cmd'
10+
COMMAND_JSON_TOPIC = 'cmd.json'
11+
COMMAND_RESPONSE_TOPIC = 'response'
12+
13+
# Data Channels
14+
SYS_HARDWARE_MAKE = 'sys:hw:make'
15+
SYS_HARDWARE_MODEL = 'sys:hw:model'
16+
SYS_OS_NAME = 'sys:os:name'
17+
SYS_OS_VERSION = 'sys:os:version'
18+
SYS_NET = 'sys:net'
19+
SYS_STORAGE = 'sys:storage'
20+
SYS_RAM = 'sys:ram'
21+
SYS_CPU = 'sys:cpu'
22+
SYS_I2C = 'sys:i2c'
23+
SYS_SPI = 'sys:spi'
24+
SYS_UART = 'sys:uart'
25+
SYS_ONEWIRE = 'sys:1wire'
26+
SYS_DEVICETREE = 'sys:devicetree'
27+
SYS_GPIO = 'sys:gpio'
28+
SYS_POWER_RESET = 'sys:pwr:reset'
29+
SYS_POWER_HALT = 'sys:pwr:halt'
30+
AGENT_VERSION = 'agent:version'
31+
AGENT_DEVICES = 'agent:devices'
32+
AGENT_MANAGE = 'agent:manage'
33+
DEV_SENSOR = 'dev'
34+
35+
# Channel Suffixes
36+
IP = 'ip'
37+
SPEEDTEST = 'speedtest'
38+
SSID = 'ssid'
39+
USAGE = 'usage'
40+
CAPACITY = 'capacity'
41+
LOAD = 'load'
42+
TEMPERATURE = 'temp'
43+
VALUE = 'value'
44+
FUNCTION = 'function'
45+
46+
47+
class DataChannel:
48+
@staticmethod
49+
def add(data_list, prefix, channel=None, suffix=None, value=None, type=None, unit=None, name=None):
50+
"""Create data channel dict and append it to a list"""
51+
data_channel = prefix
52+
if channel is not None:
53+
data_channel += ':' + str(channel)
54+
if suffix is not None:
55+
data_channel += ';' + str(suffix)
56+
data = {}
57+
data['channel'] = data_channel
58+
data['value'] = value
59+
if type is not None:
60+
data['type'] = type
61+
if unit is not None:
62+
data['unit'] = unit
63+
if name is not None:
64+
data['name'] = name
65+
data_list.append(data)
66+
67+
68+
class CayenneMQTTClient:
69+
"""Cayenne MQTT Client class.
70+
71+
This is the main client class for connecting to Cayenne and sending and recFUeiving data.
72+
73+
Standard usage:
74+
* Set on_message callback, if you are receiving data.
75+
* Connect to Cayenne using the begin() function.
76+
* Call loop() at intervals (or loop_forever() once) to perform message processing.
77+
* Send data to Cayenne using write functions: virtualWrite(), celsiusWrite(), etc.
78+
* Receive and process data from Cayenne in the on_message callback.
79+
80+
The on_message callback can be used by creating a function and assigning it to CayenneMQTTClient.on_message member.
81+
The callback function should have the following signature: on_message(topic, message)
82+
If it exists this callback is used as the default message handler.
83+
"""
84+
client = None
85+
root_topic = ""
86+
connected = False
87+
on_message = None
88+
89+
def begin(self, username, password, clientid, hostname='mqtt.mydevices.com', port=8883):
90+
"""Initializes the client and connects to Cayenne.
91+
92+
username is the Cayenne username.
93+
password is the Cayenne password.
94+
clientid is the Cayennne client ID for the device.
95+
hostname is the MQTT broker hostname.
96+
port is the MQTT broker port.
97+
"""
98+
self.root_topic = 'v1/{}/things/{}'.format(username, clientid)
99+
self.client = mqtt.Client(client_id=clientid, clean_session=True, userdata=self)
100+
self.client.on_connect = self.connect_callback
101+
self.client.on_disconnect = self.disconnect_callback
102+
self.client.on_message = self.message_callback
103+
self.client.username_pw_set(username, password)
104+
if port != 1883:
105+
self.client.tls_set(ca_certs='/etc/ssl/certs/ca-certificates.crt', tls_version=PROTOCOL_TLSv1_2)
106+
self.client.connect(hostname, port, 60)
107+
info('Connecting to {}:{}'.format(hostname, port))
108+
109+
def connect_callback(self, client, userdata, flags, rc):
110+
"""The callback for when the client connects to the server.
111+
112+
client is the client instance for this callback.
113+
userdata is the private user data as set in Client() or userdata_set().
114+
flags are the response flags sent by the broker.
115+
rc is the connection result.
116+
"""
117+
if rc != 0:
118+
# MQTT broker error codes
119+
broker_errors = {
120+
1 : 'unacceptable protocol version',
121+
2 : 'identifier rejected',
122+
3 : 'server unavailable',
123+
4 : 'bad user name or password',
124+
5 : 'not authorized',
125+
}
126+
raise Exception("Connection failed, " + broker_errors.get(rc, "result code " + str(rc)))
127+
else:
128+
info("Connected with result code "+str(rc))
129+
self.connected = True
130+
# Subscribing in on_connect() means that if we lose the connection and
131+
# reconnect then subscriptions will be renewed.
132+
client.subscribe(self.get_topic_string(COMMAND_TOPIC, True))
133+
client.subscribe(self.get_topic_string(COMMAND_JSON_TOPIC, False))
134+
135+
def disconnect_callback(self, client, userdata, rc):
136+
"""The callback for when the client disconnects from the server.
137+
138+
client is the client instance for this callback.
139+
userdata is the private user data as set in Client() or userdata_set().
140+
rc is the connection result.
141+
"""
142+
info("Disconnected with result code "+str(rc))
143+
self.connected = False
144+
reconnected = False
145+
while not reconnected:
146+
try:
147+
self.client.reconnect()
148+
reconnected = True
149+
except:
150+
print("Reconnect failed, retrying")
151+
time.sleep(5)
152+
153+
def message_callback(self, client, userdata, msg):
154+
"""The callback for when a message is received from the server.
155+
156+
client is the client instance for this callback.
157+
userdata is the private user data as set in Client() or userdata_set().
158+
msg is the received message.
159+
"""
160+
try:
161+
message = {}
162+
if msg.topic[-len(COMMAND_JSON_TOPIC):] == COMMAND_JSON_TOPIC:
163+
payload = loads(msg.payload.decode())
164+
message['payload'] = payload['value']
165+
message['cmdId'] = payload['cmdId']
166+
channel = payload['channel'].split('/')[-1].split(';')
167+
else:
168+
payload = msg.payload.decode().split(',')
169+
if len(payload) > 1:
170+
message['cmdId'] = payload[0]
171+
message['payload'] = payload[1]
172+
else:
173+
message['payload'] = payload[0]
174+
channel = msg.topic.split('/')[-1].split(';')
175+
message['channel'] = channel[0]
176+
if len(channel) > 1:
177+
message['suffix'] = channel[1]
178+
debug('message_callback: {}'.format(message))
179+
if self.on_message:
180+
self.on_message(message)
181+
except:
182+
exception('Error processing message: {} {}'.format(msg.topic, str(msg.payload)))
183+
184+
def get_topic_string(self, topic, append_wildcard=False):
185+
"""Return a topic string.
186+
187+
topic: the topic substring
188+
append_wildcard: if True append the single level topics wildcard (+)"""
189+
if append_wildcard:
190+
return '{}/{}/+'.format(self.root_topic, topic)
191+
else:
192+
return '{}/{}'.format(self.root_topic, topic)
193+
194+
def disconnect(self):
195+
"""Disconnect from Cayenne.
196+
"""
197+
self.client.disconnect()
198+
199+
def loop(self, timeout=1.0):
200+
"""Process Cayenne messages.
201+
202+
This should be called regularly to ensure Cayenne messages are sent and received.
203+
204+
timeout: The time in seconds to wait for incoming/outgoing network
205+
traffic before timing out and returning.
206+
"""
207+
self.client.loop(timeout)
208+
209+
def loop_start(self):
210+
"""This is part of the threaded client interface. Call this once to
211+
start a new thread to process network traffic. This provides an
212+
alternative to repeatedly calling loop() yourself.
213+
"""
214+
self.client.loop_start()
215+
216+
def loop_stop(self):
217+
"""This is part of the threaded client interface. Call this once to
218+
stop the network thread previously created with loop_start(). This call
219+
will block until the network thread finishes.
220+
"""
221+
self.client.loop_stop()
222+
223+
def publish_packet(self, topic, packet, qos=0, retain=False):
224+
"""Publish a packet.
225+
226+
topic: topic substring.
227+
packet: JSON packet to publish.
228+
qos: quality of service level to use.
229+
retain: if True, the message will be set as the "last known good"/retained message for the topic.
230+
"""
231+
debug('Publish to {}'.format(self.get_topic_string(topic)))
232+
self.client.publish(self.get_topic_string(topic), packet, qos, retain)
233+
234+
def publish_response(self, msg_id, error_message=None):
235+
"""Send a command response to Cayenne.
236+
237+
This should be sent when a command message has been received.
238+
msg_id is the ID of the message received.
239+
error_message is the error message to send. This should be set to None if there is no error.
240+
"""
241+
topic = self.get_topic_string(COMMAND_RESPONSE_TOPIC)
242+
if error_message:
243+
payload = "error,%s=%s" % (msg_id, error_message)
244+
else:
245+
payload = "ok,%s" % (msg_id)
246+
self.client.publish(topic, payload)

0 commit comments

Comments
 (0)