13
13
from myDevices .utils .logger import exception , info , warn , error , debug , logJson
14
14
from myDevices .sensors import sensors
15
15
from myDevices .system .hardware import Hardware
16
- # from myDevices.cloud.scheduler import SchedulerEngine
16
+ from myDevices .cloud .scheduler import SchedulerEngine
17
17
from myDevices .cloud .download_speed import DownloadSpeed
18
18
from myDevices .cloud .updater import Updater
19
19
from myDevices .system .systemconfig import SystemConfig
@@ -110,16 +110,24 @@ def run(self):
110
110
if self .cloudClient .mqttClient .connected == False :
111
111
info ('WriterThread mqttClient not connected' )
112
112
continue
113
+ got_packet = False
113
114
topic , message = self .cloudClient .DequeuePacket ()
114
- if message :
115
- # debug('WriterThread, topic: {} {}'.format(topic, message))
116
- if not isinstance (message , str ):
117
- message = dumps (message )
118
- self .cloudClient .mqttClient .publish_packet (topic , message )
119
- message = None
120
- self .cloudClient .writeQueue .task_done ()
115
+ if topic or message :
116
+ got_packet = True
117
+ try :
118
+ if message or topic == cayennemqtt .JOBS_TOPIC :
119
+ # debug('WriterThread, topic: {} {}'.format(topic, message))
120
+ if not isinstance (message , str ):
121
+ message = dumps (message )
122
+ self .cloudClient .mqttClient .publish_packet (topic , message )
123
+ message = None
124
+ except :
125
+ exception ("WriterThread publish packet error" )
126
+ finally :
127
+ if got_packet :
128
+ self .cloudClient .writeQueue .task_done ()
121
129
except :
122
- exception ("WriterThread Unexpected error" )
130
+ exception ("WriterThread unexpected error" )
123
131
return
124
132
125
133
def stop (self ):
@@ -187,7 +195,7 @@ def Start(self):
187
195
if not self .Connect ():
188
196
error ('Error starting agent' )
189
197
return
190
- # self.schedulerEngine = SchedulerEngine(self, 'client_scheduler')
198
+ self .schedulerEngine = SchedulerEngine (self , 'client_scheduler' )
191
199
self .sensorsClient = sensors .SensorsClient ()
192
200
self .readQueue = Queue ()
193
201
self .writeQueue = Queue ()
@@ -206,6 +214,8 @@ def Start(self):
206
214
TimerThread (self .SendSystemState , 30 , 5 )
207
215
self .updater = Updater (self .config )
208
216
self .updater .start ()
217
+ events = self .schedulerEngine .get_scheduled_events ()
218
+ self .EnqueuePacket (events , cayennemqtt .JOBS_TOPIC )
209
219
# self.sentHistoryData = {}
210
220
# self.historySendFails = 0
211
221
# self.historyThread = Thread(target=self.SendHistoryData)
@@ -359,8 +369,12 @@ def OnMessage(self, message):
359
369
360
370
def RunAction (self , action ):
361
371
"""Run a specified action"""
362
- debug ('RunAction' )
363
- self .ExecuteMessage (action )
372
+ debug ('RunAction: {}' .format (action ))
373
+ result = True
374
+ command = action .copy ()
375
+ self .mqttClient .transform_command (command )
376
+ result = self .ExecuteMessage (command )
377
+ return result
364
378
365
379
def ProcessMessage (self ):
366
380
"""Process a message from the server"""
@@ -373,28 +387,36 @@ def ProcessMessage(self):
373
387
self .ExecuteMessage (messageObject )
374
388
375
389
def ExecuteMessage (self , message ):
376
- """Execute an action described in a message object"""
390
+ """Execute an action described in a message object
391
+
392
+ Returns: True if action was executed, False otherwise."""
393
+ result = False
377
394
if not message :
378
- return
395
+ return result
379
396
channel = message ['channel' ]
380
397
info ('ExecuteMessage: {}' .format (message ))
381
398
if channel in (cayennemqtt .SYS_POWER_RESET , cayennemqtt .SYS_POWER_HALT ):
382
- self .ProcessPowerCommand (message )
399
+ result = self .ProcessPowerCommand (message )
383
400
elif channel .startswith (cayennemqtt .DEV_SENSOR ):
384
- self .ProcessSensorCommand (message )
401
+ result = self .ProcessSensorCommand (message )
385
402
elif channel .startswith (cayennemqtt .SYS_GPIO ):
386
- self .ProcessGpioCommand (message )
403
+ result = self .ProcessGpioCommand (message )
387
404
elif channel == cayennemqtt .AGENT_DEVICES :
388
- self .ProcessDeviceCommand (message )
405
+ result = self .ProcessDeviceCommand (message )
389
406
elif channel in (cayennemqtt .SYS_I2C , cayennemqtt .SYS_SPI , cayennemqtt .SYS_UART , cayennemqtt .SYS_ONEWIRE ):
390
- self .ProcessConfigCommand (message )
407
+ result = self .ProcessConfigCommand (message )
391
408
elif channel == cayennemqtt .AGENT_MANAGE :
392
- self .ProcessAgentCommand (message )
409
+ result = self .ProcessAgentCommand (message )
410
+ elif channel == cayennemqtt .AGENT_SCHEDULER :
411
+ result = self .ProcessSchedulerCommand (message )
393
412
else :
394
413
info ('Unknown message' )
414
+ return result
395
415
396
416
def ProcessPowerCommand (self , message ):
397
- """Process command to reboot/shutdown the system"""
417
+ """Process command to reboot/shutdown the system
418
+
419
+ Returns: True if command was processed, False otherwise."""
398
420
error_message = None
399
421
try :
400
422
self .EnqueueCommandResponse (message , error_message )
@@ -405,6 +427,7 @@ def ProcessPowerCommand(self, message):
405
427
cayennemqtt .DataChannel .add (data , message ['channel' ], value = 1 )
406
428
self .EnqueuePacket (data )
407
429
self .writeQueue .join ()
430
+ info ('Calling execute: {}' .format (commands [message ['channel' ]]))
408
431
output , result = executeCommand (commands [message ['channel' ]])
409
432
debug ('ProcessPowerCommand: {}, result: {}, output: {}' .format (message , result , output ))
410
433
if result != 0 :
@@ -416,9 +439,13 @@ def ProcessPowerCommand(self, message):
416
439
data = []
417
440
cayennemqtt .DataChannel .add (data , message ['channel' ], value = 0 )
418
441
self .EnqueuePacket (data )
442
+ raise ExecuteMessageError (error_message )
443
+ return error_message == None
419
444
420
445
def ProcessAgentCommand (self , message ):
421
- """Process command to manage the agent state"""
446
+ """Process command to manage the agent state
447
+
448
+ Returns: True if command was processed, False otherwise."""
422
449
error = None
423
450
try :
424
451
if message ['suffix' ] == 'uninstall' :
@@ -439,9 +466,14 @@ def ProcessAgentCommand(self, message):
439
466
except Exception as ex :
440
467
error = '{}: {}' .format (type (ex ).__name__ , ex )
441
468
self .EnqueueCommandResponse (message , error )
469
+ if error :
470
+ raise ExecuteMessageError (error )
471
+ return error == None
442
472
443
473
def ProcessConfigCommand (self , message ):
444
- """Process system configuration command"""
474
+ """Process system configuration command
475
+
476
+ Returns: True if command was processed, False otherwise."""
445
477
error = None
446
478
try :
447
479
value = 1 - int (message ['payload' ]) #Invert the value since the config script uses 0 for enable and 1 for disable
@@ -453,9 +485,12 @@ def ProcessConfigCommand(self, message):
453
485
except Exception as ex :
454
486
error = '{}: {}' .format (type (ex ).__name__ , ex )
455
487
self .EnqueueCommandResponse (message , error )
456
-
488
+ return error == None
489
+
457
490
def ProcessGpioCommand (self , message ):
458
- """Process GPIO command"""
491
+ """Process GPIO command
492
+
493
+ Returns: True if command was processed, False otherwise."""
459
494
error = None
460
495
try :
461
496
channel = int (message ['channel' ].replace (cayennemqtt .SYS_GPIO + ':' , '' ))
@@ -466,9 +501,12 @@ def ProcessGpioCommand(self, message):
466
501
except Exception as ex :
467
502
error = '{}: {}' .format (type (ex ).__name__ , ex )
468
503
self .EnqueueCommandResponse (message , error )
504
+ return error == None
469
505
470
506
def ProcessSensorCommand (self , message ):
471
- """Process sensor command"""
507
+ """Process sensor command
508
+
509
+ Returns: True if command was processed, False otherwise."""
472
510
error = None
473
511
try :
474
512
sensor_info = message ['channel' ].replace (cayennemqtt .DEV_SENSOR + ':' , '' ).split (':' )
@@ -483,9 +521,12 @@ def ProcessSensorCommand(self, message):
483
521
except Exception as ex :
484
522
error = '{}: {}' .format (type (ex ).__name__ , ex )
485
523
self .EnqueueCommandResponse (message , error )
524
+ return error == None
486
525
487
526
def ProcessDeviceCommand (self , message ):
488
- """Process a device command to add/edit/remove a sensor"""
527
+ """Process a device command to add/edit/remove a sensor
528
+
529
+ Returns: True if command was processed, False otherwise."""
489
530
error = None
490
531
try :
491
532
payload = message ['payload' ]
@@ -504,9 +545,38 @@ def ProcessDeviceCommand(self, message):
504
545
except Exception as ex :
505
546
error = '{}: {}' .format (type (ex ).__name__ , ex )
506
547
self .EnqueueCommandResponse (message , error )
548
+ return error == None
549
+
550
+ def ProcessSchedulerCommand (self , message ):
551
+ """Process command to add/edit/remove a scheduled action
552
+
553
+ Returns: True if command was processed, False otherwise."""
554
+ error = None
555
+ try :
556
+ if message ['suffix' ] == 'add' :
557
+ result = self .schedulerEngine .add_scheduled_event (message ['payload' ], True )
558
+ elif message ['suffix' ] == 'edit' :
559
+ result = self .schedulerEngine .update_scheduled_event (message ['payload' ])
560
+ elif message ['suffix' ] == 'delete' :
561
+ result = self .schedulerEngine .remove_scheduled_event (message ['payload' ])
562
+ elif message ['suffix' ] == 'get' :
563
+ events = self .schedulerEngine .get_scheduled_events ()
564
+ self .EnqueuePacket (events , cayennemqtt .JOBS_TOPIC )
565
+ else :
566
+ error = 'Unknown schedule command: {}' .format (message ['suffix' ])
567
+ debug ('ProcessSchedulerCommand result: {}' .format (result ))
568
+ if result is False :
569
+ error = 'Schedule command failed'
570
+ except Exception as ex :
571
+ error = '{}: {}' .format (type (ex ).__name__ , ex )
572
+ self .EnqueueCommandResponse (message , error )
573
+ return error == None
507
574
508
575
def EnqueueCommandResponse (self , message , error ):
509
576
"""Send response after processing a command message"""
577
+ if not 'cmdId' in message :
578
+ # If there is no command id we assume this is a scheduled command and don't send a response message.
579
+ return
510
580
debug ('EnqueueCommandResponse error: {}' .format (error ))
511
581
if error :
512
582
response = 'error,{}={}' .format (message ['cmdId' ], error )
0 commit comments