Skip to content

Commit 2b2fc77

Browse files
wxtimhjoliverMetRonnie
authored
Xtrig arg validate (#5955)
* Xtrigger function arg validation. * Add integration tests for xtrigger validation, which provides simple examples for each of the built in xtriggers. - Xrandom validate function - Init test xrandom validate function - Add unit tests for validation of built in xtriggers - Automatically validate xtrigger function signature --------- Co-authored-by: Hilary James Oliver <[email protected]> * Apply suggestions from code review Co-authored-by: Ronnie Dutta <[email protected]> * fix flake8 * Improve xtrigger validation (#60) * Improve xtrigger validation * wall_clock: use placeholder function for signature validation & autodocs * Fix docstring for autodoc [skip ci] --------- Co-authored-by: Hilary Oliver <[email protected]> Co-authored-by: Ronnie Dutta <[email protected]>
1 parent 24caa24 commit 2b2fc77

File tree

22 files changed

+602
-199
lines changed

22 files changed

+602
-199
lines changed

changes.d/5955.feat.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Support xtrigger argument validation.

cylc/flow/config.py

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
from cylc.flow.id import Tokens
5858
from cylc.flow.cycling.integer import IntegerInterval
5959
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval
60+
6061
from cylc.flow.exceptions import (
6162
CylcError,
6263
InputError,
@@ -1718,28 +1719,25 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
17181719
if label != 'wall_clock':
17191720
raise WorkflowConfigError(f"xtrigger not defined: {label}")
17201721
else:
1721-
# Allow "@wall_clock" in the graph as an undeclared
1722-
# zero-offset clock xtrigger.
1723-
xtrig = SubFuncContext(
1724-
'wall_clock', 'wall_clock', [], {})
1722+
# Allow "@wall_clock" in graph as implicit zero-offset.
1723+
xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {})
17251724

1726-
if xtrig.func_name == 'wall_clock':
1727-
if self.cycling_type == INTEGER_CYCLING_TYPE:
1728-
raise WorkflowConfigError(
1729-
"Clock xtriggers require datetime cycling:"
1730-
f" {label} = {xtrig.get_signature()}"
1731-
)
1732-
else:
1733-
# Convert offset arg to kwarg for certainty later.
1734-
if "offset" not in xtrig.func_kwargs:
1735-
xtrig.func_kwargs["offset"] = None
1736-
with suppress(IndexError):
1737-
xtrig.func_kwargs["offset"] = xtrig.func_args[0]
1725+
if (
1726+
xtrig.func_name == 'wall_clock'
1727+
and self.cycling_type == INTEGER_CYCLING_TYPE
1728+
):
1729+
raise WorkflowConfigError(
1730+
"Clock xtriggers require datetime cycling:"
1731+
f" {label} = {xtrig.get_signature()}"
1732+
)
17381733

1739-
if self.xtrigger_mgr is None:
1740-
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
1741-
else:
1734+
# Generic xtrigger validation.
1735+
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)
1736+
1737+
if self.xtrigger_mgr:
1738+
# (not available during validation)
17421739
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
1740+
17431741
self.taskdefs[right].add_xtrig_label(label, seq)
17441742

17451743
def get_actual_first_point(self, start_point):
@@ -2427,10 +2425,10 @@ def upgrade_clock_triggers(self):
24272425
# Derive an xtrigger label.
24282426
label = '_'.join(('_cylc', 'wall_clock', task_name))
24292427
# Define the xtrigger function.
2430-
xtrig = SubFuncContext(label, 'wall_clock', [], {})
2431-
xtrig.func_kwargs["offset"] = offset
2428+
args = [] if offset is None else [offset]
2429+
xtrig = SubFuncContext(label, 'wall_clock', args, {})
24322430
if self.xtrigger_mgr is None:
2433-
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
2431+
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)
24342432
else:
24352433
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
24362434
# Add it to the task, for each sequence that the task appears in.

cylc/flow/exceptions.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,12 @@ class XtriggerConfigError(WorkflowConfigError):
241241
242242
"""
243243

244-
def __init__(self, label: str, trigger: str, message: str):
244+
def __init__(self, label: str, message: str):
245245
self.label: str = label
246-
self.trigger: str = trigger
247246
self.message: str = message
248247

249-
def __str__(self):
250-
return f'[{self.label}] {self.message}'
248+
def __str__(self) -> str:
249+
return f'[@{self.label}] {self.message}'
251250

252251

253252
class ClientError(CylcError):

cylc/flow/scripts/function_run.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
1919
(This command is for internal use.)
2020
21-
Run a Python function "<name>(*args, **kwargs)" in the process pool. It must be
22-
defined in a module of the same name. Positional and keyword arguments must be
23-
passed in as JSON strings. <src-dir> is the workflow source dir, needed to find
24-
local xtrigger modules.
21+
Run a Python xtrigger function "<name>(*args, **kwargs)" in the process pool.
22+
It must be in a module of the same name. Positional and keyword arguments must
23+
be passed in as JSON strings.
24+
25+
Python entry points are the preferred way to make xtriggers available to the
26+
scheduler, but local xtriggers can be stored in <src-dir>.
27+
2528
"""
2629
import sys
2730

cylc/flow/subprocpool.py

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
from cylc.flow.task_proxy import TaskProxy
4242
from cylc.flow.wallclock import get_current_time_string
4343

44-
_XTRIG_FUNCS: dict = {}
44+
_XTRIG_MOD_CACHE: dict = {}
45+
_XTRIG_FUNC_CACHE: dict = {}
4546

4647

4748
def _killpg(proc, signal):
@@ -66,68 +67,86 @@ def _killpg(proc, signal):
6667
return True
6768

6869

69-
def get_func(func_name, src_dir):
70-
"""Find and return an xtrigger function from a module of the same name.
70+
def get_xtrig_mod(mod_name, src_dir):
71+
"""Find, cache, and return a named xtrigger module.
7172
72-
These locations are checked in this order:
73-
- <src_dir>/lib/python/
74-
- `$CYLC_PYTHONPATH`
75-
- defined via a `cylc.xtriggers` entry point for an
76-
installed Python package.
73+
Locations checked in this order:
74+
- <src_dir>/lib/python (prepend to sys.path)
75+
- $CYLC_PYTHONPATH (already in sys.path)
76+
- `cylc.xtriggers` entry point
7777
78-
Workflow source directory passed in as this is executed in an independent
79-
process in the command pool and therefore doesn't know about the workflow.
78+
(Check entry point last so users can override with local implementations).
79+
80+
Workflow source dir passed in - this executes in an independent subprocess.
81+
82+
Raises:
83+
ImportError, if the module is not found
8084
8185
"""
82-
if func_name in _XTRIG_FUNCS:
83-
return _XTRIG_FUNCS[func_name]
86+
if mod_name in _XTRIG_MOD_CACHE:
87+
# Found and cached already.
88+
return _XTRIG_MOD_CACHE[mod_name]
8489

8590
# First look in <src-dir>/lib/python.
8691
sys.path.insert(0, os.path.join(src_dir, 'lib', 'python'))
87-
mod_name = func_name
8892
try:
89-
mod_by_name = __import__(mod_name, fromlist=[mod_name])
93+
_XTRIG_MOD_CACHE[mod_name] = __import__(mod_name, fromlist=[mod_name])
9094
except ImportError:
91-
# Look for xtriggers via entry_points for external sources.
92-
# Do this after the lib/python and PYTHONPATH approaches to allow
93-
# users to override entry_point definitions with local/custom
94-
# implementations.
95+
# Then entry point.
9596
for entry_point in iter_entry_points('cylc.xtriggers'):
96-
if func_name == entry_point.name:
97-
_XTRIG_FUNCS[func_name] = entry_point.load()
98-
return _XTRIG_FUNCS[func_name]
99-
97+
if mod_name == entry_point.name:
98+
_XTRIG_MOD_CACHE[mod_name] = entry_point.load()
99+
return _XTRIG_MOD_CACHE[mod_name]
100100
# Still unable to find anything so abort
101101
raise
102102

103-
try:
104-
_XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name)
105-
except AttributeError:
106-
# Module func_name has no function func_name, nor an entry_point entry.
107-
raise
108-
return _XTRIG_FUNCS[func_name]
103+
return _XTRIG_MOD_CACHE[mod_name]
104+
105+
106+
def get_xtrig_func(mod_name, func_name, src_dir):
107+
"""Find, cache, and return a function from an xtrigger module.
108+
109+
Raises:
110+
ImportError, if the module is not found
111+
AttributeError, if the function is not found in the module
112+
113+
"""
114+
if (mod_name, func_name) in _XTRIG_FUNC_CACHE:
115+
return _XTRIG_FUNC_CACHE[(mod_name, func_name)]
116+
117+
mod = get_xtrig_mod(mod_name, src_dir)
118+
119+
_XTRIG_FUNC_CACHE[(mod_name, func_name)] = getattr(mod, func_name)
120+
121+
return _XTRIG_FUNC_CACHE[(mod_name, func_name)]
109122

110123

111124
def run_function(func_name, json_args, json_kwargs, src_dir):
112125
"""Run a Python function in the process pool.
113126
114127
func_name(*func_args, **func_kwargs)
115128
129+
The function is presumed to be in a module of the same name.
130+
116131
Redirect any function stdout to stderr (and workflow log in debug mode).
117132
Return value printed to stdout as a JSON string - allows use of the
118133
existing process pool machinery as-is. src_dir is for local modules.
119134
120135
"""
121136
func_args = json.loads(json_args)
122137
func_kwargs = json.loads(json_kwargs)
138+
123139
# Find and import then function.
124-
func = get_func(func_name, src_dir)
140+
func = get_xtrig_func(func_name, func_name, src_dir)
141+
125142
# Redirect stdout to stderr.
126143
orig_stdout = sys.stdout
127144
sys.stdout = sys.stderr
128145
res = func(*func_args, **func_kwargs)
146+
129147
# Restore stdout.
130148
sys.stdout = orig_stdout
149+
131150
# Write function return value as JSON to stdout.
132151
sys.stdout.write(json.dumps(res))
133152

0 commit comments

Comments
 (0)