@@ -1844,6 +1844,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
1844
1844
1845
1845
triggers = {}
1846
1846
xtrig_labels = set ()
1847
+
1847
1848
for left in left_nodes :
1848
1849
if left .startswith ('@' ):
1849
1850
xtrig_labels .add (left [1 :])
@@ -2265,6 +2266,17 @@ def load_graph(self):
2265
2266
self .workflow_polling_tasks .update (
2266
2267
parser .workflow_state_polling_tasks )
2267
2268
self ._proc_triggers (parser , seq , task_triggers )
2269
+ self .check_outputs (
2270
+ [
2271
+ task_output
2272
+ for task_output in parser .task_output_opt
2273
+ if task_output [0 ]
2274
+ in [
2275
+ task_output .split (':' )[0 ]
2276
+ for task_output in parser .terminals
2277
+ ]
2278
+ ]
2279
+ )
2268
2280
2269
2281
self .set_required_outputs (task_output_opt )
2270
2282
@@ -2278,6 +2290,26 @@ def load_graph(self):
2278
2290
for tdef in self .taskdefs .values ():
2279
2291
tdef .tweak_outputs ()
2280
2292
2293
+ def check_outputs (
2294
+ self , tasks_and_outputs : Iterable [Tuple [str , str ]]
2295
+ ) -> None :
2296
+ """Check that task outputs have been registered with tasks.
2297
+
2298
+ Args: tasks_and_outputs: ((task, output), ...)
2299
+
2300
+ Raises: WorkflowConfigError is a user has defined a task with a
2301
+ custom output, but has not registered a custom output.
2302
+ """
2303
+ for task , output in tasks_and_outputs :
2304
+ registered_outputs = self .cfg ['runtime' ][task ]['outputs' ]
2305
+ if (
2306
+ not TaskOutputs .is_valid_std_name (output )
2307
+ and output not in registered_outputs
2308
+ ):
2309
+ raise WorkflowConfigError (
2310
+ f"Undefined custom output: { task } :{ output } "
2311
+ )
2312
+
2281
2313
def _proc_triggers (self , parser , seq , task_triggers ):
2282
2314
"""Define graph edges, taskdefs, and triggers, from graph sections."""
2283
2315
suicides = 0
0 commit comments