Skip to content

Commit 4933009

Browse files
committed
Do HTTP requests after scheduled jobs run.
1 parent 6e263f6 commit 4933009

File tree

4 files changed

+163
-39
lines changed

4 files changed

+163
-39
lines changed

myDevices/cloud/cayennemqtt.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
COMMAND_TOPIC = 'cmd'
1010
COMMAND_JSON_TOPIC = 'cmd.json'
1111
COMMAND_RESPONSE_TOPIC = 'response'
12+
JOBS_TOPIC = 'jobs.json'
1213

1314
# Data Channels
1415
SYS_HARDWARE_MAKE = 'sys:hw:make'
@@ -30,7 +31,7 @@
3031
AGENT_VERSION = 'agent:version'
3132
AGENT_DEVICES = 'agent:devices'
3233
AGENT_MANAGE = 'agent:manage'
33-
AGENT_SCHEDULE = 'agent:schedule'
34+
AGENT_SCHEDULER = 'agent:scheduler'
3435
DEV_SENSOR = 'dev'
3536

3637
# Channel Suffixes

myDevices/cloud/client.py

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def run(self):
115115
if topic or message:
116116
got_packet = True
117117
try:
118-
if message:
118+
if message or topic == cayennemqtt.JOBS_TOPIC:
119119
# debug('WriterThread, topic: {} {}'.format(topic, message))
120120
if not isinstance(message, str):
121121
message = dumps(message)
@@ -214,6 +214,8 @@ def Start(self):
214214
TimerThread(self.SendSystemState, 30, 5)
215215
self.updater = Updater(self.config)
216216
self.updater.start()
217+
events = self.schedulerEngine.get_scheduled_events()
218+
self.EnqueuePacket(events, cayennemqtt.JOBS_TOPIC)
217219
# self.sentHistoryData = {}
218220
# self.historySendFails = 0
219221
# self.historyThread = Thread(target=self.SendHistoryData)
@@ -367,8 +369,12 @@ def OnMessage(self, message):
367369

368370
def RunAction(self, action):
369371
"""Run a specified action"""
370-
debug('RunAction')
371-
self.ExecuteMessage(action)
372+
debug('RunAction: {}'.format(action))
373+
result = True
374+
command = action.copy()
375+
self.mqttClient.transform_command(command)
376+
result = self.ExecuteMessage(command)
377+
return result
372378

373379
def ProcessMessage(self):
374380
"""Process a message from the server"""
@@ -381,30 +387,36 @@ def ProcessMessage(self):
381387
self.ExecuteMessage(messageObject)
382388

383389
def ExecuteMessage(self, message):
384-
"""Execute an action described in a message object"""
390+
"""Execute an action described in a message object
391+
392+
Returns: True if action was executed, False otherwise."""
393+
result = False
385394
if not message:
386-
return
395+
return result
387396
channel = message['channel']
388397
info('ExecuteMessage: {}'.format(message))
389398
if channel in (cayennemqtt.SYS_POWER_RESET, cayennemqtt.SYS_POWER_HALT):
390-
self.ProcessPowerCommand(message)
399+
result = self.ProcessPowerCommand(message)
391400
elif channel.startswith(cayennemqtt.DEV_SENSOR):
392-
self.ProcessSensorCommand(message)
401+
result = self.ProcessSensorCommand(message)
393402
elif channel.startswith(cayennemqtt.SYS_GPIO):
394-
self.ProcessGpioCommand(message)
403+
result = self.ProcessGpioCommand(message)
395404
elif channel == cayennemqtt.AGENT_DEVICES:
396-
self.ProcessDeviceCommand(message)
405+
result = self.ProcessDeviceCommand(message)
397406
elif channel in (cayennemqtt.SYS_I2C, cayennemqtt.SYS_SPI, cayennemqtt.SYS_UART, cayennemqtt.SYS_ONEWIRE):
398-
self.ProcessConfigCommand(message)
407+
result = self.ProcessConfigCommand(message)
399408
elif channel == cayennemqtt.AGENT_MANAGE:
400-
self.ProcessAgentCommand(message)
401-
elif channel == cayennemqtt.AGENT_SCHEDULE:
402-
self.ProcessScheduleCommand(message)
409+
result = self.ProcessAgentCommand(message)
410+
elif channel == cayennemqtt.AGENT_SCHEDULER:
411+
result = self.ProcessSchedulerCommand(message)
403412
else:
404413
info('Unknown message')
414+
return result
405415

406416
def ProcessPowerCommand(self, message):
407-
"""Process command to reboot/shutdown the system"""
417+
"""Process command to reboot/shutdown the system
418+
419+
Returns: True if command was processed, False otherwise."""
408420
error_message = None
409421
try:
410422
self.EnqueueCommandResponse(message, error_message)
@@ -427,9 +439,13 @@ def ProcessPowerCommand(self, message):
427439
data = []
428440
cayennemqtt.DataChannel.add(data, message['channel'], value=0)
429441
self.EnqueuePacket(data)
442+
raise ExecuteMessageError(error_message)
443+
return error_message == None
430444

431445
def ProcessAgentCommand(self, message):
432-
"""Process command to manage the agent state"""
446+
"""Process command to manage the agent state
447+
448+
Returns: True if command was processed, False otherwise."""
433449
error = None
434450
try:
435451
if message['suffix'] == 'uninstall':
@@ -450,9 +466,14 @@ def ProcessAgentCommand(self, message):
450466
except Exception as ex:
451467
error = '{}: {}'.format(type(ex).__name__, ex)
452468
self.EnqueueCommandResponse(message, error)
469+
if error:
470+
raise ExecuteMessageError(error)
471+
return error == None
453472

454473
def ProcessConfigCommand(self, message):
455-
"""Process system configuration command"""
474+
"""Process system configuration command
475+
476+
Returns: True if command was processed, False otherwise."""
456477
error = None
457478
try:
458479
value = 1 - int(message['payload']) #Invert the value since the config script uses 0 for enable and 1 for disable
@@ -464,9 +485,12 @@ def ProcessConfigCommand(self, message):
464485
except Exception as ex:
465486
error = '{}: {}'.format(type(ex).__name__, ex)
466487
self.EnqueueCommandResponse(message, error)
467-
488+
return error == None
489+
468490
def ProcessGpioCommand(self, message):
469-
"""Process GPIO command"""
491+
"""Process GPIO command
492+
493+
Returns: True if command was processed, False otherwise."""
470494
error = None
471495
try:
472496
channel = int(message['channel'].replace(cayennemqtt.SYS_GPIO + ':', ''))
@@ -477,9 +501,12 @@ def ProcessGpioCommand(self, message):
477501
except Exception as ex:
478502
error = '{}: {}'.format(type(ex).__name__, ex)
479503
self.EnqueueCommandResponse(message, error)
504+
return error == None
480505

481506
def ProcessSensorCommand(self, message):
482-
"""Process sensor command"""
507+
"""Process sensor command
508+
509+
Returns: True if command was processed, False otherwise."""
483510
error = None
484511
try:
485512
sensor_info = message['channel'].replace(cayennemqtt.DEV_SENSOR + ':', '').split(':')
@@ -494,9 +521,12 @@ def ProcessSensorCommand(self, message):
494521
except Exception as ex:
495522
error = '{}: {}'.format(type(ex).__name__, ex)
496523
self.EnqueueCommandResponse(message, error)
524+
return error == None
497525

498526
def ProcessDeviceCommand(self, message):
499-
"""Process a device command to add/edit/remove a sensor"""
527+
"""Process a device command to add/edit/remove a sensor
528+
529+
Returns: True if command was processed, False otherwise."""
500530
error = None
501531
try:
502532
payload = message['payload']
@@ -515,28 +545,32 @@ def ProcessDeviceCommand(self, message):
515545
except Exception as ex:
516546
error = '{}: {}'.format(type(ex).__name__, ex)
517547
self.EnqueueCommandResponse(message, error)
548+
return error == None
518549

519-
def ProcessScheduleCommand(self, message):
520-
"""Process command to add/edit/remove a scheduled action"""
550+
def ProcessSchedulerCommand(self, message):
551+
"""Process command to add/edit/remove a scheduled action
552+
553+
Returns: True if command was processed, False otherwise."""
521554
error = None
522555
try:
523-
if 'actions' in message['payload']:
524-
for action in message['payload']['actions']:
525-
self.mqttClient.transform_command(action)
526556
if message['suffix'] == 'add':
527557
result = self.schedulerEngine.add_scheduled_event(message['payload'], True)
528558
elif message['suffix'] == 'edit':
529559
result = self.schedulerEngine.update_scheduled_event(message['payload'])
530560
elif message['suffix'] == 'delete':
531561
result = self.schedulerEngine.remove_scheduled_event(message['payload'])
562+
elif message['suffix'] == 'get':
563+
events = self.schedulerEngine.get_scheduled_events()
564+
self.EnqueuePacket(events, cayennemqtt.JOBS_TOPIC)
532565
else:
533566
error = 'Unknown schedule command: {}'.format(message['suffix'])
534-
debug('ProcessScheduleCommand result: {}'.format(result))
567+
debug('ProcessSchedulerCommand result: {}'.format(result))
535568
if result is False:
536569
error = 'Schedule command failed'
537570
except Exception as ex:
538571
error = '{}: {}'.format(type(ex).__name__, ex)
539572
self.EnqueueCommandResponse(message, error)
573+
return error == None
540574

541575
def EnqueueCommandResponse(self, message, error):
542576
"""Send response after processing a command message"""

myDevices/cloud/scheduler.py

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from time import sleep
66

77
import myDevices.schedule as schedule
8+
from myDevices.requests_futures.sessions import FuturesSession
89
from myDevices.utils.logger import debug, error, exception, info, logJson, setDebug, warn
910

1011

@@ -82,13 +83,13 @@ def update_scheduled_event(self, event):
8283
try:
8384
old_item = self.schedule_items[event['id']]
8485
schedule.cancel_job(old_item['job'])
86+
result = self.create_job(schedule_item)
87+
debug('Update scheduled event result: {}'.format(result))
88+
if result == True:
89+
self.update_database_record(event['id'], event)
90+
self.schedule_items[event['id']] = schedule_item
8591
except KeyError:
8692
debug('Old schedule with id = {} not found'.format(event['id']))
87-
result = self.create_job(schedule_item)
88-
debug('Update scheduled event result: {}'.format(result))
89-
if result == True:
90-
self.update_database_record(event['id'], event)
91-
self.schedule_items[event['id']] = schedule_item
9293
except:
9394
exception('Failed to update scheduled event')
9495
return result
@@ -122,6 +123,12 @@ def get_scheduled_events(self):
122123
try:
123124
with self.mutex:
124125
events = [schedule_item['event'] for schedule_item in self.schedule_items.values()]
126+
for event in events:
127+
try:
128+
# Don't include last run value that is only used locally.
129+
del event['last_run']
130+
except:
131+
pass
125132
except:
126133
exception('Failed to get scheduled events')
127134
return events
@@ -178,8 +185,8 @@ def create_job(self, schedule_item):
178185
schedule_item['job'] = schedule.every(config['interval'], config['start_date']).months.at(config['start_date'])
179186
if config['unit'] == 'year':
180187
schedule_item['job'] = schedule.every(config['interval'], config['start_date']).years.at(config['start_date'])
181-
if 'last_run' in config:
182-
schedule_item['job'].set_last_run(config['last_run'])
188+
if 'last_run' in schedule_item['event']:
189+
schedule_item['job'].set_last_run(schedule_item['event']['last_run'])
183190
schedule_item['job'].do(self.run_scheduled_item, schedule_item)
184191
except:
185192
exception('Failed setting up scheduler')
@@ -197,17 +204,43 @@ def run_scheduled_item(self, schedule_item):
197204
result = True
198205
event = schedule_item['event']
199206
config = event['config']
200-
config['last_run'] = datetime.strftime(datetime.utcnow(), '%Y-%m-%d %H:%M')
207+
event['last_run'] = datetime.strftime(datetime.utcnow(), '%Y-%m-%d %H:%M')
201208
with self.mutex:
202209
self.update_database_record(event['id'], event)
210+
action_executed = False
203211
for action in event['actions']:
204212
info('Executing scheduled action: {}'.format(action))
205-
result = self.client.RunAction(action)
213+
result = self.client.RunAction(action)
206214
if result == False:
207215
error('Failed to execute action: {}'.format(action))
216+
else:
217+
action_executed = True
208218
if config['type'] == 'date' and result == True:
209219
with self.mutex:
210220
schedule.cancel_job(schedule_item['job'])
221+
if action_executed and 'http_push' in event:
222+
info('Scheduler making HTTP request')
223+
http_push = event['http_push']
224+
try:
225+
future = None
226+
session = FuturesSession(max_workers=1)
227+
session.headers = http_push['headers']
228+
if http_push['method'] == 'GET':
229+
future = session.get(http_push['url'])
230+
if http_push['method'] == 'POST':
231+
future = session.post(http_push['url'], dumps(http_push['payload']))
232+
if http_push['method'] == 'PUT':
233+
future = session.put(http_push['url'], dumps(http_push['payload']))
234+
if http_push['method'] == 'DELETE':
235+
future = session.delete(http_push['url'])
236+
except Exception as ex:
237+
error('Scheduler HTTP request exception: {}'.format(ex))
238+
return None
239+
try:
240+
response = future.result(30)
241+
info('Scheduler HTTP response: {}'.format(response))
242+
except:
243+
pass
211244

212245
def add_database_record(self, id, event):
213246
"""Add a scheduled event to the database

0 commit comments

Comments
 (0)