Skip to content

Commit 9f7039a

Browse files
authored
Merge pull request #6925 from cylc/8.5.x-sync
🤖 Merge 8.5.x-sync into master
2 parents 32bc307 + ceb197d commit 9f7039a

File tree

18 files changed

+308
-108
lines changed

18 files changed

+308
-108
lines changed

changes.d/6836.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a bug that caused the results of `platform = $(subshell commands)` to be cached, preventing re-evaluation for each task with the same config.

changes.d/6914.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed bug where jobs in the UI could regress to an earlier state.

changes.d/6924.feat.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Include `log/version/` files in `cylc cat-log` and the UI log view.

cylc/flow/data_store_mgr.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
Set,
6868
TYPE_CHECKING,
6969
Tuple,
70+
Union,
7071
)
7172
import zlib
7273

@@ -1609,13 +1610,20 @@ def _apply_broadcasts_to_runtime(self, tokens, rtconfig):
16091610
poverride(rtconfig, overrides, prepend=True)
16101611
return rtconfig
16111612

1612-
def insert_job(self, name, cycle_point, status, job_conf):
1613+
def insert_job(
1614+
self,
1615+
name: str,
1616+
cycle_point: Union['PointBase', str],
1617+
status: str,
1618+
job_conf: dict,
1619+
):
16131620
"""Insert job into data-store.
16141621
16151622
Args:
1616-
name (str): Corresponding task name.
1617-
cycle_point (str|PointBase): Cycle point string
1618-
job_conf (dic):
1623+
name: Corresponding task name.
1624+
cycle_point: Cycle point string
1625+
status: The task's state.
1626+
job_conf:
16191627
Dictionary of job configuration used to generate
16201628
the job script.
16211629
(see TaskJobManager._prep_submit_task_job_impl)
@@ -1625,6 +1633,11 @@ def insert_job(self, name, cycle_point, status, job_conf):
16251633
None
16261634
16271635
"""
1636+
if status not in JOB_STATUS_SET:
1637+
# Ignore task-only states e.g. preparing
1638+
# https://github.com/cylc/cylc-flow/issues/4994
1639+
return
1640+
16281641
sub_num = job_conf['submit_num']
16291642
tp_tokens = self.id_.duplicate(
16301643
cycle=str(cycle_point),
@@ -1641,9 +1654,6 @@ def insert_job(self, name, cycle_point, status, job_conf):
16411654
# Job already exists (i.e. post-submission submit failure)
16421655
return
16431656

1644-
if status not in JOB_STATUS_SET:
1645-
return
1646-
16471657
j_buf = PbJob(
16481658
stamp=f'{j_id}@{update_time}',
16491659
id=j_id,
@@ -2761,12 +2771,12 @@ def delta_job_msg(self, tokens: Tokens, msg: str) -> None:
27612771

27622772
def delta_job_attr(
27632773
self,
2764-
tokens: Tokens,
2774+
itask: 'TaskProxy',
27652775
attr_key: str,
27662776
attr_val: Any,
27672777
) -> None:
27682778
"""Set job attribute."""
2769-
j_id, job = self.store_node_fetcher(tokens)
2779+
j_id, job = self.store_node_fetcher(itask.job_tokens)
27702780
if not job:
27712781
return
27722782
j_delta = PbJob(stamp=f'{j_id}@{time()}')
@@ -2779,12 +2789,19 @@ def delta_job_attr(
27792789

27802790
def delta_job_state(
27812791
self,
2782-
tokens: Tokens,
2792+
itask: 'TaskProxy',
27832793
status: str,
27842794
) -> None:
27852795
"""Set job state."""
2786-
j_id, job = self.store_node_fetcher(tokens)
2787-
if not job or status not in JOB_STATUS_SET:
2796+
if status not in JOB_STATUS_SET:
2797+
# Ignore task-only states e.g. preparing
2798+
return
2799+
j_id, job = self.store_node_fetcher(itask.job_tokens)
2800+
if not job or (
2801+
# Don't cause backwards state change:
2802+
JOB_STATUSES_ALL.index(status) <= JOB_STATUSES_ALL.index(job.state)
2803+
and not itask.job_vacated
2804+
):
27882805
return
27892806
j_delta = PbJob(
27902807
stamp=f'{j_id}@{time()}',
@@ -2798,15 +2815,15 @@ def delta_job_state(
27982815

27992816
def delta_job_time(
28002817
self,
2801-
tokens: Tokens,
2818+
itask: 'TaskProxy',
28022819
event_key: str,
28032820
time_str: Optional[str] = None,
28042821
) -> None:
28052822
"""Set an event time in job pool object.
28062823
28072824
Set values of both event_key + '_time' and event_key + '_time_string'.
28082825
"""
2809-
j_id, job = self.store_node_fetcher(tokens)
2826+
j_id, job = self.store_node_fetcher(itask.job_tokens)
28102827
if not job:
28112828
return
28122829
j_delta = PbJob(stamp=f'{j_id}@{time()}')

cylc/flow/install_plugins/log_vc_info.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,8 +285,8 @@ def write_vc_info(
285285
)
286286
info_file.parent.mkdir(exist_ok=True, parents=True)
287287
with open(info_file, 'w') as f:
288-
f.write(
289-
json.dumps(info, indent=JSON_INDENT)
288+
print(
289+
json.dumps(info, indent=JSON_INDENT), file=f
290290
)
291291

292292

@@ -356,7 +356,7 @@ def write_diff(
356356
try:
357357
_run_cmd(vcs, args, repo_path, stdout=f)
358358
except VCSMissingBaseError as exc:
359-
f.write(f"# No diff - {exc}")
359+
print(f"# No diff - {exc}", file=f)
360360
return diff_file
361361

362362

cylc/flow/platforms.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ def log_platform_event(
8383
def get_platform(
8484
task_conf: Optional[str] = None,
8585
task_name: str = UNKNOWN_TASK,
86-
bad_hosts: Optional[Set[str]] = None
86+
bad_hosts: Optional[Set[str]] = None,
87+
evaluated_host: Optional[str] = None,
8788
) -> Dict[str, Any]:
8889
...
8990

@@ -92,7 +93,8 @@ def get_platform(
9293
def get_platform(
9394
task_conf: Union[dict, 'OrderedDictWithDefaults'],
9495
task_name: str = UNKNOWN_TASK,
95-
bad_hosts: Optional[Set[str]] = None
96+
bad_hosts: Optional[Set[str]] = None,
97+
evaluated_host: Optional[str] = None,
9698
) -> Optional[Dict[str, Any]]:
9799
...
98100

@@ -108,7 +110,8 @@ def get_platform(
108110
def get_platform(
109111
task_conf: Union[str, dict, 'OrderedDictWithDefaults', None] = None,
110112
task_name: str = UNKNOWN_TASK,
111-
bad_hosts: Optional[Set[str]] = None
113+
bad_hosts: Optional[Set[str]] = None,
114+
evaluated_host: Optional[str] = None,
112115
) -> Optional[Dict[str, Any]]:
113116
"""Get a platform.
114117
@@ -121,6 +124,7 @@ def get_platform(
121124
task_name: Help produce more helpful error messages.
122125
bad_hosts: A set of hosts known to be unreachable (had an ssh 255
123126
error)
127+
evaluated_host: Host name evaluated from platform subshell.
124128
125129
Returns:
126130
platform: A platform definition dictionary. Uses either
@@ -169,6 +173,7 @@ def get_platform(
169173
glbl_cfg().get(['platforms']),
170174
task_job_section,
171175
task_remote_section,
176+
evaluated_host,
172177
),
173178
bad_hosts=bad_hosts,
174179
)
@@ -330,7 +335,8 @@ def get_platform_from_group(
330335
def platform_name_from_job_info(
331336
platforms: Union[dict, 'OrderedDictWithDefaults'],
332337
job: Dict[str, Any],
333-
remote: Dict[str, Any]
338+
remote: Dict[str, Any],
339+
evaluated_host: Optional[str] = None,
334340
) -> str:
335341
"""
336342
Find out which job platform to use given a list of possible platforms
@@ -385,6 +391,7 @@ def platform_name_from_job_info(
385391
job: Workflow config [runtime][TASK][job] section.
386392
remote: Workflow config [runtime][TASK][remote] section.
387393
platforms: Dictionary containing platform definitions.
394+
evaluated_host: Host name that is the result of evaluating a subshell.
388395
389396
Returns:
390397
platform: string representing a platform from the global config.
@@ -422,7 +429,9 @@ def platform_name_from_job_info(
422429

423430
# NOTE: Do NOT use .get() on OrderedDictWithDefaults -
424431
# https://github.com/cylc/cylc-flow/pull/4975
425-
if 'host' in remote and remote['host']:
432+
if evaluated_host:
433+
task_host = evaluated_host
434+
elif 'host' in remote and remote['host']:
426435
task_host = remote['host']
427436
else:
428437
task_host = 'localhost'

cylc/flow/scripts/cat_log.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
),
107107
'i': ('install log', r'install/*-*install.log'),
108108
's': ('scheduler log', r'scheduler/*-*start*.log'),
109+
'v': ('version control info (JSON)', r'version/*'),
109110
}
110111

111112

cylc/flow/task_events_mgr.py

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -797,11 +797,9 @@ def process_message(
797797
# ... but either way update the job ID in the job proxy (it only
798798
# comes in via the submission message).
799799
if itask.run_mode != RunMode.SIMULATION:
800-
job_tokens = itask.tokens.duplicate(
801-
job=str(itask.submit_num)
802-
)
803800
self.data_store_mgr.delta_job_attr(
804-
job_tokens, 'job_id', itask.summary['submit_method_id'])
801+
itask, 'job_id', itask.summary['submit_method_id']
802+
)
805803
else:
806804
# In simulation mode submitted implies started:
807805
self.spawn_children(itask, TASK_OUTPUT_STARTED, forced)
@@ -1354,9 +1352,8 @@ def _process_message_failed(
13541352
if event_time is None:
13551353
event_time = get_current_time_string()
13561354
itask.set_summary_time('finished', event_time)
1357-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
1358-
self.data_store_mgr.delta_job_time(job_tokens, 'finished', event_time)
1359-
self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_FAILED)
1355+
self.data_store_mgr.delta_job_time(itask, 'finished', event_time)
1356+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_FAILED)
13601357
self.workflow_db_mgr.put_update_task_jobs(itask, {
13611358
"run_status": 1,
13621359
"time_run_exit": event_time,
@@ -1394,12 +1391,11 @@ def _process_message_failed(
13941391

13951392
def _process_message_started(self, itask, event_time, forced):
13961393
"""Helper for process_message, handle a started message."""
1394+
self.data_store_mgr.delta_job_time(itask, 'started', event_time)
1395+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_RUNNING)
13971396
if itask.job_vacated:
13981397
itask.job_vacated = False
13991398
LOG.info(f"[{itask}] Vacated job restarted")
1400-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
1401-
self.data_store_mgr.delta_job_time(job_tokens, 'started', event_time)
1402-
self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_RUNNING)
14031399
itask.set_summary_time('started', event_time)
14041400
self.workflow_db_mgr.put_update_task_jobs(itask, {
14051401
"time_run": itask.summary['started_time_string']})
@@ -1430,9 +1426,8 @@ def _process_message_succeeded(self, itask, event_time, forced):
14301426
Ignore forced.
14311427
"""
14321428

1433-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
1434-
self.data_store_mgr.delta_job_time(job_tokens, 'finished', event_time)
1435-
self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_SUCCEEDED)
1429+
self.data_store_mgr.delta_job_time(itask, 'finished', event_time)
1430+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_SUCCEEDED)
14361431
itask.set_summary_time('finished', event_time)
14371432
self.workflow_db_mgr.put_update_task_jobs(itask, {
14381433
"run_status": 0,
@@ -1505,13 +1500,9 @@ def _process_message_submit_failed(
15051500
self.setup_event_handlers(itask, self.EVENT_SUBMIT_RETRY, msg)
15061501

15071502
# Register newly submit-failed job with the database and datastore.
1508-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
15091503
self._insert_task_job(
15101504
itask, event_time, self.JOB_SUBMIT_FAIL_FLAG, forced=forced)
1511-
self.data_store_mgr.delta_job_state(
1512-
job_tokens,
1513-
TASK_STATUS_SUBMIT_FAILED
1514-
)
1505+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_SUBMIT_FAILED)
15151506
self._reset_job_timers(itask)
15161507

15171508
return no_retries
@@ -1560,24 +1551,12 @@ def _process_message_submitted(
15601551
# Do after itask has changed state
15611552
self._insert_task_job(
15621553
itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG, forced=forced)
1563-
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
1564-
self.data_store_mgr.delta_job_time(
1565-
job_tokens,
1566-
'submitted',
1567-
event_time,
1568-
)
1554+
self.data_store_mgr.delta_job_time(itask, 'submitted', event_time)
15691555
if itask.run_mode == RunMode.SIMULATION:
15701556
# Simulate job started as well.
1571-
self.data_store_mgr.delta_job_time(
1572-
job_tokens,
1573-
'started',
1574-
event_time,
1575-
)
1557+
self.data_store_mgr.delta_job_time(itask, 'started', event_time)
15761558
else:
1577-
self.data_store_mgr.delta_job_state(
1578-
job_tokens,
1579-
TASK_STATUS_SUBMITTED,
1580-
)
1559+
self.data_store_mgr.delta_job_state(itask, TASK_STATUS_SUBMITTED)
15811560

15821561
def _insert_task_job(
15831562
self,

cylc/flow/task_job_mgr.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1220,10 +1220,10 @@ def _prep_submit_task_job(
12201220
f"\"{itask.identity}\" the following are not compatible:\n"
12211221
)
12221222

1223-
host_n, platform_name = None, None
1223+
host_name, platform_name = None, None
12241224
try:
12251225
if rtconfig['remote']['host'] is not None:
1226-
host_n = self.task_remote_mgr.eval_host(
1226+
host_name = self.task_remote_mgr.eval_host(
12271227
rtconfig['remote']['host']
12281228
)
12291229
else:
@@ -1242,27 +1242,26 @@ def _prep_submit_task_job(
12421242
return False
12431243
else:
12441244
# host/platform select not ready
1245-
if host_n is None and platform_name is None:
1245+
if host_name is None and platform_name is None:
12461246
return None
12471247
elif (
1248-
host_n is None
1248+
host_name is None
12491249
and rtconfig['platform']
12501250
and rtconfig['platform'] != platform_name
12511251
):
12521252
LOG.debug(
12531253
f"for task {itask.identity}: platform = "
12541254
f"{rtconfig['platform']} evaluated as {platform_name}"
12551255
)
1256-
rtconfig['platform'] = platform_name
1256+
12571257
elif (
12581258
platform_name is None
1259-
and rtconfig['remote']['host'] != host_n
1259+
and rtconfig['remote']['host'] != host_name
12601260
):
12611261
LOG.debug(
12621262
f"[{itask}] host = "
1263-
f"{rtconfig['remote']['host']} evaluated as {host_n}"
1263+
f"{rtconfig['remote']['host']} evaluated as {host_name}"
12641264
)
1265-
rtconfig['remote']['host'] = host_n
12661265

12671266
try:
12681267
platform = cast(
@@ -1271,7 +1270,10 @@ def _prep_submit_task_job(
12711270
# return early if the subshell is still evaluating.
12721271
'dict',
12731272
get_platform(
1274-
rtconfig, itask.tdef.name, bad_hosts=self.bad_hosts
1273+
platform_name or rtconfig,
1274+
itask.tdef.name,
1275+
bad_hosts=self.bad_hosts,
1276+
evaluated_host=host_name,
12751277
),
12761278
)
12771279
except PlatformLookupError as exc:

cylc/flow/task_proxy.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,11 @@ def __init__(
315315
)
316316
)
317317

318+
@property
319+
def job_tokens(self) -> 'Tokens':
320+
"""Return the job tokens for this task proxy."""
321+
return self.tokens.duplicate(job=str(self.submit_num))
322+
318323
def __repr__(self) -> str:
319324
return f"<{type(self).__name__} {self.identity} {self.state}>"
320325

0 commit comments

Comments
 (0)