Skip to content

Commit 654f173

Browse files
authored
Merge pull request #6844 from oliver-sanders/6537
trigger: support ^/$ syntax for referencing the initial/final cycle points
2 parents d3f0c5b + 0a8281e commit 654f173

File tree

6 files changed

+154
-25
lines changed

6 files changed

+154
-25
lines changed

changes.d/6844.feat.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Cylc commands including `cylc trigger` now support the `^` and `$` syntax for referencing the initial and final cycle points respectively.

cylc/flow/id_match.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
Dict,
2222
Iterable,
2323
List,
24+
Optional,
2425
TYPE_CHECKING,
2526
# Tuple,
2627
# Union,
@@ -78,6 +79,8 @@
7879
def filter_ids(
7980
pool: 'Pool',
8081
ids: 'Iterable[str]',
82+
icp: 'PointBase',
83+
fcp: 'Optional[PointBase]',
8184
*,
8285
warn: 'bool' = True,
8386
out: 'IDTokens' = IDTokens.Task,
@@ -112,6 +115,7 @@ def filter_ids(
112115
_cycles: 'List[PointBase]' = []
113116
_tasks: 'List[TaskProxy]' = []
114117
_not_matched: 'List[str]' = []
118+
_invalid: 'List[str]' = []
115119

116120
# enable / disable pattern matching
117121
match: Callable[[Any, Any], bool]
@@ -138,7 +142,7 @@ def filter_ids(
138142
try:
139143
id_tokens_map[id_] = Tokens(id_, relative=True)
140144
except ValueError:
141-
_not_matched.append(id_)
145+
_invalid.append(id_)
142146
LOG.warning(f'Invalid ID: {id_}')
143147

144148
for id_, tokens in id_tokens_map.items():
@@ -174,6 +178,19 @@ def filter_ids(
174178
task = tokens[IDTokens.Task.value]
175179
task_sel_raw = tokens.get(IDTokens.Task.value + '_sel')
176180
task_sel = task_sel_raw or '*'
181+
182+
# support ^/$ derefencing of the initial and final cycle points
183+
if cycle == '^':
184+
cycle = str(icp)
185+
elif cycle == '$':
186+
if not fcp:
187+
_invalid.append(id_)
188+
LOG.warning(
189+
'ID references final cycle point, but none is set:'
190+
f' {id_}'
191+
)
192+
cycle = str(fcp)
193+
177194
for icycle, itasks in pool.items():
178195
if not point_match(icycle, cycle, pattern_match):
179196
continue
@@ -208,7 +225,9 @@ def filter_ids(
208225
raise NotImplementedError
209226

210227
if not (cycles or tasks):
211-
_not_matched.append(id_)
228+
_not_matched.append(
229+
id_.replace('^', str(icp)).replace('$', str(fcp))
230+
)
212231
if warn:
213232
LOG.warning(f"No active tasks matching: {id_}")
214233
else:
@@ -227,7 +246,7 @@ def filter_ids(
227246
if icycle in pool:
228247
_tasks.extend(pool[icycle].values())
229248
ret = _tasks
230-
return ret, _not_matched
249+
return ret, _not_matched, _invalid
231250

232251

233252
def point_match(

cylc/flow/task_pool.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2496,9 +2496,11 @@ def filter_task_proxies(
24962496
(matched, inactive_matched, unmatched)
24972497
24982498
"""
2499-
matched, unmatched = filter_ids(
2499+
matched, unmatched, invalid = filter_ids(
25002500
self.active_tasks,
25012501
ids,
2502+
self.config.initial_point,
2503+
self.config.final_point,
25022504
warn=warn_no_active,
25032505
)
25042506
inactive_matched: 'Set[Tuple[TaskDef, PointBase]]' = set()
@@ -2507,7 +2509,7 @@ def filter_task_proxies(
25072509
unmatched
25082510
)
25092511

2510-
return matched, inactive_matched, unmatched
2512+
return matched, inactive_matched, unmatched + invalid
25112513

25122514
def match_inactive_tasks(
25132515
self,

tests/functional/cylc-trigger/00-compat/flow.cylc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
R1 = foo => bar
44
[runtime]
55
[[foo]]
6-
script = cylc trigger "${CYLC_WORKFLOW_ID}//1/bar"
6+
script = cylc trigger "${CYLC_WORKFLOW_ID}//^/bar"
77
[[bar]]
88
script = true

tests/integration/test_task_pool.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,6 +1133,57 @@ async def test_detect_incomplete_tasks(
11331133
assert itask in schd.pool.get_tasks()
11341134

11351135

1136+
async def test_trigger_icp_fcp_syntax(
1137+
flow,
1138+
scheduler,
1139+
start,
1140+
log_filter,
1141+
):
1142+
"""It should support the ^/$ syntax for referencing the initial/final cp.
1143+
1144+
See https://github.com/cylc/cylc-flow/issues/6537
1145+
"""
1146+
cfg = {
1147+
'scheduling': {
1148+
'cycling mode': 'integer',
1149+
'initial cycle point': 1,
1150+
'final cycle point': 2,
1151+
'graph': {
1152+
'R1/^': 'start',
1153+
'R1/$': 'end',
1154+
},
1155+
},
1156+
}
1157+
1158+
# trigger tasks at both the initial and final cycle points
1159+
id_ = flow(cfg)
1160+
schd = scheduler(id_)
1161+
async with start(schd) as log:
1162+
await commands.run_cmd(
1163+
commands.force_trigger_tasks(schd, ['^/start', '$/end'], ['1'])
1164+
)
1165+
assert log_filter(contains='[1/start:waiting(queued)] => waiting')
1166+
assert log_filter(contains='[2/end:waiting(queued)] => waiting')
1167+
log.clear()
1168+
1169+
# clear the final cycle point
1170+
del cfg['scheduling']['final cycle point']
1171+
del cfg['scheduling']['graph']['R1/$']
1172+
1173+
# try triggering a task at the (non existent) final cycle point
1174+
id_ = flow(cfg)
1175+
schd = scheduler(id_)
1176+
async with start(schd):
1177+
await commands.run_cmd(
1178+
commands.force_trigger_tasks(schd, ['^/start', '$/end'], ['1'])
1179+
)
1180+
assert log_filter(contains='[1/start:waiting(queued)] => waiting')
1181+
assert not log_filter(contains='[2/end:waiting(queued)] => waiting')
1182+
assert log_filter(
1183+
contains='ID references final cycle point, but none is set: $/end'
1184+
)
1185+
1186+
11361187
async def test_future_trigger_final_point(
11371188
flow,
11381189
scheduler,

tests/unit/test_id_match.py

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,15 @@ def test_filter_ids_task_mode(task_pool, ids, matched, not_matched):
128128
{}
129129
)
130130

131-
_matched, _not_matched = filter_ids(pool, ids)
131+
_matched, _not_matched, _invalid = filter_ids(
132+
pool,
133+
ids,
134+
IntegerPoint('1'),
135+
IntegerPoint('1'),
136+
)
132137
assert [get_task_id(itask) for itask in _matched] == matched
133138
assert _not_matched == not_matched
139+
assert not _invalid
134140

135141

136142
@pytest.mark.parametrize(
@@ -189,22 +195,43 @@ def test_filter_ids_cycle_mode(task_pool, ids, matched, not_matched):
189195
{}
190196
)
191197

192-
_matched, _not_matched = filter_ids(pool, ids, out=IDTokens.Cycle)
198+
_matched, _not_matched, _invalid = filter_ids(
199+
pool,
200+
ids,
201+
IntegerPoint('1'),
202+
IntegerPoint('1'),
203+
out=IDTokens.Cycle,
204+
)
193205
assert _matched == [IntegerPoint(i) for i in matched]
194206
assert _not_matched == not_matched
207+
assert not _invalid
195208

196209

197210
def test_filter_ids_invalid(caplog):
198211
"""Ensure invalid IDs are handled elegantly."""
199-
matched, not_matched = filter_ids({}, ['#'])
200-
assert matched == []
201-
assert not_matched == ['#']
212+
_matched, _not_matched, _invalid = filter_ids(
213+
{},
214+
['#'],
215+
IntegerPoint('1'),
216+
IntegerPoint('1'),
217+
)
218+
assert _matched == []
219+
assert _not_matched == ['#']
220+
assert not _invalid
202221
assert caplog.record_tuples == [
203222
('cylc', 30, 'No active tasks matching: #'),
204223
]
224+
205225
caplog.clear()
206-
matched, not_matched = filter_ids({}, ['#'], warn=False)
226+
_matched, _not_matched, _invalid = filter_ids(
227+
{},
228+
['#'],
229+
IntegerPoint('1'),
230+
IntegerPoint('1'),
231+
warn=False,
232+
)
207233
assert caplog.record_tuples == []
234+
assert not _invalid
208235

209236

210237
def test_filter_ids_pattern_match_off(task_pool):
@@ -216,14 +243,17 @@ def test_filter_ids_pattern_match_off(task_pool):
216243
{}
217244
)
218245

219-
_matched, _not_matched = filter_ids(
246+
_matched, _not_matched, _invalid = filter_ids(
220247
pool,
221248
['1/a'],
249+
IntegerPoint('1'),
250+
IntegerPoint('1'),
222251
out=IDTokens.Task,
223252
pattern_match=False,
224253
)
225254
assert [get_task_id(itask) for itask in _matched] == ['1/a:x']
226255
assert _not_matched == []
256+
assert not _invalid
227257

228258

229259
def test_filter_ids_toggle_pattern_matching(task_pool, caplog):
@@ -238,25 +268,31 @@ def test_filter_ids_toggle_pattern_matching(task_pool, caplog):
238268
ids = ['*/*']
239269

240270
# ensure pattern matching works
241-
_matched, _not_matched = filter_ids(
271+
_matched, _not_matched, _invalid = filter_ids(
242272
pool,
243273
ids,
274+
IntegerPoint('1'),
275+
IntegerPoint('1'),
244276
out=IDTokens.Task,
245277
pattern_match=True,
246278
)
247279
assert [get_task_id(itask) for itask in _matched] == ['1/a:x']
248280
assert _not_matched == []
281+
assert not _invalid
249282

250283
# ensure pattern matching can be disabled
251284
caplog.clear()
252-
_matched, _not_matched = filter_ids(
285+
_matched, _not_matched, _invalid = filter_ids(
253286
pool,
254287
ids,
288+
IntegerPoint('1'),
289+
IntegerPoint('1'),
255290
out=IDTokens.Task,
256291
pattern_match=False,
257292
)
258293
assert [get_task_id(itask) for itask in _matched] == []
259294
assert _not_matched == ['*/*']
295+
assert not _invalid
260296

261297
# ensure the ID is logged
262298
assert len(caplog.record_tuples) == 1
@@ -285,36 +321,56 @@ def test_filter_ids_namespace_hierarchy(task_pool, ids, matched, not_matched):
285321
},
286322
)
287323

288-
_matched, _not_matched = filter_ids(
324+
_matched, _not_matched, _invalid = filter_ids(
289325
pool,
290326
ids,
327+
IntegerPoint('1'),
328+
IntegerPoint('1'),
291329
pattern_match=False,
292330
)
293331

294332
assert [get_task_id(itask) for itask in _matched] == matched
295333
assert _not_matched == not_matched
334+
assert not _invalid
296335

297336

298337
def test_filter_ids_out_format():
299-
filter_ids({}, [], out=IDTokens.Cycle)
338+
filter_ids(
339+
{},
340+
[],
341+
IntegerPoint('1'),
342+
IntegerPoint('1'),
343+
out=IDTokens.Cycle,
344+
)
300345
with pytest.raises(ValueError):
301-
filter_ids({}, [], out=IDTokens.Job)
346+
filter_ids(
347+
{},
348+
[],
349+
IntegerPoint('1'),
350+
IntegerPoint('1'),
351+
out=IDTokens.Job,
352+
)
302353

303354

304355
def test_filter_ids_log_errors(caplog):
305-
_, _not_matched = filter_ids({}, ['/////'])
306-
assert _not_matched == ['/////']
356+
*_, _invalid = filter_ids(
357+
{},
358+
['/////'],
359+
IntegerPoint('1'),
360+
IntegerPoint('1'),
361+
)
362+
assert _invalid == ['/////']
307363
assert caplog.record_tuples == [('cylc', 30, 'Invalid ID: /////')]
308364

309365

310366
@pytest.mark.parametrize(
311367
'point, value, pattern_match, expected',
312368
[
313-
(IntegerPoint(23), '23', True, True),
314-
(IntegerPoint(23), '23', False, True),
315-
(IntegerPoint(23), '2*', True, True),
316-
(IntegerPoint(23), '2*', False, False),
317-
(IntegerPoint(23), '2a', True, False),
369+
(IntegerPoint('23'), '23', True, True),
370+
(IntegerPoint('23'), '23', False, True),
371+
(IntegerPoint('23'), '2*', True, True),
372+
(IntegerPoint('23'), '2*', False, False),
373+
(IntegerPoint('23'), '2a', True, False),
318374
(ISO8601Point('2049-01-01T00:00Z'), '2049', True, True),
319375
(ISO8601Point('2049-01-01T00:00Z'), '2049', False, True),
320376
(ISO8601Point('2049-03-01T00:00Z'), '2049', True, False),

0 commit comments

Comments
 (0)