Skip to content

Commit a248e6e

Browse files
committed
Fix cylc set on a task no longer in pool causing task state regression
1 parent 1d77c4f commit a248e6e

File tree

2 files changed

+55
-4
lines changed

2 files changed

+55
-4
lines changed

cylc/flow/task_pool.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2118,8 +2118,7 @@ def set_prereqs_and_outputs(
21182118
point, tdef, flow_nums,
21192119
flow_wait=flow_wait, transient=True
21202120
)
2121-
if trans is not None:
2122-
self._set_outputs_itask(trans, outputs)
2121+
if trans and self._set_outputs_itask(trans, outputs):
21232122
no_op = False
21242123

21252124
if not no_op:
@@ -2189,15 +2188,18 @@ def _set_outputs_itask(
21892188
self,
21902189
itask: 'TaskProxy',
21912190
outputs: Iterable[str],
2192-
) -> None:
2191+
) -> bool:
21932192
"""Set requested outputs on a task proxy and spawn children.
21942193
21952194
If no outputs were specified and the task has no required outputs to
21962195
set, set the "success pathway" outputs in the same way that skip mode
21972196
does.
21982197
21992198
Designated flows should already be merged to the task proxy.
2199+
2200+
Returns True if any outputs were set, else False.
22002201
"""
2202+
no_op = True
22012203
outputs = set(outputs)
22022204

22032205
if not outputs:
@@ -2227,18 +2229,24 @@ def _set_outputs_itask(
22272229
LOG.info(f"output {itask.identity}:{output} completed already")
22282230
continue
22292231
self.task_events_mgr.process_message(
2230-
itask, logging.INFO, output, forced=True)
2232+
itask, logging.INFO, output, forced=True
2233+
)
2234+
no_op = False
22312235

22322236
if not itask.state(TASK_STATUS_WAITING):
22332237
# Can't be runahead limited or queued.
22342238
itask.state_reset(is_runahead=False, is_queued=False)
22352239
self.task_queue_mgr.remove_task(itask)
22362240

2241+
if no_op:
2242+
return False
2243+
22372244
self.data_store_mgr.delta_task_state(itask)
22382245
self.data_store_mgr.delta_task_outputs(itask)
22392246
self.workflow_db_mgr.put_update_task_state(itask)
22402247
self.workflow_db_mgr.put_update_task_outputs(itask)
22412248
self.workflow_db_mgr.process_queued_ops()
2249+
return True
22422250

22432251
def _set_prereqs_itask(
22442252
self,

tests/integration/scripts/test_set.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,3 +369,46 @@ def assert_job_failed(itask: TaskProxy):
369369
assert db_task_states(foo) == [(TASK_STATUS_SUCCEEDED,)]
370370
# But job is still failed:
371371
assert_job_failed(foo)
372+
373+
374+
async def test_set_already_succeeded(
375+
flow, scheduler, run, complete, db_select
376+
):
377+
"""Doing `cylc set` on a task that has already succeeded should not
378+
change anything."""
379+
schd: Scheduler = scheduler(
380+
flow('foo => bar'),
381+
paused_start=False,
382+
)
383+
384+
def db_task_states(itask: TaskProxy):
385+
return db_select(
386+
schd,
387+
True,
388+
'task_states',
389+
'submit_num',
390+
'status',
391+
'time_updated',
392+
name=itask.tdef.name,
393+
)
394+
395+
def data_store_task_state(itask: TaskProxy):
396+
return schd.data_store_mgr.data[schd.tokens.id][TASK_PROXIES][
397+
itask.tokens.id
398+
].state
399+
400+
async with run(schd):
401+
foo = schd.pool.get_tasks()[0]
402+
await complete(schd, foo.identity)
403+
time_updated = db_task_states(foo)[0][2]
404+
expected = [(1, TASK_STATUS_SUCCEEDED, time_updated)]
405+
assert db_task_states(foo) == expected
406+
assert data_store_task_state(foo) == TASK_STATUS_SUCCEEDED
407+
408+
await run_cmd(
409+
set_prereqs_and_outputs(schd, [foo.identity], [])
410+
)
411+
assert foo.state(TASK_STATUS_SUCCEEDED)
412+
await schd.update_data_structure()
413+
assert db_task_states(foo) == expected
414+
assert data_store_task_state(foo) == TASK_STATUS_SUCCEEDED

0 commit comments

Comments
 (0)