13
13
from myDevices .utils .logger import exception , info , warn , error , debug , logJson
14
14
from myDevices .sensors import sensors
15
15
from myDevices .system .hardware import Hardware
16
- # from myDevices.cloud.scheduler import SchedulerEngine
16
+ from myDevices .cloud .scheduler import SchedulerEngine
17
17
from myDevices .cloud .updater import Updater
18
18
from myDevices .system .systemconfig import SystemConfig
19
19
from myDevices .utils .daemon import Daemon
@@ -113,7 +113,7 @@ def run(self):
113
113
if topic or message :
114
114
got_packet = True
115
115
try :
116
- if message :
116
+ if message or topic == cayennemqtt . JOBS_TOPIC :
117
117
# debug('WriterThread, topic: {} {}'.format(topic, message))
118
118
if not isinstance (message , str ):
119
119
message = dumps (message )
@@ -193,7 +193,7 @@ def Start(self):
193
193
if not self .Connect ():
194
194
error ('Error starting agent' )
195
195
return
196
- # self.schedulerEngine = SchedulerEngine(self, 'client_scheduler')
196
+ self .schedulerEngine = SchedulerEngine (self , 'client_scheduler' )
197
197
self .sensorsClient = sensors .SensorsClient ()
198
198
self .readQueue = Queue ()
199
199
self .writeQueue = Queue ()
@@ -211,6 +211,8 @@ def Start(self):
211
211
# TimerThread(self.SendSystemState, 30, 5)
212
212
self .updater = Updater (self .config )
213
213
self .updater .start ()
214
+ events = self .schedulerEngine .get_scheduled_events ()
215
+ self .EnqueuePacket (events , cayennemqtt .JOBS_TOPIC )
214
216
# self.sentHistoryData = {}
215
217
# self.historySendFails = 0
216
218
# self.historyThread = Thread(target=self.SendHistoryData)
@@ -377,8 +379,12 @@ def OnMessage(self, message):
377
379
378
380
def RunAction (self , action ):
379
381
"""Run a specified action"""
380
- debug ('RunAction' )
381
- self .ExecuteMessage (action )
382
+ debug ('RunAction: {}' .format (action ))
383
+ result = True
384
+ command = action .copy ()
385
+ self .mqttClient .transform_command (command )
386
+ result = self .ExecuteMessage (command )
387
+ return result
382
388
383
389
def ProcessMessage (self ):
384
390
"""Process a message from the server"""
@@ -391,28 +397,36 @@ def ProcessMessage(self):
391
397
self .ExecuteMessage (messageObject )
392
398
393
399
def ExecuteMessage (self , message ):
394
- """Execute an action described in a message object"""
400
+ """Execute an action described in a message object
401
+
402
+ Returns: True if action was executed, False otherwise."""
403
+ result = False
395
404
if not message :
396
- return
405
+ return result
397
406
channel = message ['channel' ]
398
407
info ('ExecuteMessage: {}' .format (message ))
399
408
if channel in (cayennemqtt .SYS_POWER_RESET , cayennemqtt .SYS_POWER_HALT ):
400
- self .ProcessPowerCommand (message )
409
+ result = self .ProcessPowerCommand (message )
401
410
elif channel .startswith (cayennemqtt .DEV_SENSOR ):
402
- self .ProcessSensorCommand (message )
411
+ result = self .ProcessSensorCommand (message )
403
412
elif channel .startswith (cayennemqtt .SYS_GPIO ):
404
- self .ProcessGpioCommand (message )
413
+ result = self .ProcessGpioCommand (message )
405
414
elif channel == cayennemqtt .AGENT_DEVICES :
406
- self .ProcessDeviceCommand (message )
415
+ result = self .ProcessDeviceCommand (message )
407
416
elif channel in (cayennemqtt .SYS_I2C , cayennemqtt .SYS_SPI , cayennemqtt .SYS_UART , cayennemqtt .SYS_ONEWIRE ):
408
- self .ProcessConfigCommand (message )
417
+ result = self .ProcessConfigCommand (message )
409
418
elif channel == cayennemqtt .AGENT_MANAGE :
410
- self .ProcessAgentCommand (message )
419
+ result = self .ProcessAgentCommand (message )
420
+ elif channel == cayennemqtt .AGENT_SCHEDULER :
421
+ result = self .ProcessSchedulerCommand (message )
411
422
else :
412
423
info ('Unknown message' )
424
+ return result
413
425
414
426
def ProcessPowerCommand (self , message ):
415
- """Process command to reboot/shutdown the system"""
427
+ """Process command to reboot/shutdown the system
428
+
429
+ Returns: True if command was processed, False otherwise."""
416
430
error_message = None
417
431
try :
418
432
self .EnqueueCommandResponse (message , error_message )
@@ -435,9 +449,13 @@ def ProcessPowerCommand(self, message):
435
449
data = []
436
450
cayennemqtt .DataChannel .add (data , message ['channel' ], value = 0 )
437
451
self .EnqueuePacket (data )
452
+ raise ExecuteMessageError (error_message )
453
+ return error_message == None
438
454
439
455
def ProcessAgentCommand (self , message ):
440
- """Process command to manage the agent state"""
456
+ """Process command to manage the agent state
457
+
458
+ Returns: True if command was processed, False otherwise."""
441
459
error = None
442
460
try :
443
461
if message ['suffix' ] == 'uninstall' :
@@ -458,9 +476,14 @@ def ProcessAgentCommand(self, message):
458
476
except Exception as ex :
459
477
error = '{}: {}' .format (type (ex ).__name__ , ex )
460
478
self .EnqueueCommandResponse (message , error )
479
+ if error :
480
+ raise ExecuteMessageError (error )
481
+ return error == None
461
482
462
483
def ProcessConfigCommand (self , message ):
463
- """Process system configuration command"""
484
+ """Process system configuration command
485
+
486
+ Returns: True if command was processed, False otherwise."""
464
487
error = None
465
488
try :
466
489
value = 1 - int (message ['payload' ]) #Invert the value since the config script uses 0 for enable and 1 for disable
@@ -472,9 +495,12 @@ def ProcessConfigCommand(self, message):
472
495
except Exception as ex :
473
496
error = '{}: {}' .format (type (ex ).__name__ , ex )
474
497
self .EnqueueCommandResponse (message , error )
475
-
498
+ return error == None
499
+
476
500
def ProcessGpioCommand (self , message ):
477
- """Process GPIO command"""
501
+ """Process GPIO command
502
+
503
+ Returns: True if command was processed, False otherwise."""
478
504
error = None
479
505
try :
480
506
channel = int (message ['channel' ].replace (cayennemqtt .SYS_GPIO + ':' , '' ))
@@ -485,9 +511,12 @@ def ProcessGpioCommand(self, message):
485
511
except Exception as ex :
486
512
error = '{}: {}' .format (type (ex ).__name__ , ex )
487
513
self .EnqueueCommandResponse (message , error )
514
+ return error == None
488
515
489
516
def ProcessSensorCommand (self , message ):
490
- """Process sensor command"""
517
+ """Process sensor command
518
+
519
+ Returns: True if command was processed, False otherwise."""
491
520
error = None
492
521
try :
493
522
sensor_info = message ['channel' ].replace (cayennemqtt .DEV_SENSOR + ':' , '' ).split (':' )
@@ -502,9 +531,12 @@ def ProcessSensorCommand(self, message):
502
531
except Exception as ex :
503
532
error = '{}: {}' .format (type (ex ).__name__ , ex )
504
533
self .EnqueueCommandResponse (message , error )
534
+ return error == None
505
535
506
536
def ProcessDeviceCommand (self , message ):
507
- """Process a device command to add/edit/remove a sensor"""
537
+ """Process a device command to add/edit/remove a sensor
538
+
539
+ Returns: True if command was processed, False otherwise."""
508
540
error = None
509
541
try :
510
542
payload = message ['payload' ]
@@ -523,9 +555,38 @@ def ProcessDeviceCommand(self, message):
523
555
except Exception as ex :
524
556
error = '{}: {}' .format (type (ex ).__name__ , ex )
525
557
self .EnqueueCommandResponse (message , error )
558
+ return error == None
559
+
560
+ def ProcessSchedulerCommand (self , message ):
561
+ """Process command to add/edit/remove a scheduled action
562
+
563
+ Returns: True if command was processed, False otherwise."""
564
+ error = None
565
+ try :
566
+ if message ['suffix' ] == 'add' :
567
+ result = self .schedulerEngine .add_scheduled_event (message ['payload' ], True )
568
+ elif message ['suffix' ] == 'edit' :
569
+ result = self .schedulerEngine .update_scheduled_event (message ['payload' ])
570
+ elif message ['suffix' ] == 'delete' :
571
+ result = self .schedulerEngine .remove_scheduled_event (message ['payload' ])
572
+ elif message ['suffix' ] == 'get' :
573
+ events = self .schedulerEngine .get_scheduled_events ()
574
+ self .EnqueuePacket (events , cayennemqtt .JOBS_TOPIC )
575
+ else :
576
+ error = 'Unknown schedule command: {}' .format (message ['suffix' ])
577
+ debug ('ProcessSchedulerCommand result: {}' .format (result ))
578
+ if result is False :
579
+ error = 'Schedule command failed'
580
+ except Exception as ex :
581
+ error = '{}: {}' .format (type (ex ).__name__ , ex )
582
+ self .EnqueueCommandResponse (message , error )
583
+ return error == None
526
584
527
585
def EnqueueCommandResponse (self , message , error ):
528
586
"""Send response after processing a command message"""
587
+ if not 'cmdId' in message :
588
+ # If there is no command id we assume this is a scheduled command and don't send a response message.
589
+ return
529
590
debug ('EnqueueCommandResponse error: {}' .format (error ))
530
591
if error :
531
592
response = 'error,{}={}' .format (message ['cmdId' ], error )
0 commit comments