|
215 | 215 | WORKFLOW: {'log_records': 10}
|
216 | 216 | }
|
217 | 217 |
|
| 218 | +# internal runtime to protobuf field name mapping |
| 219 | +RUNTIME_CFG_MAP_TO_FIELD = { |
| 220 | + 'completion': 'completion', |
| 221 | + 'directives': 'directives', |
| 222 | + 'environment': 'environment', |
| 223 | + 'env-script': 'env_script', |
| 224 | + 'err-script': 'err_script', |
| 225 | + 'execution polling intervals': 'execution_polling_intervals', |
| 226 | + 'execution retry delays': 'execution_retry_delays', |
| 227 | + 'execution time limit': 'execution_time_limit', |
| 228 | + 'exit-script': 'exit_script', |
| 229 | + 'init-script': 'init_script', |
| 230 | + 'outputs': 'outputs', |
| 231 | + 'post-script': 'post_script', |
| 232 | + 'platform': 'platform', |
| 233 | + 'pre-script': 'pre_script', |
| 234 | + 'run mode': 'run_mode', |
| 235 | + 'script': 'script', |
| 236 | + 'submission polling intervals': 'submission_polling_intervals', |
| 237 | + 'submission retry delays': 'submission_retry_delays', |
| 238 | + 'work sub-directory': 'work_sub_dir', |
| 239 | +} |
| 240 | +RUNTIME_LIST_JOINS = { |
| 241 | + 'execution polling intervals', |
| 242 | + 'execution retry delays', |
| 243 | + 'submission polling intervals', |
| 244 | + 'submission retry delays', |
| 245 | +} |
| 246 | +RUNTIME_JSON_DUMPS = {'directives', 'environment', 'outputs'} |
| 247 | +RUNTIME_STRINGIFYS = {'execution time limit'} |
| 248 | + |
218 | 249 |
|
219 | 250 | def setbuff(obj, key, value):
|
220 | 251 | """Set an attribute on a protobuf object.
|
@@ -319,6 +350,41 @@ def runtime_from_config(rtconfig):
|
319 | 350 | )
|
320 | 351 |
|
321 | 352 |
|
| 353 | +def runtime_from_partial(rtconfig, runtimeold: Optional[PbRuntime] = None): |
| 354 | + """Populate runtime object from partial/full config. |
| 355 | +
|
| 356 | + Potentially slower than the non-partial one, due to tha the setattr calls, |
| 357 | + but does not have expected fields. |
| 358 | + """ |
| 359 | + runtime = PbRuntime() |
| 360 | + if runtimeold is not None: |
| 361 | + runtime.CopyFrom(runtimeold) |
| 362 | + for key, val in rtconfig.items(): |
| 363 | + if val is None or key not in RUNTIME_CFG_MAP_TO_FIELD: |
| 364 | + continue |
| 365 | + elif key in RUNTIME_LIST_JOINS: |
| 366 | + setattr(runtime, RUNTIME_CFG_MAP_TO_FIELD[key], listjoin(val)) |
| 367 | + elif key in RUNTIME_JSON_DUMPS: |
| 368 | + setattr( |
| 369 | + runtime, |
| 370 | + RUNTIME_CFG_MAP_TO_FIELD[key], |
| 371 | + json.dumps( |
| 372 | + [ |
| 373 | + {'key': k, 'value': v} |
| 374 | + for k, v in val.items() |
| 375 | + ] |
| 376 | + ) |
| 377 | + ) |
| 378 | + elif key == 'platform' and isinstance(val, dict): |
| 379 | + with suppress(KeyError, TypeError): |
| 380 | + setattr(runtime, RUNTIME_CFG_MAP_TO_FIELD[key], val['name']) |
| 381 | + elif key in RUNTIME_STRINGIFYS: |
| 382 | + setattr(runtime, RUNTIME_CFG_MAP_TO_FIELD[key], str(val or '')) |
| 383 | + else: |
| 384 | + setattr(runtime, RUNTIME_CFG_MAP_TO_FIELD[key], val) |
| 385 | + return runtime |
| 386 | + |
| 387 | + |
322 | 388 | def reset_protobuf_object(msg_class, msg_orig):
|
323 | 389 | """Reset object to clear memory build-up."""
|
324 | 390 | # See: https://github.com/protocolbuffers/protobuf/issues/19674
|
@@ -1574,6 +1640,9 @@ def _process_internal_task_proxy(
|
1574 | 1640 | ext_trig.satisfied = satisfied
|
1575 | 1641 |
|
1576 | 1642 | for label, satisfied in itask.state.xtriggers.items():
|
| 1643 | + # Reload may have removed xtrigger of orphan task |
| 1644 | + if label not in self.schd.xtrigger_mgr.xtriggers.functx_map: |
| 1645 | + continue |
1577 | 1646 | sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
|
1578 | 1647 | itask, label).get_signature()
|
1579 | 1648 | xtrig = tproxy.xtriggers[f'{label}={sig}']
|
@@ -1657,13 +1726,9 @@ def insert_job(
|
1657 | 1726 | )
|
1658 | 1727 | # Not all fields are populated with some submit-failures,
|
1659 | 1728 | # so use task cfg as base.
|
1660 |
| - j_cfg = pdeepcopy(self._apply_broadcasts_to_runtime( |
1661 |
| - tp_tokens, |
1662 |
| - self.schd.config.cfg['runtime'][tproxy.name] |
1663 |
| - )) |
1664 |
| - for key, val in job_conf.items(): |
1665 |
| - j_cfg[key] = val |
1666 |
| - j_buf.runtime.CopyFrom(runtime_from_config(j_cfg)) |
| 1729 | + j_buf.runtime.CopyFrom( |
| 1730 | + runtime_from_partial(job_conf, tproxy.runtime) |
| 1731 | + ) |
1667 | 1732 |
|
1668 | 1733 | # Add in log files.
|
1669 | 1734 | j_buf.job_log_dir = get_task_job_log(
|
@@ -2327,16 +2392,16 @@ def delta_broadcast(self):
|
2327 | 2392 | self.updates_pending = True
|
2328 | 2393 |
|
2329 | 2394 | def _generate_broadcast_node_deltas(self, node_data, node_type):
|
2330 |
| - cfg = self.schd.config.cfg |
| 2395 | + rt_cfg = self.schd.config.cfg['runtime'] |
2331 | 2396 | # NOTE: node_data may change during operation so make a copy
|
2332 | 2397 | # see https://github.com/cylc/cylc-flow/pull/6397
|
2333 | 2398 | for node_id, node in list(node_data.items()):
|
| 2399 | + # Avoid removed tasks with deltas queued during reload. |
| 2400 | + if node.name not in rt_cfg: |
| 2401 | + continue |
2334 | 2402 | tokens = Tokens(node_id)
|
2335 | 2403 | new_runtime = runtime_from_config(
|
2336 |
| - self._apply_broadcasts_to_runtime( |
2337 |
| - tokens, |
2338 |
| - cfg['runtime'][node.name] |
2339 |
| - ) |
| 2404 | + self._apply_broadcasts_to_runtime(tokens, rt_cfg[node.name]) |
2340 | 2405 | )
|
2341 | 2406 | new_sruntime = new_runtime.SerializeToString(
|
2342 | 2407 | deterministic=True
|
|
0 commit comments