Skip to content

Commit d476727

Browse files
committed
Add support for scheduler commands.
1 parent 45e08b4 commit d476727

File tree

2 files changed

+54
-17
lines changed

2 files changed

+54
-17
lines changed

myDevices/cloud/cayennemqtt.py

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
AGENT_VERSION = 'agent:version'
3131
AGENT_DEVICES = 'agent:devices'
3232
AGENT_MANAGE = 'agent:manage'
33+
AGENT_SCHEDULE = 'agent:schedule'
3334
DEV_SENSOR = 'dev'
3435

3536
# Channel Suffixes
@@ -68,7 +69,7 @@ def add(data_list, prefix, channel=None, suffix=None, value=None, type=None, uni
6869
class CayenneMQTTClient:
6970
"""Cayenne MQTT Client class.
7071
71-
This is the main client class for connecting to Cayenne and sending and recFUeiving data.
72+
This is the main client class for connecting to Cayenne and sending and receiving data.
7273
7374
Standard usage:
7475
* Set on_message callback, if you are receiving data.
@@ -150,6 +151,26 @@ def disconnect_callback(self, client, userdata, rc):
150151
print("Reconnect failed, retrying")
151152
time.sleep(5)
152153

154+
def transform_command(self, command, payload=[], channel=[]):
155+
"""Transform a command message into an object.
156+
157+
command is the command object that will be transformed in place.
158+
payload is an optional list of payload data items.
159+
channel is an optional list containing channel and suffix data.
160+
"""
161+
if not payload:
162+
command['payload'] = command.pop('value')
163+
channel = command['channel'].split('/')[-1].split(';')
164+
else:
165+
if len(payload) > 1:
166+
command['cmdId'] = payload[0]
167+
command['payload'] = payload[1]
168+
else:
169+
command['payload'] = payload[0]
170+
command['channel'] = channel[0]
171+
if len(channel) > 1:
172+
command['suffix'] = channel[1]
173+
153174
def message_callback(self, client, userdata, msg):
154175
"""The callback for when a message is received from the server.
155176
@@ -160,21 +181,10 @@ def message_callback(self, client, userdata, msg):
160181
try:
161182
message = {}
162183
if msg.topic[-len(COMMAND_JSON_TOPIC):] == COMMAND_JSON_TOPIC:
163-
payload = loads(msg.payload.decode())
164-
message['payload'] = payload['value']
165-
message['cmdId'] = payload['cmdId']
166-
channel = payload['channel'].split('/')[-1].split(';')
184+
message = loads(msg.payload.decode())
185+
self.transform_command(message)
167186
else:
168-
payload = msg.payload.decode().split(',')
169-
if len(payload) > 1:
170-
message['cmdId'] = payload[0]
171-
message['payload'] = payload[1]
172-
else:
173-
message['payload'] = payload[0]
174-
channel = msg.topic.split('/')[-1].split(';')
175-
message['channel'] = channel[0]
176-
if len(channel) > 1:
177-
message['suffix'] = channel[1]
187+
self.transform_command(message, msg.payload.decode().split(','), msg.topic.split('/')[-1].split(';'))
178188
debug('message_callback: {}'.format(message))
179189
if self.on_message:
180190
self.on_message(message)

myDevices/cloud/client.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from myDevices.utils.logger import exception, info, warn, error, debug, logJson
1414
from myDevices.sensors import sensors
1515
from myDevices.system.hardware import Hardware
16-
# from myDevices.cloud.scheduler import SchedulerEngine
16+
from myDevices.cloud.scheduler import SchedulerEngine
1717
from myDevices.cloud.download_speed import DownloadSpeed
1818
from myDevices.cloud.updater import Updater
1919
from myDevices.system.systemconfig import SystemConfig
@@ -195,7 +195,7 @@ def Start(self):
195195
if not self.Connect():
196196
error('Error starting agent')
197197
return
198-
# self.schedulerEngine = SchedulerEngine(self, 'client_scheduler')
198+
self.schedulerEngine = SchedulerEngine(self, 'client_scheduler')
199199
self.sensorsClient = sensors.SensorsClient()
200200
self.readQueue = Queue()
201201
self.writeQueue = Queue()
@@ -398,6 +398,8 @@ def ExecuteMessage(self, message):
398398
self.ProcessConfigCommand(message)
399399
elif channel == cayennemqtt.AGENT_MANAGE:
400400
self.ProcessAgentCommand(message)
401+
elif channel == cayennemqtt.AGENT_SCHEDULE:
402+
self.ProcessScheduleCommand(message)
401403
else:
402404
info('Unknown message')
403405

@@ -514,8 +516,33 @@ def ProcessDeviceCommand(self, message):
514516
error = '{}: {}'.format(type(ex).__name__, ex)
515517
self.EnqueueCommandResponse(message, error)
516518

519+
def ProcessScheduleCommand(self, message):
520+
"""Process command to add/edit/remove a scheduled action"""
521+
error = None
522+
try:
523+
if 'actions' in message['payload']:
524+
for action in message['payload']['actions']:
525+
self.mqttClient.transform_command(action)
526+
if message['suffix'] == 'add':
527+
result = self.schedulerEngine.add_scheduled_item(message['payload'], True)
528+
elif message['suffix'] == 'edit':
529+
result = self.schedulerEngine.update_scheduled_item(message['payload'])
530+
elif message['suffix'] == 'delete':
531+
result = self.schedulerEngine.remove_scheduled_item(message['payload'])
532+
else:
533+
error = 'Unknown schedule command: {}'.format(message['suffix'])
534+
debug('ProcessScheduleCommand result: {}'.format(result))
535+
if result is False:
536+
error = 'Schedule command failed'
537+
except Exception as ex:
538+
error = '{}: {}'.format(type(ex).__name__, ex)
539+
self.EnqueueCommandResponse(message, error)
540+
517541
def EnqueueCommandResponse(self, message, error):
518542
"""Send response after processing a command message"""
543+
if not hasattr(message, 'cmdId'):
544+
# If there is no command idea we assume this is a scheduled command and don't send a response message.
545+
return
519546
debug('EnqueueCommandResponse error: {}'.format(error))
520547
if error:
521548
response = 'error,{}={}'.format(message['cmdId'], error)

0 commit comments

Comments
 (0)