Skip to content

Commit 8f6891b

Browse files

Committed: Fix job ID missing from DB/data store for submit-failed jobs
1 parent 78605c1 commit 8f6891b

File tree

6 files changed

+65
-37
lines changed

6 files changed

+65
-37
lines changed

cylc/flow/data_store_mgr.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@
6767
Set,
6868
TYPE_CHECKING,
6969
Tuple,
70-
Union,
7170
)
7271
import zlib
7372

@@ -1612,37 +1611,27 @@ def _apply_broadcasts_to_runtime(self, tokens, rtconfig):
16121611

16131612
def insert_job(
16141613
self,
1615-
name: str,
1616-
cycle_point: Union['PointBase', str],
1614+
itask: 'TaskProxy',
16171615
status: str,
16181616
job_conf: dict,
1619-
):
1617+
) -> None:
16201618
"""Insert job into data-store.
16211619
16221620
Args:
1623-
name: Corresponding task name.
1624-
cycle_point: Cycle point string
16251621
status: The task's state.
16261622
job_conf:
16271623
Dictionary of job configuration used to generate
16281624
the job script.
16291625
(see TaskJobManager._prep_submit_task_job_impl)
16301626
1631-
Returns:
1632-
1633-
None
1634-
16351627
"""
16361628
if status not in JOB_STATUS_SET:
16371629
# Ignore task-only states e.g. preparing
16381630
# https://github.com/cylc/cylc-flow/issues/4994
16391631
return
16401632

16411633
sub_num = job_conf['submit_num']
1642-
tp_tokens = self.id_.duplicate(
1643-
cycle=str(cycle_point),
1644-
task=name,
1645-
)
1634+
tp_tokens = self.id_.duplicate(itask.tokens)
16461635
tproxy: Optional[PbTaskProxy]
16471636
tp_id, tproxy = self.store_node_fetcher(tp_tokens)
16481637
if not tproxy:
@@ -1665,6 +1654,7 @@ def insert_job(
16651654
execution_time_limit=job_conf.get('execution_time_limit'),
16661655
platform=job_conf['platform']['name'],
16671656
job_runner_name=job_conf.get('job_runner_name'),
1657+
job_id=itask.summary.get('submit_method_id'),
16681658
)
16691659
# Not all fields are populated with some submit-failures,
16701660
# so use task cfg as base.

cylc/flow/task_events_mgr.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -794,13 +794,7 @@ def process_message(
794794
self._process_message_submitted(itask, event_time, forced)
795795
self.spawn_children(itask, TASK_OUTPUT_SUBMITTED, forced)
796796

797-
# ... but either way update the job ID in the job proxy (it only
798-
# comes in via the submission message).
799-
if itask.run_mode != RunMode.SIMULATION:
800-
self.data_store_mgr.delta_job_attr(
801-
itask, 'job_id', itask.summary['submit_method_id']
802-
)
803-
else:
797+
if itask.run_mode == RunMode.SIMULATION:
804798
# In simulation mode submitted implies started:
805799
self.spawn_children(itask, TASK_OUTPUT_STARTED, forced)
806800

@@ -1465,7 +1459,6 @@ def _process_message_submit_failed(
14651459
"time_submit_exit": event_time,
14661460
"submit_status": 1,
14671461
})
1468-
itask.summary['submit_method_id'] = None
14691462
LOG.error(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
14701463
if (
14711464
forced
@@ -1590,11 +1583,9 @@ def _insert_task_job(
15901583
except IndexError:
15911584
# we do not have access to the job config (e.g. Scheduler
15921585
# crashed) - https://github.com/cylc/cylc-flow/pull/6326
1593-
job_id = itask.tokens.duplicate(
1594-
job=itask.submit_num
1595-
).relative_id
15961586
LOG.warning(
1597-
f'Could not find the job configuration for "{job_id}".'
1587+
'Could not find the job configuration for '
1588+
f'"{itask.job_tokens.relative_id}".'
15981589
)
15991590
itask.jobs.append({"submit_num": itask.submit_num})
16001591
job_conf = itask.jobs[-1]
@@ -1611,8 +1602,7 @@ def _insert_task_job(
16111602

16121603
# insert job into data store
16131604
self.data_store_mgr.insert_job(
1614-
itask.tdef.name,
1615-
itask.point,
1605+
itask,
16161606
job_status,
16171607
{
16181608
**job_conf,
@@ -1635,7 +1625,7 @@ def _insert_task_job(
16351625
# preparation started due to intelligent host (and or
16361626
# platform) selection
16371627
'platform_name': itask.platform['name'],
1638-
}
1628+
},
16391629
)
16401630

16411631
def _setup_job_logs_retrieval(self, itask, event) -> None:

tests/integration/network/test_graphql.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,12 @@ async def test_edges(harness):
366366

367367

368368
async def test_jobs(harness):
369+
schd: Scheduler
369370
schd, client, w_tokens = harness
370371

371372
# add a job
372-
schd.data_store_mgr.insert_job('a', '1', 'submitted', job_config(schd))
373+
itask = schd.pool._get_task_by_id('1/a')
374+
schd.data_store_mgr.insert_job(itask, 'submitted', job_config(schd))
373375
schd.data_store_mgr.update_data_structure()
374376
j_tokens = w_tokens.duplicate(
375377
cycle='1',

tests/integration/test_data_store_mgr.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,15 +316,18 @@ async def test_delta_task_held(mod_harness):
316316

317317
def test_insert_job(mod_harness):
318318
"""Test method that adds a new job to the store."""
319+
schd: Scheduler
319320
schd, data = mod_harness
320321
assert len(schd.data_store_mgr.added[JOBS]) == 0
321-
schd.data_store_mgr.insert_job('foo', '1', 'submitted', job_config(schd))
322+
itask = schd.pool.get_tasks()[0]
323+
schd.data_store_mgr.insert_job(itask, 'submitted', job_config(schd))
322324
assert len(schd.data_store_mgr.added[JOBS]) == 1
323325
assert ext_id(schd) in schd.data_store_mgr.added[JOBS]
324326

325327

326328
def test_insert_db_job(mod_harness, job_db_row):
327329
"""Test method that adds a new job from the db to the store."""
330+
schd: Scheduler
328331
schd, data = mod_harness
329332
assert len(schd.data_store_mgr.added[JOBS]) == 1
330333
schd.data_store_mgr.insert_db_job(0, job_db_row)
@@ -334,6 +337,7 @@ def test_insert_db_job(mod_harness, job_db_row):
334337

335338
def test_delta_job_msg(mod_harness):
336339
"""Test method adding messages to job element."""
340+
schd: Scheduler
337341
schd, data = mod_harness
338342
j_id = ext_id(schd)
339343
tokens = Tokens(j_id)
@@ -724,7 +728,7 @@ def _patch_remove(*args, **kwargs):
724728
assert itask
725729
itask.submit_num += 1
726730
one.data_store_mgr.insert_job(
727-
itask.tdef.name, itask.point, itask.state.status, {'submit_num': 1}
731+
itask, itask.state.status, {'submit_num': 1}
728732
)
729733
await one.update_data_structure()
730734

tests/integration/test_task_events_mgr.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
EventKey,
3232
TaskJobLogsRetrieveContext,
3333
)
34+
from cylc.flow.task_state import (
35+
TASK_STATUS_PREPARING,
36+
TASK_STATUS_SUBMIT_FAILED,
37+
)
3438

3539
from .test_workflow_events import TEMPLATES
3640

@@ -126,7 +130,7 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate):
126130

127131

128132
async def test__always_insert_task_job(
129-
flow, scheduler, mock_glbl_cfg, start, run
133+
flow, scheduler, mock_glbl_cfg, start
130134
):
131135
"""Insert Task Job _Always_ inserts a task into the data store.
132136
@@ -189,6 +193,46 @@ async def test__always_insert_task_job(
189193
assert job.submitted_time
190194

191195

196+
async def test__submit_failed_job_id(flow, scheduler, start, db_select):
197+
"""If a job is killed in the submitted state, the job ID should still be
198+
in the DB/data store.
199+
200+
See https://github.com/cylc/cylc-flow/pull/6926
201+
"""
202+
async def get_ds_job_id(schd: Scheduler):
203+
await schd.update_data_structure()
204+
return list(schd.data_store_mgr.data[schd.id][JOBS].values())[0].job_id
205+
206+
id_ = flow('foo')
207+
schd: Scheduler = scheduler(id_)
208+
job_id = '1234'
209+
async with start(schd):
210+
itask = schd.pool.get_tasks()[0]
211+
itask.state_reset(TASK_STATUS_PREPARING)
212+
itask.submit_num = 1
213+
itask.summary['submit_method_id'] = job_id
214+
schd.workflow_db_mgr.put_insert_task_jobs(itask, {})
215+
schd.task_events_mgr.process_message(
216+
itask, 'INFO', schd.task_events_mgr.EVENT_SUBMITTED
217+
)
218+
assert await get_ds_job_id(schd) == job_id
219+
220+
schd.task_events_mgr.process_message(
221+
itask, 'CRITICAL', schd.task_events_mgr.EVENT_SUBMIT_FAILED
222+
)
223+
assert itask.state(TASK_STATUS_SUBMIT_FAILED)
224+
assert await get_ds_job_id(schd) == job_id
225+
226+
assert db_select(schd, False, 'task_jobs', 'job_id', 'submit_status') == [
227+
(job_id, 1)
228+
]
229+
230+
# Restart and check data store again:
231+
schd = scheduler(id_)
232+
async with start(schd):
233+
assert await get_ds_job_id(schd) == job_id
234+
235+
192236
async def test__process_message_failed_with_retry(one, start, log_filter):
193237
"""Log job failure, even if a retry is scheduled.
194238

tests/integration/tui/test_logs.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ async def workflow(
149149
# mark 1/a/01 as failed
150150
job_1 = schd.tokens.duplicate(cycle='1', task='a', job='01')
151151
schd.data_store_mgr.insert_job(
152-
'a',
153-
IntegerPoint('1'),
152+
itask,
154153
TASK_STATUS_SUCCEEDED,
155154
{'submit_num': 1, 'platform': {'name': 'x'}}
156155
)
@@ -160,8 +159,7 @@ async def workflow(
160159
itask.submit_num = 2
161160
job_2 = schd.tokens.duplicate(cycle='1', task='a', job='02')
162161
schd.data_store_mgr.insert_job(
163-
'a',
164-
IntegerPoint('1'),
162+
itask,
165163
TASK_STATUS_SUCCEEDED,
166164
{'submit_num': 2, 'platform': {'name': 'x'}}
167165
)

0 commit comments

Comments (0)