Skip to content

Commit e770f48

Browse files
authored
Merge pull request #6926 from MetRonnie/submit-time
Fix data store missing info for submit-failed jobs
2 parents 33c5035 + 226850f commit e770f48

File tree

8 files changed

+103
-57
lines changed

8 files changed

+103
-57
lines changed

changes.d/6926.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
Fixed info missing from UI for submit-failed tasks.

cylc/flow/data_store_mgr.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@
6767
Set,
6868
TYPE_CHECKING,
6969
Tuple,
70-
Union,
7170
)
7271
import zlib
7372

@@ -1612,37 +1611,27 @@ def _apply_broadcasts_to_runtime(self, tokens, rtconfig):
16121611

16131612
def insert_job(
16141613
self,
1615-
name: str,
1616-
cycle_point: Union['PointBase', str],
1614+
itask: 'TaskProxy',
16171615
status: str,
16181616
job_conf: dict,
1619-
):
1617+
) -> None:
16201618
"""Insert job into data-store.
16211619
16221620
Args:
1623-
name: Corresponding task name.
1624-
cycle_point: Cycle point string
16251621
status: The task's state.
16261622
job_conf:
16271623
Dictionary of job configuration used to generate
16281624
the job script.
16291625
(see TaskJobManager._prep_submit_task_job_impl)
16301626
1631-
Returns:
1632-
1633-
None
1634-
16351627
"""
16361628
if status not in JOB_STATUS_SET:
16371629
# Ignore task-only states e.g. preparing
16381630
# https://github.com/cylc/cylc-flow/issues/4994
16391631
return
16401632

16411633
sub_num = job_conf['submit_num']
1642-
tp_tokens = self.id_.duplicate(
1643-
cycle=str(cycle_point),
1644-
task=name,
1645-
)
1634+
tp_tokens = self.id_.duplicate(itask.tokens)
16461635
tproxy: Optional[PbTaskProxy]
16471636
tp_id, tproxy = self.store_node_fetcher(tp_tokens)
16481637
if not tproxy:
@@ -1665,6 +1654,7 @@ def insert_job(
16651654
execution_time_limit=job_conf.get('execution_time_limit'),
16661655
platform=job_conf['platform']['name'],
16671656
job_runner_name=job_conf.get('job_runner_name'),
1657+
job_id=itask.summary.get('submit_method_id'),
16681658
)
16691659
# Not all fields are populated with some submit-failures,
16701660
# so use task cfg as base.

cylc/flow/task_events_mgr.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -794,13 +794,7 @@ def process_message(
794794
self._process_message_submitted(itask, event_time, forced)
795795
self.spawn_children(itask, TASK_OUTPUT_SUBMITTED, forced)
796796

797-
# ... but either way update the job ID in the job proxy (it only
798-
# comes in via the submission message).
799-
if itask.run_mode != RunMode.SIMULATION:
800-
self.data_store_mgr.delta_job_attr(
801-
itask, 'job_id', itask.summary['submit_method_id']
802-
)
803-
else:
797+
if itask.run_mode == RunMode.SIMULATION:
804798
# In simulation mode submitted implies started:
805799
self.spawn_children(itask, TASK_OUTPUT_STARTED, forced)
806800

@@ -1465,7 +1459,6 @@ def _process_message_submit_failed(
14651459
"time_submit_exit": event_time,
14661460
"submit_status": 1,
14671461
})
1468-
itask.summary['submit_method_id'] = None
14691462
LOG.error(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
14701463
if (
14711464
forced
@@ -1503,6 +1496,7 @@ def _process_message_submit_failed(
15031496
self._insert_task_job(
15041497
itask, event_time, self.JOB_SUBMIT_FAIL_FLAG, forced=forced)
15051498
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_SUBMIT_FAILED)
1499+
self.data_store_mgr.delta_job_time(itask, 'submitted', event_time)
15061500
self._reset_job_timers(itask)
15071501

15081502
return no_retries
@@ -1589,11 +1583,9 @@ def _insert_task_job(
15891583
except IndexError:
15901584
# we do not have access to the job config (e.g. Scheduler
15911585
# crashed) - https://github.com/cylc/cylc-flow/pull/6326
1592-
job_id = itask.tokens.duplicate(
1593-
job=itask.submit_num
1594-
).relative_id
15951586
LOG.warning(
1596-
f'Could not find the job configuration for "{job_id}".'
1587+
'Could not find the job configuration for '
1588+
f'"{itask.job_tokens.relative_id}".'
15971589
)
15981590
itask.jobs.append({"submit_num": itask.submit_num})
15991591
job_conf = itask.jobs[-1]
@@ -1610,8 +1602,7 @@ def _insert_task_job(
16101602

16111603
# insert job into data store
16121604
self.data_store_mgr.insert_job(
1613-
itask.tdef.name,
1614-
itask.point,
1605+
itask,
16151606
job_status,
16161607
{
16171608
**job_conf,
@@ -1634,7 +1625,7 @@ def _insert_task_job(
16341625
# preparation started due to intelligent host (and or
16351626
# platform) selection
16361627
'platform_name': itask.platform['name'],
1637-
}
1628+
},
16381629
)
16391630

16401631
def _setup_job_logs_retrieval(self, itask, event) -> None:

cylc/flow/task_job_mgr.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,6 +1288,12 @@ def _prep_submit_task_job(
12881288
# bad_hosts:
12891289
self.bad_hosts -= exc.hosts_consumed
12901290
self._set_retry_timers(itask, rtconfig)
1291+
# Provide dummy platform otherwise it will incorrectly show as
1292+
# the default localhost platform in the data store:
1293+
itask.platform = {
1294+
'name': rtconfig['platform'],
1295+
'job runner': '',
1296+
}
12911297
self._prep_submit_task_job_error(itask, msg, exc)
12921298
return False
12931299

@@ -1339,15 +1345,20 @@ def _prep_submit_task_job_error(
13391345
itask.is_manual_submit = False
13401346
# job failed in preparation i.e. is really preparation-failed rather
13411347
# than submit-failed
1342-
# provide a dummy job config - this info will be added to the data
1343-
# store
13441348
try_num = itask.get_try_num()
1345-
itask.jobs.append({
1346-
'task_id': itask.identity,
1347-
'platform': itask.platform,
1348-
'submit_num': itask.submit_num,
1349-
'try_num': try_num,
1350-
})
1349+
if not itask.jobs or (
1350+
itask.jobs[-1]['submit_num'] != itask.submit_num
1351+
):
1352+
# provide a dummy job config - this info will be added to the data
1353+
# store
1354+
itask.jobs.append({
1355+
'task_id': itask.identity,
1356+
'platform': itask.platform,
1357+
'job_runner_name': itask.platform['job runner'],
1358+
'submit_num': itask.submit_num,
1359+
'try_num': try_num,
1360+
'flow_nums': itask.flow_nums,
1361+
})
13511362
# create a DB entry for the submit-failed job
13521363
self.workflow_db_mgr.put_insert_task_jobs(
13531364
itask,
@@ -1417,10 +1428,10 @@ def get_execution_time_limit(
14171428

14181429
def get_job_conf(
14191430
self,
1420-
itask,
1421-
rtconfig,
1422-
job_file_path=None,
1423-
job_d=None,
1431+
itask: 'TaskProxy',
1432+
rtconfig: dict,
1433+
job_file_path: Optional[str] = None,
1434+
job_d: Optional[str] = None,
14241435
):
14251436
"""Return a job config.
14261437

tests/integration/network/test_graphql.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,12 @@ async def test_edges(harness):
366366

367367

368368
async def test_jobs(harness):
369+
schd: Scheduler
369370
schd, client, w_tokens = harness
370371

371372
# add a job
372-
schd.data_store_mgr.insert_job('a', '1', 'submitted', job_config(schd))
373+
itask = schd.pool._get_task_by_id('1/a')
374+
schd.data_store_mgr.insert_job(itask, 'submitted', job_config(schd))
373375
schd.data_store_mgr.update_data_structure()
374376
j_tokens = w_tokens.duplicate(
375377
cycle='1',

tests/integration/test_data_store_mgr.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,15 +316,18 @@ async def test_delta_task_held(mod_harness):
316316

317317
def test_insert_job(mod_harness):
318318
"""Test method that adds a new job to the store."""
319+
schd: Scheduler
319320
schd, data = mod_harness
320321
assert len(schd.data_store_mgr.added[JOBS]) == 0
321-
schd.data_store_mgr.insert_job('foo', '1', 'submitted', job_config(schd))
322+
itask = schd.pool.get_tasks()[0]
323+
schd.data_store_mgr.insert_job(itask, 'submitted', job_config(schd))
322324
assert len(schd.data_store_mgr.added[JOBS]) == 1
323325
assert ext_id(schd) in schd.data_store_mgr.added[JOBS]
324326

325327

326328
def test_insert_db_job(mod_harness, job_db_row):
327329
"""Test method that adds a new job from the db to the store."""
330+
schd: Scheduler
328331
schd, data = mod_harness
329332
assert len(schd.data_store_mgr.added[JOBS]) == 1
330333
schd.data_store_mgr.insert_db_job(0, job_db_row)
@@ -334,6 +337,7 @@ def test_insert_db_job(mod_harness, job_db_row):
334337

335338
def test_delta_job_msg(mod_harness):
336339
"""Test method adding messages to job element."""
340+
schd: Scheduler
337341
schd, data = mod_harness
338342
j_id = ext_id(schd)
339343
tokens = Tokens(j_id)
@@ -724,7 +728,7 @@ def _patch_remove(*args, **kwargs):
724728
assert itask
725729
itask.submit_num += 1
726730
one.data_store_mgr.insert_job(
727-
itask.tdef.name, itask.point, itask.state.status, {'submit_num': 1}
731+
itask, itask.state.status, {'submit_num': 1}
728732
)
729733
await one.update_data_structure()
730734

tests/integration/test_task_events_mgr.py

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
EventKey,
3232
TaskJobLogsRetrieveContext,
3333
)
34+
from cylc.flow.task_state import (
35+
TASK_STATUS_PREPARING,
36+
TASK_STATUS_SUBMIT_FAILED,
37+
)
3438

3539
from .test_workflow_events import TEMPLATES
3640

@@ -126,7 +130,7 @@ async def test__insert_task_job(flow, one_conf, scheduler, start, validate):
126130

127131

128132
async def test__always_insert_task_job(
129-
flow, scheduler, mock_glbl_cfg, start, run
133+
flow, scheduler, mock_glbl_cfg, start
130134
):
131135
"""Insert Task Job _Always_ inserts a task into the data store.
132136
@@ -144,20 +148,22 @@ async def test__always_insert_task_job(
144148
[platforms]
145149
[[broken1]]
146150
hosts = no-such-host-1
151+
job runner = abc
147152
[[broken2]]
148153
hosts = no-such-host-2
154+
job runner = def
149155
[platform groups]
150-
[[broken]]
156+
[[broken_group]]
151157
platforms = broken1
152158
"""
153159
mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_config)
154160

155161
id_ = flow({
156-
'scheduling': {'graph': {'R1': 'broken & broken2'}},
162+
'scheduling': {'graph': {'R1': 'foo & bar'}},
157163
'runtime': {
158164
'root': {'submission retry delays': 'PT10M'},
159-
'broken': {'platform': 'broken'},
160-
'broken2': {'platform': 'broken2'}
165+
'foo': {'platform': 'broken_group'},
166+
'bar': {'platform': 'broken2'}
161167
}
162168
})
163169

@@ -174,14 +180,57 @@ async def test__always_insert_task_job(
174180
)
175181

176182
# Both jobs are in the data store with submit-failed state:
183+
ds_jobs = schd.data_store_mgr.data[schd.id][JOBS]
177184
updates = {
178-
k.split('//')[-1]: v.state
179-
for k, v in schd.data_store_mgr.data[schd.id][JOBS].items()
185+
id_.split('//')[-1]: (job.state, job.platform, job.job_runner_name)
186+
for id_, job in ds_jobs.items()
180187
}
181188
assert updates == {
182-
'1/broken/01': 'submit-failed',
183-
'1/broken2/01': 'submit-failed'
189+
'1/foo/01': ('submit-failed', 'broken_group', ''),
190+
'1/bar/01': ('submit-failed', 'broken2', 'def'),
184191
}
192+
for job in ds_jobs.values():
193+
assert job.submitted_time
194+
195+
196+
async def test__submit_failed_job_id(flow, scheduler, start, db_select):
197+
"""If a job is killed in the submitted state, the job ID should still be
198+
in the DB/data store.
199+
200+
See https://github.com/cylc/cylc-flow/pull/6926
201+
"""
202+
async def get_ds_job_id(schd: Scheduler):
203+
await schd.update_data_structure()
204+
return list(schd.data_store_mgr.data[schd.id][JOBS].values())[0].job_id
205+
206+
id_ = flow('foo')
207+
schd: Scheduler = scheduler(id_)
208+
job_id = '1234'
209+
async with start(schd):
210+
itask = schd.pool.get_tasks()[0]
211+
itask.state_reset(TASK_STATUS_PREPARING)
212+
itask.submit_num = 1
213+
itask.summary['submit_method_id'] = job_id
214+
schd.workflow_db_mgr.put_insert_task_jobs(itask, {})
215+
schd.task_events_mgr.process_message(
216+
itask, 'INFO', schd.task_events_mgr.EVENT_SUBMITTED
217+
)
218+
assert await get_ds_job_id(schd) == job_id
219+
220+
schd.task_events_mgr.process_message(
221+
itask, 'CRITICAL', schd.task_events_mgr.EVENT_SUBMIT_FAILED
222+
)
223+
assert itask.state(TASK_STATUS_SUBMIT_FAILED)
224+
assert await get_ds_job_id(schd) == job_id
225+
226+
assert db_select(schd, False, 'task_jobs', 'job_id', 'submit_status') == [
227+
(job_id, 1)
228+
]
229+
230+
# Restart and check data store again:
231+
schd = scheduler(id_)
232+
async with start(schd):
233+
assert await get_ds_job_id(schd) == job_id
185234

186235

187236
async def test__process_message_failed_with_retry(one, start, log_filter):

tests/integration/tui/test_logs.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ async def workflow(
149149
# mark 1/a/01 as failed
150150
job_1 = schd.tokens.duplicate(cycle='1', task='a', job='01')
151151
schd.data_store_mgr.insert_job(
152-
'a',
153-
IntegerPoint('1'),
152+
itask,
154153
TASK_STATUS_SUCCEEDED,
155154
{'submit_num': 1, 'platform': {'name': 'x'}}
156155
)
@@ -160,8 +159,7 @@ async def workflow(
160159
itask.submit_num = 2
161160
job_2 = schd.tokens.duplicate(cycle='1', task='a', job='02')
162161
schd.data_store_mgr.insert_job(
163-
'a',
164-
IntegerPoint('1'),
162+
itask,
165163
TASK_STATUS_SUCCEEDED,
166164
{'submit_num': 2, 'platform': {'name': 'x'}}
167165
)

0 commit comments

Comments (0)