Skip to content

Commit 40670d6

Browse files
authored
Fix data store bug where late arrival of submitted message would cause backwards state change (#6914)
Fix data store bug where late arrival of submitted message would cause backwards state change
1 parent 487cf11 commit 40670d6

File tree

7 files changed

+122
-91
lines changed

7 files changed

+122
-91
lines changed

changes.d/6914.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed a bug where jobs in the UI could regress to an earlier state when a job's "submitted" message arrived late.

cylc/flow/data_store_mgr.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
Set,
6868
TYPE_CHECKING,
6969
Tuple,
70+
Union,
7071
)
7172
import zlib
7273

@@ -1609,13 +1610,20 @@ def _apply_broadcasts_to_runtime(self, tokens, rtconfig):
16091610
poverride(rtconfig, overrides, prepend=True)
16101611
return rtconfig
16111612

1612-
def insert_job(self, name, cycle_point, status, job_conf):
1613+
def insert_job(
1614+
self,
1615+
name: str,
1616+
cycle_point: Union['PointBase', str],
1617+
status: str,
1618+
job_conf: dict,
1619+
):
16131620
"""Insert job into data-store.
16141621
16151622
Args:
1616-
name (str): Corresponding task name.
1617-
cycle_point (str|PointBase): Cycle point string
1618-
job_conf (dic):
1623+
name: Corresponding task name.
1624+
cycle_point: Cycle point string
1625+
status: The task's state.
1626+
job_conf:
16191627
Dictionary of job configuration used to generate
16201628
the job script.
16211629
(see TaskJobManager._prep_submit_task_job_impl)
@@ -1625,6 +1633,11 @@ def insert_job(self, name, cycle_point, status, job_conf):
16251633
None
16261634
16271635
"""
1636+
if status not in JOB_STATUS_SET:
1637+
# Ignore task-only states e.g. preparing
1638+
# https://github.com/cylc/cylc-flow/issues/4994
1639+
return
1640+
16281641
sub_num = job_conf['submit_num']
16291642
tp_tokens = self.id_.duplicate(
16301643
cycle=str(cycle_point),
@@ -1641,9 +1654,6 @@ def insert_job(self, name, cycle_point, status, job_conf):
16411654
# Job already exists (i.e. post-submission submit failure)
16421655
return
16431656

1644-
if status not in JOB_STATUS_SET:
1645-
return
1646-
16471657
j_buf = PbJob(
16481658
stamp=f'{j_id}@{update_time}',
16491659
id=j_id,
@@ -2761,12 +2771,12 @@ def delta_job_msg(self, tokens: Tokens, msg: str) -> None:
27612771

27622772
def delta_job_attr(
27632773
self,
2764-
tokens: Tokens,
2774+
itask: 'TaskProxy',
27652775
attr_key: str,
27662776
attr_val: Any,
27672777
) -> None:
27682778
"""Set job attribute."""
2769-
j_id, job = self.store_node_fetcher(tokens)
2779+
j_id, job = self.store_node_fetcher(itask.job_tokens)
27702780
if not job:
27712781
return
27722782
j_delta = PbJob(stamp=f'{j_id}@{time()}')
@@ -2779,12 +2789,19 @@ def delta_job_attr(
27792789

27802790
def delta_job_state(
27812791
self,
2782-
tokens: Tokens,
2792+
itask: 'TaskProxy',
27832793
status: str,
27842794
) -> None:
27852795
"""Set job state."""
2786-
j_id, job = self.store_node_fetcher(tokens)
2787-
if not job or status not in JOB_STATUS_SET:
2796+
if status not in JOB_STATUS_SET:
2797+
# Ignore task-only states e.g. preparing
2798+
return
2799+
j_id, job = self.store_node_fetcher(itask.job_tokens)
2800+
if not job or (
2801+
# Don't cause backwards state change:
2802+
JOB_STATUSES_ALL.index(status) <= JOB_STATUSES_ALL.index(job.state)
2803+
and not itask.job_vacated
2804+
):
27882805
return
27892806
j_delta = PbJob(
27902807
stamp=f'{j_id}@{time()}',
@@ -2798,15 +2815,15 @@ def delta_job_state(
27982815

27992816
def delta_job_time(
28002817
self,
2801-
tokens: Tokens,
2818+
itask: 'TaskProxy',
28022819
event_key: str,
28032820
time_str: Optional[str] = None,
28042821
) -> None:
28052822
"""Set an event time in job pool object.
28062823
28072824
Set values of both event_key + '_time' and event_key + '_time_string'.
28082825
"""
2809-
j_id, job = self.store_node_fetcher(tokens)
2826+
j_id, job = self.store_node_fetcher(itask.job_tokens)
28102827
if not job:
28112828
return
28122829
j_delta = PbJob(stamp=f'{j_id}@{time()}')

cylc/flow/task_events_mgr.py

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -797,11 +797,9 @@ def process_message(
797797
# ... but either way update the job ID in the job proxy (it only
798798
# comes in via the submission message).
799799
if itask.run_mode != RunMode.SIMULATION:
800-
job_tokens = itask.tokens.duplicate(
801-
job=str(itask.submit_num)
802-
)
803800
self.data_store_mgr.delta_job_attr(
804-
job_tokens, 'job_id', itask.summary['submit_method_id'])
801+
itask, 'job_id', itask.summary['submit_method_id']
802+
)
805803
else:
806804
# In simulation mode submitted implies started:
807805
self.spawn_children(itask, TASK_OUTPUT_STARTED, forced)
@@ -1354,9 +1352,8 @@ def _process_message_failed(
13541352
if event_time is None:
13551353
event_time = get_current_time_string()
13561354
itask.set_summary_time('finished', event_time)
1357-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
1358-
self.data_store_mgr.delta_job_time(job_tokens, 'finished', event_time)
1359-
self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_FAILED)
1355+
self.data_store_mgr.delta_job_time(itask, 'finished', event_time)
1356+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_FAILED)
13601357
self.workflow_db_mgr.put_update_task_jobs(itask, {
13611358
"run_status": 1,
13621359
"time_run_exit": event_time,
@@ -1394,12 +1391,11 @@ def _process_message_failed(
13941391

13951392
def _process_message_started(self, itask, event_time, forced):
13961393
"""Helper for process_message, handle a started message."""
1394+
self.data_store_mgr.delta_job_time(itask, 'started', event_time)
1395+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_RUNNING)
13971396
if itask.job_vacated:
13981397
itask.job_vacated = False
13991398
LOG.info(f"[{itask}] Vacated job restarted")
1400-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
1401-
self.data_store_mgr.delta_job_time(job_tokens, 'started', event_time)
1402-
self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_RUNNING)
14031399
itask.set_summary_time('started', event_time)
14041400
self.workflow_db_mgr.put_update_task_jobs(itask, {
14051401
"time_run": itask.summary['started_time_string']})
@@ -1430,9 +1426,8 @@ def _process_message_succeeded(self, itask, event_time, forced):
14301426
Ignore forced.
14311427
"""
14321428

1433-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
1434-
self.data_store_mgr.delta_job_time(job_tokens, 'finished', event_time)
1435-
self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_SUCCEEDED)
1429+
self.data_store_mgr.delta_job_time(itask, 'finished', event_time)
1430+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_SUCCEEDED)
14361431
itask.set_summary_time('finished', event_time)
14371432
self.workflow_db_mgr.put_update_task_jobs(itask, {
14381433
"run_status": 0,
@@ -1505,13 +1500,9 @@ def _process_message_submit_failed(
15051500
self.setup_event_handlers(itask, self.EVENT_SUBMIT_RETRY, msg)
15061501

15071502
# Register newly submit-failed job with the database and datastore.
1508-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
15091503
self._insert_task_job(
15101504
itask, event_time, self.JOB_SUBMIT_FAIL_FLAG, forced=forced)
1511-
self.data_store_mgr.delta_job_state(
1512-
job_tokens,
1513-
TASK_STATUS_SUBMIT_FAILED
1514-
)
1505+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_SUBMIT_FAILED)
15151506
self._reset_job_timers(itask)
15161507

15171508
return no_retries
@@ -1560,24 +1551,12 @@ def _process_message_submitted(
15601551
# Do after itask has changed state
15611552
self._insert_task_job(
15621553
itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG, forced=forced)
1563-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
1564-
self.data_store_mgr.delta_job_time(
1565-
job_tokens,
1566-
'submitted',
1567-
event_time,
1568-
)
1554+
self.data_store_mgr.delta_job_time(itask, 'submitted', event_time)
15691555
if itask.run_mode == RunMode.SIMULATION:
15701556
# Simulate job started as well.
1571-
self.data_store_mgr.delta_job_time(
1572-
job_tokens,
1573-
'started',
1574-
event_time,
1575-
)
1557+
self.data_store_mgr.delta_job_time(itask, 'started', event_time)
15761558
else:
1577-
self.data_store_mgr.delta_job_state(
1578-
job_tokens,
1579-
TASK_STATUS_SUBMITTED,
1580-
)
1559+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_SUBMITTED)
15811560

15821561
def _insert_task_job(
15831562
self,

cylc/flow/task_proxy.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,11 @@ def __init__(
315315
)
316316
)
317317

318+
@property
319+
def job_tokens(self) -> 'Tokens':
320+
"""Return the job tokens for this task proxy."""
321+
return self.tokens.duplicate(job=str(self.submit_num))
322+
318323
def __repr__(self) -> str:
319324
return f"<{type(self).__name__} {self.identity} {self.state}>"
320325

0 commit comments

Comments
 (0)