34
34
import re
35
35
from textwrap import wrap
36
36
import traceback
37
+ from types import SimpleNamespace
37
38
from typing import (
38
39
TYPE_CHECKING ,
39
40
Any ,
@@ -461,6 +462,7 @@ def __init__(
461
462
self .mem_log ("config.py: after get(sparse=False)" )
462
463
463
464
# These 2 must be called before call to init_cyclers(self.cfg):
465
+ self .set_experimental_features ()
464
466
self .process_utc_mode ()
465
467
self .process_cycle_point_tz ()
466
468
@@ -614,6 +616,14 @@ def __init__(
614
616
615
617
skip_mode_validate (self .taskdefs )
616
618
619
+ def set_experimental_features (self ):
620
+ all_ = self .cfg ['scheduler' ]['experimental' ]['all' ]
621
+ self .experimental = SimpleNamespace (** {
622
+ key .replace (' ' , '_' ): value or all_
623
+ for key , value in self .cfg ['scheduler' ]['experimental' ].items ()
624
+ if key != 'all'
625
+ })
626
+
617
627
@staticmethod
618
628
def _warn_if_queues_have_implicit_tasks (
619
629
config , taskdefs , max_warning_lines
@@ -1062,8 +1072,13 @@ def _set_completion_expressions(self):
1062
1072
for name , taskdef in self .taskdefs .items ():
1063
1073
expr = taskdef .rtconfig ['completion' ]
1064
1074
if expr :
1075
+ any_suicide = any (
1076
+ dep .suicide
1077
+ for d in taskdef .dependencies .values ()
1078
+ for dep in d
1079
+ )
1065
1080
# check the user-defined expression
1066
- self ._check_completion_expression (name , expr )
1081
+ self ._check_completion_expression (name , expr , any_suicide )
1067
1082
else :
1068
1083
# derive a completion expression for this taskdef
1069
1084
expr = get_completion_expression (taskdef )
@@ -1086,14 +1101,18 @@ def _set_completion_expressions(self):
1086
1101
# on after the TaskDef has been created
1087
1102
taskdef .rtconfig ['completion' ] = expr
1088
1103
1089
- def _check_completion_expression (self , task_name : str , expr : str ) -> None :
1104
+ def _check_completion_expression (
1105
+ self , task_name : str , expr : str , any_suicide : bool
1106
+ ) -> None :
1090
1107
"""Checks a user-defined completion expression.
1091
1108
1092
1109
Args:
1093
1110
task_name:
1094
1111
The name of the task we are checking.
1095
1112
expr:
1096
1113
The completion expression as defined in the config.
1114
+ any_suicide:
1115
+ Does this task have any suicide triggers
1097
1116
1098
1117
"""
1099
1118
# check completion expressions are not being used in compat mode
@@ -1242,12 +1261,21 @@ def _check_completion_expression(self, task_name: str, expr: str) -> None:
1242
1261
and expr_opt is None
1243
1262
and compvar in {'submit_failed' , 'expired' }
1244
1263
):
1245
- raise WorkflowConfigError (
1264
+ msg = (
1246
1265
f'{ task_name } :{ trigger } is permitted in the graph'
1247
- ' but is not referenced in the completion'
1248
- ' expression (so is not permitted by it).'
1249
- f'\n Try: completion = "{ expr } or { compvar } "'
1266
+ ' but is not referenced in the completion.'
1250
1267
)
1268
+ if (
1269
+ any_suicide
1270
+ and trigger == "expired"
1271
+ and self .experimental .expire_triggers
1272
+ ):
1273
+ msg += (
1274
+ "\n This may be due to use of an expire "
1275
+ "(formerly suicide) trigger."
1276
+ )
1277
+ msg += f'\n Try: completion = "{ expr } or { compvar } "'
1278
+ raise WorkflowConfigError (msg )
1251
1279
1252
1280
if (
1253
1281
graph_opt is False
@@ -2334,7 +2362,8 @@ def load_graph(self):
2334
2362
parser = GraphParser (
2335
2363
family_map ,
2336
2364
self .parameters ,
2337
- task_output_opt = task_output_opt
2365
+ task_output_opt = task_output_opt ,
2366
+ expire_triggers = self .experimental .expire_triggers ,
2338
2367
)
2339
2368
parser .parse_graph (graph )
2340
2369
task_output_opt .update (parser .task_output_opt )
0 commit comments