13
13
from myDevices .utils .logger import exception , info , warn , error , debug , logJson
14
14
from myDevices .sensors import sensors
15
15
from myDevices .system .hardware import Hardware
16
- # from myDevices.cloud.scheduler import SchedulerEngine
16
+ from myDevices .cloud .scheduler import SchedulerEngine
17
17
from myDevices .cloud .download_speed import DownloadSpeed
18
18
from myDevices .cloud .updater import Updater
19
19
from myDevices .system .systemconfig import SystemConfig
@@ -115,7 +115,7 @@ def run(self):
115
115
if topic or message :
116
116
got_packet = True
117
117
try :
118
- if message :
118
+ if message or topic == cayennemqtt . JOBS_TOPIC :
119
119
# debug('WriterThread, topic: {} {}'.format(topic, message))
120
120
if not isinstance (message , str ):
121
121
message = dumps (message )
@@ -195,7 +195,7 @@ def Start(self):
195
195
if not self .Connect ():
196
196
error ('Error starting agent' )
197
197
return
198
- # self.schedulerEngine = SchedulerEngine(self, 'client_scheduler')
198
+ self .schedulerEngine = SchedulerEngine (self , 'client_scheduler' )
199
199
self .sensorsClient = sensors .SensorsClient ()
200
200
self .readQueue = Queue ()
201
201
self .writeQueue = Queue ()
@@ -214,6 +214,8 @@ def Start(self):
214
214
TimerThread (self .SendSystemState , 30 , 5 )
215
215
self .updater = Updater (self .config )
216
216
self .updater .start ()
217
+ events = self .schedulerEngine .get_scheduled_events ()
218
+ self .EnqueuePacket (events , cayennemqtt .JOBS_TOPIC )
217
219
# self.sentHistoryData = {}
218
220
# self.historySendFails = 0
219
221
# self.historyThread = Thread(target=self.SendHistoryData)
@@ -367,8 +369,12 @@ def OnMessage(self, message):
367
369
368
370
def RunAction (self , action ):
369
371
"""Run a specified action"""
370
- debug ('RunAction' )
371
- self .ExecuteMessage (action )
372
+ debug ('RunAction: {}' .format (action ))
373
+ result = True
374
+ command = action .copy ()
375
+ self .mqttClient .transform_command (command )
376
+ result = self .ExecuteMessage (command )
377
+ return result
372
378
373
379
def ProcessMessage (self ):
374
380
"""Process a message from the server"""
@@ -381,28 +387,36 @@ def ProcessMessage(self):
381
387
self .ExecuteMessage (messageObject )
382
388
383
389
def ExecuteMessage (self , message ):
384
- """Execute an action described in a message object"""
390
+ """Execute an action described in a message object
391
+
392
+ Returns: True if action was executed, False otherwise."""
393
+ result = False
385
394
if not message :
386
- return
395
+ return result
387
396
channel = message ['channel' ]
388
397
info ('ExecuteMessage: {}' .format (message ))
389
398
if channel in (cayennemqtt .SYS_POWER_RESET , cayennemqtt .SYS_POWER_HALT ):
390
- self .ProcessPowerCommand (message )
399
+ result = self .ProcessPowerCommand (message )
391
400
elif channel .startswith (cayennemqtt .DEV_SENSOR ):
392
- self .ProcessSensorCommand (message )
401
+ result = self .ProcessSensorCommand (message )
393
402
elif channel .startswith (cayennemqtt .SYS_GPIO ):
394
- self .ProcessGpioCommand (message )
403
+ result = self .ProcessGpioCommand (message )
395
404
elif channel == cayennemqtt .AGENT_DEVICES :
396
- self .ProcessDeviceCommand (message )
405
+ result = self .ProcessDeviceCommand (message )
397
406
elif channel in (cayennemqtt .SYS_I2C , cayennemqtt .SYS_SPI , cayennemqtt .SYS_UART , cayennemqtt .SYS_ONEWIRE ):
398
- self .ProcessConfigCommand (message )
407
+ result = self .ProcessConfigCommand (message )
399
408
elif channel == cayennemqtt .AGENT_MANAGE :
400
- self .ProcessAgentCommand (message )
409
+ result = self .ProcessAgentCommand (message )
410
+ elif channel == cayennemqtt .AGENT_SCHEDULER :
411
+ result = self .ProcessSchedulerCommand (message )
401
412
else :
402
413
info ('Unknown message' )
414
+ return result
403
415
404
416
def ProcessPowerCommand (self , message ):
405
- """Process command to reboot/shutdown the system"""
417
+ """Process command to reboot/shutdown the system
418
+
419
+ Returns: True if command was processed, False otherwise."""
406
420
error_message = None
407
421
try :
408
422
self .EnqueueCommandResponse (message , error_message )
@@ -425,9 +439,13 @@ def ProcessPowerCommand(self, message):
425
439
data = []
426
440
cayennemqtt .DataChannel .add (data , message ['channel' ], value = 0 )
427
441
self .EnqueuePacket (data )
442
+ raise ExecuteMessageError (error_message )
443
+ return error_message == None
428
444
429
445
def ProcessAgentCommand (self , message ):
430
- """Process command to manage the agent state"""
446
+ """Process command to manage the agent state
447
+
448
+ Returns: True if command was processed, False otherwise."""
431
449
error = None
432
450
try :
433
451
if message ['suffix' ] == 'uninstall' :
@@ -448,9 +466,14 @@ def ProcessAgentCommand(self, message):
448
466
except Exception as ex :
449
467
error = '{}: {}' .format (type (ex ).__name__ , ex )
450
468
self .EnqueueCommandResponse (message , error )
469
+ if error :
470
+ raise ExecuteMessageError (error )
471
+ return error == None
451
472
452
473
def ProcessConfigCommand (self , message ):
453
- """Process system configuration command"""
474
+ """Process system configuration command
475
+
476
+ Returns: True if command was processed, False otherwise."""
454
477
error = None
455
478
try :
456
479
value = 1 - int (message ['payload' ]) #Invert the value since the config script uses 0 for enable and 1 for disable
@@ -462,9 +485,12 @@ def ProcessConfigCommand(self, message):
462
485
except Exception as ex :
463
486
error = '{}: {}' .format (type (ex ).__name__ , ex )
464
487
self .EnqueueCommandResponse (message , error )
465
-
488
+ return error == None
489
+
466
490
def ProcessGpioCommand (self , message ):
467
- """Process GPIO command"""
491
+ """Process GPIO command
492
+
493
+ Returns: True if command was processed, False otherwise."""
468
494
error = None
469
495
try :
470
496
channel = int (message ['channel' ].replace (cayennemqtt .SYS_GPIO + ':' , '' ))
@@ -475,9 +501,12 @@ def ProcessGpioCommand(self, message):
475
501
except Exception as ex :
476
502
error = '{}: {}' .format (type (ex ).__name__ , ex )
477
503
self .EnqueueCommandResponse (message , error )
504
+ return error == None
478
505
479
506
def ProcessSensorCommand (self , message ):
480
- """Process sensor command"""
507
+ """Process sensor command
508
+
509
+ Returns: True if command was processed, False otherwise."""
481
510
error = None
482
511
try :
483
512
sensor_info = message ['channel' ].replace (cayennemqtt .DEV_SENSOR + ':' , '' ).split (':' )
@@ -492,9 +521,12 @@ def ProcessSensorCommand(self, message):
492
521
except Exception as ex :
493
522
error = '{}: {}' .format (type (ex ).__name__ , ex )
494
523
self .EnqueueCommandResponse (message , error )
524
+ return error == None
495
525
496
526
def ProcessDeviceCommand (self , message ):
497
- """Process a device command to add/edit/remove a sensor"""
527
+ """Process a device command to add/edit/remove a sensor
528
+
529
+ Returns: True if command was processed, False otherwise."""
498
530
error = None
499
531
try :
500
532
payload = message ['payload' ]
@@ -513,9 +545,38 @@ def ProcessDeviceCommand(self, message):
513
545
except Exception as ex :
514
546
error = '{}: {}' .format (type (ex ).__name__ , ex )
515
547
self .EnqueueCommandResponse (message , error )
548
+ return error == None
549
+
550
+ def ProcessSchedulerCommand (self , message ):
551
+ """Process command to add/edit/remove a scheduled action
552
+
553
+ Returns: True if command was processed, False otherwise."""
554
+ error = None
555
+ try :
556
+ if message ['suffix' ] == 'add' :
557
+ result = self .schedulerEngine .add_scheduled_event (message ['payload' ], True )
558
+ elif message ['suffix' ] == 'edit' :
559
+ result = self .schedulerEngine .update_scheduled_event (message ['payload' ])
560
+ elif message ['suffix' ] == 'delete' :
561
+ result = self .schedulerEngine .remove_scheduled_event (message ['payload' ])
562
+ elif message ['suffix' ] == 'get' :
563
+ events = self .schedulerEngine .get_scheduled_events ()
564
+ self .EnqueuePacket (events , cayennemqtt .JOBS_TOPIC )
565
+ else :
566
+ error = 'Unknown schedule command: {}' .format (message ['suffix' ])
567
+ debug ('ProcessSchedulerCommand result: {}' .format (result ))
568
+ if result is False :
569
+ error = 'Schedule command failed'
570
+ except Exception as ex :
571
+ error = '{}: {}' .format (type (ex ).__name__ , ex )
572
+ self .EnqueueCommandResponse (message , error )
573
+ return error == None
516
574
517
575
def EnqueueCommandResponse (self , message , error ):
518
576
"""Send response after processing a command message"""
577
+ if not hasattr (message , 'cmdId' ):
578
+ # If there is no command idea we assume this is a scheduled command and don't send a response message.
579
+ return
519
580
debug ('EnqueueCommandResponse error: {}' .format (error ))
520
581
if error :
521
582
response = 'error,{}={}' .format (message ['cmdId' ], error )
0 commit comments