Skip to content

Commit 46e44eb

Browse files
authored
Merge pull request #6842 from MetRonnie/cylc-set
Fix `cylc set` causing erroneous state changes
2 parents 125c48f + 5ad213a commit 46e44eb

25 files changed

+686
-405
lines changed

changes.d/6842.fix.md

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,3 @@
1+
Fixed two `cylc set` bugs:
2+
- It was affecting job states when it should only affect _task_ states.
3+
- In some cases it was causing a task to go back into the waiting state.

cylc/flow/data_store_mgr.py

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -134,6 +134,7 @@
134134
from cylc.flow.flow_mgr import FlowNums
135135
from cylc.flow.prerequisite import Prerequisite
136136
from cylc.flow.scheduler import Scheduler
137+
from cylc.flow.taskdef import TaskDef
137138

138139
EDGES = 'edges'
139140
FAMILIES = 'families'
@@ -256,7 +257,7 @@ def generate_checksum(in_strings):
256257
return zlib.adler32(''.join(sorted(in_strings)).encode()) & 0xffffffff
257258

258259

259-
def task_mean_elapsed_time(tdef):
260+
def task_mean_elapsed_time(tdef: 'TaskDef') -> float | None:
260261
"""Calculate task mean elapsed time."""
261262
if tdef.elapsed_times:
262263
return round(sum(tdef.elapsed_times) / len(tdef.elapsed_times))
@@ -2739,11 +2740,14 @@ def delta_from_task_proxy(self, itask: TaskProxy) -> None:
27392740
# -----------
27402741
# Job Deltas
27412742
# -----------
2742-
def delta_job_msg(self, tokens: Tokens, msg: str) -> None:
2743-
"""Add message to job."""
2743+
def delta_job_msg(self, tokens: Tokens, msg: str) -> bool:
2744+
"""Add message to job.
2745+
2746+
Returns False if the job was not found in the data store.
2747+
"""
27442748
j_id, job = self.store_node_fetcher(tokens)
27452749
if not job:
2746-
return
2750+
return False
27472751
j_delta = self.updated[JOBS].setdefault(
27482752
j_id,
27492753
PbJob(id=j_id)
@@ -2756,6 +2760,7 @@ def delta_job_msg(self, tokens: Tokens, msg: str) -> None:
27562760
j_delta.messages[:] = job.messages
27572761
j_delta.messages.append(msg)
27582762
self.updates_pending = True
2763+
return True
27592764

27602765
def delta_job_attr(
27612766
self,

cylc/flow/id.py

Lines changed: 17 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
import re
2424
from typing import (
2525
TYPE_CHECKING,
26+
Any,
2627
Iterable,
2728
List,
2829
Literal,
@@ -365,9 +366,22 @@ def is_null(self) -> bool:
365366
self[key] for key in self._REGULAR_KEYS
366367
)
367368

369+
@property
370+
def submit_num(self) -> int | None:
371+
"""The job submit number as an integer, or None if not set.
372+
373+
Examples:
374+
>>> Tokens('//c/t/01').submit_num
375+
1
376+
>>> Tokens('//c/t').submit_num is None
377+
True
378+
"""
379+
return int(self['job']) if self['job'] else None
380+
368381
@overload
369382
def duplicate(
370383
self,
384+
*,
371385
cycle: str,
372386
task: str,
373387
**kwargs,
@@ -377,13 +391,14 @@ def duplicate(
377391
@overload
378392
def duplicate(
379393
self,
394+
*tokens_list: 'Tokens',
380395
**kwargs,
381396
) -> 'Tokens':
382397
...
383398

384399
def duplicate(
385400
self,
386-
*tokens_list,
401+
*tokens_list: 'Tokens',
387402
**kwargs,
388403
) -> 'Tokens':
389404
"""Duplicate a tokens object.
@@ -418,7 +433,7 @@ def duplicate(
418433
'~u/w//c/b/01'
419434
420435
"""
421-
_kwargs = {}
436+
_kwargs: dict[str, Any] = {}
422437
for tokens in (self, *tokens_list):
423438
_kwargs.update(tokens)
424439
_kwargs.update(kwargs)

cylc/flow/network/resolvers.py

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,6 @@
3232
Optional,
3333
Tuple,
3434
TYPE_CHECKING,
35-
Union,
3635
cast,
3736
)
3837
from uuid import uuid4
@@ -69,9 +68,9 @@
6968

7069
class TaskMsg(NamedTuple):
7170
"""Tuple for Scheduler.message_queue"""
72-
job_id: Union[Tokens, str]
71+
job_id: Tokens
7372
event_time: str
74-
severity: Union[str, int]
73+
severity: str | int
7574
message: str
7675

7776

@@ -839,7 +838,12 @@ def put_messages(
839838
"""
840839
for severity, message in messages:
841840
self.schd.message_queue.put(
842-
TaskMsg(task_job, event_time, severity, message)
841+
TaskMsg(
842+
Tokens(task_job, relative=True),
843+
event_time,
844+
severity,
845+
message,
846+
)
843847
)
844848
return (True, f'Messages queued: {len(messages)}')
845849

cylc/flow/run_modes/simulation.py

Lines changed: 8 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -37,12 +37,16 @@
3737
from cylc.flow.exceptions import PointParsingError
3838
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
3939
from cylc.flow.run_modes import RunMode
40-
from cylc.flow.task_outputs import TASK_OUTPUT_SUBMITTED
40+
from cylc.flow.task_outputs import (
41+
TASK_OUTPUT_STARTED,
42+
TASK_OUTPUT_SUBMITTED,
43+
)
4144
from cylc.flow.task_state import (
4245
TASK_STATUS_FAILED,
4346
TASK_STATUS_RUNNING,
4447
TASK_STATUS_SUCCEEDED,
4548
)
49+
from cylc.flow.util import serialise_set
4650
from cylc.flow.wallclock import get_unix_time_from_time_string
4751

4852

@@ -91,22 +95,20 @@ def submit_task_job(
9195
itask.jobs.append(
9296
task_job_mgr.get_simulation_job_conf(itask)
9397
)
94-
task_job_mgr.task_events_mgr.process_message(
95-
itask, INFO, TASK_OUTPUT_SUBMITTED,
96-
)
98+
for output in (TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED):
99+
task_job_mgr.task_events_mgr.process_message(itask, INFO, output)
97100
task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
98101
itask, {
99102
'time_submit': now[1],
100103
'time_run': now[1],
101104
'try_num': itask.get_try_num(),
102-
'flow_nums': str(list(itask.flow_nums)),
105+
'flow_nums': serialise_set(itask.flow_nums),
103106
'is_manual_submit': itask.is_manual_submit,
104107
'job_runner_name': RunMode.SIMULATION.value,
105108
'platform_name': RunMode.SIMULATION.value,
106109
'submit_status': 0 # Submission has succeeded
107110
}
108111
)
109-
itask.state.status = TASK_STATUS_RUNNING
110112
return True
111113

112114

cylc/flow/run_modes/skip.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@
3535
TASK_OUTPUT_SUBMITTED,
3636
TASK_OUTPUT_SUCCEEDED,
3737
)
38+
from cylc.flow.util import serialise_set
3839

3940

4041
if TYPE_CHECKING:
@@ -78,7 +79,7 @@ def submit_task_job(
7879
itask, {
7980
'time_submit': now[1],
8081
'try_num': itask.get_try_num(),
81-
'flow_nums': str(list(itask.flow_nums)),
82+
'flow_nums': serialise_set(itask.flow_nums),
8283
'is_manual_submit': itask.is_manual_submit,
8384
'job_runner_name': RunMode.SKIP.value,
8485
'platform_name': RunMode.SKIP.value,

cylc/flow/rundb.py

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -714,11 +714,12 @@ def select_task_action_timers(self, callback):
714714
for row_idx, row in enumerate(self.connect().execute(stmt)):
715715
callback(row_idx, list(row))
716716

717-
def select_task_job(self, cycle, name, submit_num=None):
717+
def select_task_job(
718+
self, cycle: str, name: str, submit_num: str | int | None = None
719+
) -> dict[str, Any] | None:
718720
"""Select items from task_jobs by (cycle, name, submit_num).
719721
720722
:return: a dict for mapping keys to the column values
721-
:rtype: dict
722723
"""
723724
keys = []
724725
for column in self.tables[self.TABLE_TASK_JOBS].columns[3:]:
@@ -737,7 +738,7 @@ def select_task_job(self, cycle, name, submit_num=None):
737738
''' # nosec B608
738739
# * table name is code constant
739740
# * keys are code constants
740-
stmt_args = [cycle, name]
741+
stmt_args: list[Any] = [cycle, name]
741742
else:
742743
stmt = rf'''
743744
SELECT
@@ -752,14 +753,13 @@ def select_task_job(self, cycle, name, submit_num=None):
752753
# * table name is code constant
753754
# * keys are code constants
754755
stmt_args = [cycle, name, submit_num]
755-
try:
756+
with suppress(sqlite3.DatabaseError):
756757
for row in self.connect().execute(stmt, stmt_args):
757758
ret = {}
758759
for key, value in zip(keys, row):
759760
ret[key] = value
760761
return ret
761-
except sqlite3.DatabaseError:
762-
return None
762+
return None
763763

764764
def select_jobs_for_restart(self, callback):
765765
"""Select from task_pool+task_states+task_jobs for restart.

cylc/flow/scheduler.py

Lines changed: 23 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -965,7 +965,7 @@ def process_queued_task_messages(self) -> None:
965965
"""Process incoming task messages for each task proxy.
966966
967967
"""
968-
messages: 'Dict[str, List[Tuple[Optional[int], TaskMsg]]]' = {}
968+
messages: dict[str, list[TaskMsg]] = {}
969969

970970
# Retrieve queued messages
971971
while self.message_queue.qsize():
@@ -974,28 +974,23 @@ def process_queued_task_messages(self) -> None:
974974
except Empty:
975975
break
976976
self.message_queue.task_done()
977-
tokens = Tokens(task_msg.job_id, relative=True)
978977
# task ID (job stripped)
979-
task_id = tokens.duplicate(job=None).relative_id
980-
messages.setdefault(task_id, [])
981-
# job may be None (e.g. simulation mode)
982-
job = int(tokens['job']) if tokens['job'] else None
983-
messages[task_id].append(
984-
(job, task_msg)
985-
)
978+
task_id = task_msg.job_id.duplicate(job=None).relative_id
979+
messages.setdefault(task_id, []).append(task_msg)
986980

981+
unprocessed_messages: List[TaskMsg] = []
987982
# Poll tasks for which messages caused a backward state change.
988-
to_poll_tasks = []
989-
for itask in self.pool.get_tasks():
990-
message_items = messages.get(itask.identity)
991-
if message_items is None:
983+
to_poll_tasks: List[TaskProxy] = []
984+
for task_id, message_items in messages.items():
985+
itask = self.pool._get_task_by_id(task_id)
986+
if itask is None:
987+
unprocessed_messages.extend(message_items)
992988
continue
993989
should_poll = False
994-
del messages[itask.identity]
995-
for submit_num, tm in message_items:
990+
for tm in message_items:
996991
if self.task_events_mgr.process_message(
997992
itask, tm.severity, tm.message, tm.event_time,
998-
self.task_events_mgr.FLAG_RECEIVED, submit_num
993+
self.task_events_mgr.FLAG_RECEIVED, tm.job_id.submit_num
999994
):
1000995
should_poll = True
1001996
if should_poll:
@@ -1006,11 +1001,18 @@ def process_queued_task_messages(self) -> None:
10061001
# Remaining unprocessed messages have no corresponding task proxy.
10071002
# For example, if I manually set a running task to succeeded, the
10081003
# proxy can be removed, but the orphaned job still sends messages.
1009-
for tms in messages.values():
1010-
warn = "Undeliverable task messages received and ignored:"
1011-
for _, msg in tms:
1012-
warn += f'\n {msg.job_id}: {msg.severity} - "{msg.message}"'
1013-
LOG.warning(warn)
1004+
warn = ""
1005+
for tm in unprocessed_messages:
1006+
job_tokens = self.tokens.duplicate(tm.job_id)
1007+
tdef = self.config.get_taskdef(job_tokens['task'])
1008+
if not self.task_events_mgr.process_job_message(
1009+
job_tokens, tdef, tm.message, tm.event_time
1010+
):
1011+
warn += f'\n {tm.job_id}: {tm.severity} - "{tm.message}"'
1012+
if warn:
1013+
LOG.warning(
1014+
f"Undeliverable task messages received and ignored:{warn}"
1015+
)
10141016

10151017
async def process_command_queue(self) -> None:
10161018
"""Process queued commands."""

0 commit comments

Comments
 (0)