Skip to content

Commit 0d70707

Browse files
committed
Use existing node runtime on job insert
1 parent 740e4c0 commit 0d70707

File tree

1 file changed

+79
-13
lines changed

1 file changed

+79
-13
lines changed

cylc/flow/data_store_mgr.py

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,38 @@
214214
WORKFLOW: {'log_records': 10}
215215
}
216216

217+
# internal runtime to protobuf field name mapping
218+
RUNTIME_MAPPING = {
219+
'completion': 'completion',
220+
'directives': 'directives',
221+
'environment': 'environment',
222+
'env-script': 'env_script',
223+
'err-script': 'err_script',
224+
'execution polling intervals': 'execution_polling_intervals',
225+
'execution retry delays': 'execution_retry_delays',
226+
'execution time limit': 'execution_time_limit',
227+
'exit-script': 'exit_script',
228+
'init-script': 'init_script',
229+
'outputs': 'outputs',
230+
'post-script': 'post_script',
231+
'platform': 'platform',
232+
'pre-script': 'pre_script',
233+
'run mode': 'run_mode',
234+
'script': 'script',
235+
'submission polling intervals': 'submission_polling_intervals',
236+
'submission retry delays': 'submission_retry_delays',
237+
'work sub-directory': 'work_sub_dir',
238+
}
239+
RUNTIME_LIST_JOINS = {
240+
'execution polling intervals',
241+
'execution retry delays',
242+
'submission polling intervals',
243+
'submission retry delays',
244+
}
245+
RUNTIME_JSON_DUMPS = {'directives', 'environment', 'outputs'}
246+
RUNTIME_STRINGIFYS = {'execution time limit'}
247+
RUNTIME_TRY_ITEMS = {'platform': 'name'}
248+
217249

218250
def setbuff(obj, key, value):
219251
"""Set an attribute on a protobuf object.
@@ -318,6 +350,47 @@ def runtime_from_config(rtconfig):
318350
)
319351

320352

353+
def runtime_from_partial(rtconfig, runtimeold=None):
354+
"""Populate runtime object from partial/full config.
355+
356+
Potentially slower with all the setattr calls, but no expected fields.
357+
"""
358+
runtime = PbRuntime()
359+
if runtimeold is not None:
360+
runtime.CopyFrom(runtimeold)
361+
for key, val in rtconfig.items():
362+
if val is None or key not in RUNTIME_MAPPING:
363+
continue
364+
elif key in RUNTIME_LIST_JOINS:
365+
setattr(runtime, RUNTIME_MAPPING[key], listjoin(val))
366+
elif key in RUNTIME_JSON_DUMPS:
367+
setattr(
368+
runtime,
369+
RUNTIME_MAPPING[key],
370+
json.dumps(
371+
[
372+
{'key': k, 'value': v}
373+
for k, v in val.items()
374+
]
375+
)
376+
)
377+
elif key in RUNTIME_TRY_ITEMS:
378+
try:
379+
setattr(
380+
runtime,
381+
RUNTIME_MAPPING[key],
382+
val[RUNTIME_TRY_ITEMS[key]]
383+
)
384+
except (KeyError, TypeError):
385+
with suppress(KeyError, TypeError):
386+
setattr(runtime, RUNTIME_MAPPING[key], val)
387+
elif key in RUNTIME_STRINGIFYS:
388+
setattr(runtime, RUNTIME_MAPPING[key], str(val or ''))
389+
else:
390+
setattr(runtime, RUNTIME_MAPPING[key], val)
391+
return runtime
392+
393+
321394
def reset_protobuf_object(msg_class, msg_orig):
322395
"""Reset object to clear memory build-up."""
323396
# See: https://github.com/protocolbuffers/protobuf/issues/19674
@@ -1659,13 +1732,9 @@ def insert_job(
16591732
)
16601733
# Not all fields are populated with some submit-failures,
16611734
# so use task cfg as base.
1662-
j_cfg = pdeepcopy(self._apply_broadcasts_to_runtime(
1663-
tp_tokens,
1664-
self.schd.config.cfg['runtime'][tproxy.name]
1665-
))
1666-
for key, val in job_conf.items():
1667-
j_cfg[key] = val
1668-
j_buf.runtime.CopyFrom(runtime_from_config(j_cfg))
1735+
j_buf.runtime.CopyFrom(
1736+
runtime_from_partial(job_conf, tproxy.runtime)
1737+
)
16691738

16701739
# Add in log files.
16711740
j_buf.job_log_dir = get_task_job_log(
@@ -2329,19 +2398,16 @@ def delta_broadcast(self):
23292398
self.updates_pending = True
23302399

23312400
def _generate_broadcast_node_deltas(self, node_data, node_type):
2332-
cfg = self.schd.config.cfg
2401+
rt_cfg = self.schd.config.cfg['runtime']
23332402
# NOTE: node_data may change during operation so make a copy
23342403
# see https://github.com/cylc/cylc-flow/pull/6397
23352404
for node_id, node in list(node_data.items()):
23362405
# Avoid removed tasks with deltas queued during reload.
2337-
if node.name not in cfg['runtime']:
2406+
if node.name not in rt_cfg:
23382407
continue
23392408
tokens = Tokens(node_id)
23402409
new_runtime = runtime_from_config(
2341-
self._apply_broadcasts_to_runtime(
2342-
tokens,
2343-
cfg['runtime'][node.name]
2344-
)
2410+
self._apply_broadcasts_to_runtime(tokens, rt_cfg[node.name])
23452411
)
23462412
new_sruntime = new_runtime.SerializeToString(
23472413
deterministic=True

0 commit comments

Comments
 (0)