Skip to content

Commit 4a407a6

Browse files
authored
Merge pull request #1020 from onkelandy/stateengine
Stateengine Plugin Updates
2 parents e91229b + d5496b0 commit 4a407a6

16 files changed

+511
-135
lines changed

stateengine/StateEngineAction.py

Lines changed: 78 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def __init__(self, abitem, name: str):
7575
self.__delay = StateEngineValue.SeValue(self._abitem, "delay")
7676
self.__repeat = None
7777
self.__instanteval = None
78+
self.__overwrite = None
7879
self.nextconditionset = StateEngineValue.SeValue(self._abitem, "nextconditionset", True, "regex")
7980
self.conditionset = StateEngineValue.SeValue(self._abitem, "conditionset", True, "regex")
8081
self.previousconditionset = StateEngineValue.SeValue(self._abitem, "previousconditionset", True, "regex")
@@ -124,9 +125,17 @@ def update_delay(self, value):
124125
return _issue
125126

126127
def update_instanteval(self, value):
128+
if self.__instanteval is None:
129+
self.__instanteval = StateEngineValue.SeValue(self._abitem, "instanteval", False, "bool")
127130
_issue = self._update_value(self.__instanteval, value, 'instanteval')
128131
return _issue
129132

133+
def update_overwrite(self, value):
134+
if self.__overwrite is None:
135+
self.__overwrite = StateEngineValue.SeValue(self._abitem, "overwrite", False, "bool")
136+
_issue = self._update_value(self.__overwrite, value, 'overwrite')
137+
return _issue
138+
130139
def update_mindelta(self, value):
131140
self._log_warning("Mindelta is only relevant for set (force) actions - ignoring {}", value)
132141
_issue = {self._name: {'issue': 'Mindelta not relevant for this action type', 'attribute': ['mindelta'],
@@ -206,6 +215,10 @@ def write_to_logger(self):
206215
instanteval = self.__instanteval.write_to_logger()
207216
else:
208217
instanteval = False
218+
if self.__overwrite is not None:
219+
overwrite = self.__overwrite.write_to_logger()
220+
else:
221+
overwrite = None
209222
if self.nextconditionset is not None:
210223
nextconditionset = self.nextconditionset.write_to_logger()
211224
else:
@@ -230,7 +243,7 @@ def write_to_logger(self):
230243
self._info_dict.update({'function': str(self._function), 'nextconditionset': nextconditionset,
231244
'conditionset': conditionset, 'repeat': str(repeat), 'delay': str(delay), 'mode': mode,
232245
'order': str(order), 'previousconditionset': previousconditionset,
233-
'instanteval': str(instanteval), 'previousstate_conditionset': previousstate_conditionset,
246+
'instanteval': str(instanteval), 'overwrite': str(overwrite), 'previousstate_conditionset': previousstate_conditionset,
234247
'actionstatus': {}})
235248

236249
def set_source(self, current_condition, previous_condition, previousstate_condition, next_condition):
@@ -600,42 +613,82 @@ def _waitforexecute(self, state, actionname: str, namevar: str = "", repeat_text
600613

601614
self._log_decrease_indent(50)
602615
self._log_increase_indent()
616+
vals = {
617+
'state': state,
618+
'actionname': actionname,
619+
'namevar': self._name,
620+
'repeat_text': repeat_text,
621+
'value': None,
622+
'current_condition': current_condition,
623+
'previous_condition': previous_condition,
624+
'previousstate_condition': previousstate_condition,
625+
'next_condition': next_condition
626+
}
603627
if delay == 0:
628+
self._abitem.scheduler_remove(self._scheduler_name)
604629
self._log_info("Action '{}': Running.", namevar)
605-
self.real_execute(state, actionname, namevar, repeat_text, None, False, current_condition, previous_condition, previousstate_condition, next_condition)
630+
self.real_execute(
631+
state, actionname, namevar, repeat_text,
632+
None, False, # False = kein Instant Eval
633+
current_condition, previous_condition,
634+
previousstate_condition, next_condition
635+
)
606636
else:
607637
instanteval = None if self.__instanteval is None else self.__instanteval.get()
608-
self._log_info("Action '{0}': Add {1} second timer '{2}' "
609-
"for delayed execution.{3} Instant Eval: {4}", self._name, delay,
610-
self._scheduler_name, repeat_text, instanteval)
638+
overwrite = None if self.__overwrite is None else self.__overwrite.get()
639+
self._log_info(
640+
"Action '{0}': Add {1} second timer '{2}' for delayed execution.{3} Instant Eval: {4}. Overwrite: {5}",
641+
self._name, delay, self._scheduler_name, repeat_text, instanteval, overwrite
642+
)
643+
611644
next_run = self.shtime.now() + datetime.timedelta(seconds=delay)
645+
612646
if instanteval is True:
613647
self._log_increase_indent()
614648
self._log_debug("Evaluating value for delayed action '{}'.", namevar)
615-
value = self.real_execute(state, actionname, namevar, repeat_text, None, True, current_condition, previous_condition, previousstate_condition, next_condition)
616-
self._log_debug("Value for delayed action is going to be '{}'.", value)
649+
650+
vals['value'] = self.real_execute(
651+
state, actionname, self._name, repeat_text,
652+
None, True,
653+
current_condition, previous_condition,
654+
previousstate_condition, next_condition
655+
)
656+
657+
self._log_debug("Value for delayed action is going to be '{}'.", vals['value'])
617658
self._log_decrease_indent()
618-
else:
619-
value = None
620-
self._abitem.add_scheduler_entry(self._scheduler_name)
659+
621660
self.update_webif_actionstatus(state, self._name, 'Scheduled')
622-
self._se_plugin.scheduler_add(self._scheduler_name, self._delayed_execute,
623-
value={'actionname': actionname, 'namevar': self._name,
624-
'repeat_text': repeat_text, 'value': value,
625-
'current_condition': current_condition,
626-
'previous_condition': previous_condition,
627-
'previousstate_condition': previousstate_condition,
628-
'next_condition': next_condition, 'state': state}, next=next_run)
629-
630-
def _delayed_execute(self, actionname: str, namevar: str = "", repeat_text: str = "", value=None, current_condition=None, previous_condition=None, previousstate_condition=None, next_condition=None, state=None, caller=None):
631-
if state:
632-
self._log_debug("Putting delayed action '{}' from state '{}' into queue. Caller: {}", namevar, state, caller)
633-
self.__queue.put(["delayedaction", self, actionname, namevar, repeat_text, value, current_condition, previous_condition, previousstate_condition, next_condition, state])
634-
else:
635-
self._log_debug("Putting delayed action '{}' into queue. Caller: {}", namevar, caller)
636-
self.__queue.put(["delayedaction", self, actionname, namevar, repeat_text, value, current_condition, previous_condition, previousstate_condition, next_condition])
661+
662+
self._abitem.scheduler_add(
663+
self._scheduler_name,
664+
self,
665+
value=vals,
666+
next=next_run,
667+
overwrite=overwrite
668+
)
669+
670+
def delayed_execute(self, actionname: str, namevar: str = "", repeat_text: str = "", value=None, current_condition=None, previous_condition=None, previousstate_condition=None, next_condition=None, state=None, caller=None):
671+
self._log_debug(
672+
"Putting action '{}'{} into queue. Caller: {}", namevar,
673+
f" from state '{state}'" if state else "", caller
674+
)
675+
676+
self.__queue.put([
677+
"delayedaction",
678+
self,
679+
actionname,
680+
namevar,
681+
repeat_text,
682+
value,
683+
current_condition,
684+
previous_condition,
685+
previousstate_condition,
686+
next_condition,
687+
state
688+
])
637689
if not self._abitem.update_lock.locked():
638690
self._log_debug("Running queue")
691+
self._log_increase_indent()
639692
self._abitem.run_queue()
640693

641694
# Really execute the action (needs to be implemented in derived classes)
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
#!/usr/bin/env python3
2+
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
3+
#########################################################################
4+
# Copyright 2014- Thomas Ernst offline@gmx.net
5+
#########################################################################
6+
# Finite state machine plugin for SmartHomeNG
7+
#
8+
# This plugin is free software: you can redistribute it and/or modify
9+
# it under the terms of the GNU General Public License as published by
10+
# the Free Software Foundation, either version 3 of the License, or
11+
# (at your option) any later version.
12+
#
13+
# This plugin is distributed in the hope that it will be useful,
14+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
# GNU General Public License for more details.
17+
#
18+
# You should have received a copy of the GNU General Public License
19+
# along with this plugin. If not, see <http://www.gnu.org/licenses/>.
20+
#########################################################################
21+
from queue import Queue
22+
from threading import RLock
23+
24+
25+
class ActionScheduler:
26+
27+
def __init__(self, smarthome, se_plugin, logger):
28+
self._queue = Queue()
29+
self._scheduled = {}
30+
self._sh = smarthome
31+
self._se_plugin = se_plugin
32+
self.logger = logger
33+
self._lock = RLock()
34+
self._dirty = False
35+
self._next_wakeup = None
36+
37+
# ---------- API für Items ----------
38+
def add(self, abitem, name, action, value, next_run, overwrite, callback=None):
39+
self._queue.put((
40+
'add',
41+
abitem,
42+
name,
43+
{
44+
'action': action,
45+
'value': value or {},
46+
'next': next_run,
47+
'overwrite': overwrite
48+
},
49+
callback
50+
))
51+
self._mark_dirty()
52+
53+
def remove(self, abitem, name, callback=None):
54+
self._queue.put(('remove', abitem, name, callback))
55+
self._mark_dirty()
56+
57+
def remove_all(self, abitem, callback=None):
58+
self._queue.put(('remove_all', abitem, callback))
59+
self._mark_dirty()
60+
61+
def _mark_dirty(self):
62+
if not self._dirty:
63+
self._dirty = True
64+
self._se_plugin.scheduler_trigger('actionscheduler')
65+
66+
# ---------- Scheduler Loop ----------
67+
def run(self):
68+
self._dirty = False
69+
self._next_wakeup = None
70+
now = self._sh.shtime.now()
71+
72+
while not self._queue.empty():
73+
cmd = self._queue.get()
74+
75+
if cmd[0] == 'add':
76+
_, abitem, name, entry, callback = cmd
77+
key = (abitem, name)
78+
with self._lock:
79+
if key in self._scheduled and entry.get('overwrite', True) is False:
80+
new_next = self._scheduled[key]['next']
81+
added = False
82+
else:
83+
self._scheduled[key] = entry
84+
new_next = entry.get('next')
85+
added = True
86+
87+
if callback:
88+
try:
89+
callback(added, new_next)
90+
except Exception as e:
91+
self.logger.debug(f"Add callback failed for '{name}': {e}")
92+
93+
elif cmd[0] == 'remove':
94+
_, abitem, name, callback = cmd
95+
with self._lock:
96+
removed = self._scheduled.pop((abitem, name), None) is not None
97+
if callback:
98+
try:
99+
callback(removed)
100+
except Exception as e:
101+
self.logger.debug(f"Remove callback failed for '{name}': {e}")
102+
103+
elif cmd[0] == 'remove_all':
104+
_, abitem, callback = cmd
105+
removed = 0
106+
with self._lock:
107+
for key in list(self._scheduled.keys()):
108+
if key[0] is abitem:
109+
self._scheduled.pop(key, None)
110+
removed += 1
111+
if callback:
112+
callback(removed)
113+
114+
execute = []
115+
with self._lock:
116+
for key, entry in self._scheduled.items():
117+
if now >= entry['next']:
118+
execute.append(key)
119+
for (abitem, name) in execute:
120+
with self._lock:
121+
entry = self._scheduled.pop((abitem, name), None)
122+
if not entry:
123+
continue
124+
125+
action = entry['action']
126+
vals = entry.get('value', {})
127+
'''
128+
try:
129+
self.logger.develop(f"Scheduled action '{name}' executing")
130+
except Exception:
131+
self.logger.debug(f"Scheduled action '{name}' executing")
132+
'''
133+
action.delayed_execute(**vals)
134+
next_times = []
135+
136+
with self._lock:
137+
for entry in self._scheduled.values():
138+
next_times.append(entry['next'])
139+
140+
if next_times:
141+
next_wakeup = min(next_times)
142+
self._se_plugin.scheduler_trigger('actionscheduler', dt=next_wakeup)

stateengine/StateEngineActions.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(self, abitem):
4040
self.__unassigned_delays = {}
4141
self.__unassigned_repeats = {}
4242
self.__unassigned_instantevals = {}
43+
self.__unassigned_overwrites = {}
4344
self.__unassigned_orders = {}
4445
self.__unassigned_nextconditionsets = {}
4546
self.__unassigned_conditionsets = {}
@@ -112,11 +113,19 @@ def update(self, attribute, value):
112113
elif func == "se_instanteval":
113114
# set instant calculation
114115
if name not in self.__actions:
115-
# If we do not have the action yet (repeat-attribute before action-attribute), ...
116+
# If we do not have the action yet (instanteval-attribute before action-attribute), ...
116117
self.__unassigned_instantevals[name] = value
117118
else:
118119
_issue = self.__actions[name].update_instanteval(value)
119120
return _count, _issue
121+
elif func == "se_overwrite":
122+
# set overwriting delayed actions
123+
if name not in self.__actions:
124+
# If we do not have the action yet (overwrite-attribute before action-attribute), ...
125+
self.__unassigned_overwrites[name] = value
126+
else:
127+
_issue = self.__actions[name].update_overwrite(value)
128+
return _count, _issue
120129
elif func == "se_repeat":
121130
# set repeat
122131
if name not in self.__actions:
@@ -300,6 +309,12 @@ def __ensure_action_exists(self, func, name):
300309
_issue_list.append(_issue)
301310
del self.__unassigned_instantevals[name]
302311

312+
if name in self.__unassigned_overwrites:
313+
_issue = action.update_overwrite(self.__unassigned_overwrites[name])
314+
if _issue:
315+
_issue_list.append(_issue)
316+
del self.__unassigned_overwrites[name]
317+
303318
if name in self.__unassigned_mindeltas:
304319
_issue = action.update_mindelta(self.__unassigned_mindeltas[name])
305320
if _issue:
@@ -369,7 +384,7 @@ def remove_action(e):
369384
self._log_warning("Removed action {0} because: {1}.", name, e)
370385

371386
parameter = {'function': None, 'force': None, 'repeat': None, 'delay': 0, 'order': None, 'nextconditionset': None, 'conditionset': None,
372-
'previousconditionset': None, 'previousstate_conditionset': None, 'mode': None, 'instanteval': None, 'mindelta': None, 'minagedelta': None}
387+
'previousconditionset': None, 'previousstate_conditionset': None, 'mode': None, 'instanteval': None, 'overwrite': None, 'mindelta': None, 'minagedelta': None}
373388
_issue = None
374389
_issue_list = []
375390
# value_list needs to be string or list
@@ -513,12 +528,15 @@ def remove_action(e):
513528
except ValueError as ex:
514529
remove_action(ex)
515530
return _issue_list
516-
517531
# add additional parameters
518532
if parameter['instanteval'] is not None:
519533
_issue = self.__actions[name].update_instanteval(parameter['instanteval'])
520534
if _issue:
521535
_issue_list.append(_issue)
536+
if parameter['overwrite'] is not None:
537+
_issue = self.__actions[name].update_overwrite(parameter['overwrite'])
538+
if _issue:
539+
_issue_list.append(_issue)
522540
if parameter['repeat'] is not None:
523541
_issue = self.__actions[name].update_repeat(parameter['repeat'])
524542
if _issue:

0 commit comments

Comments
 (0)