Skip to content

Commit 5989c87

Browse files
author
Alan Christie
committed
feat: Passes prior step to launcher
1 parent 8994624 commit 5989c87

File tree

2 files changed

+74
-23
lines changed

2 files changed

+74
-23
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ warn_unused_ignores = true
5151

5252
[tool.pylint]
5353
disable = [
54+
"fixme",
5455
"R",
5556
"too-few-public-methods",
5657
"too-many-arguments",

workflow/workflow_engine.py

Lines changed: 73 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,15 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
250250
self._set_step_error(step_name, r_wfid, r_wfsid, exit_code, "Job failed")
251251
return
252252

253-
# If we get here the prior step completed successfully
254-
# and so we can mark the Step as DOne (successfully),
255-
# and then inspect the Workflow to determine the next step.
253+
# If we get here the prior step completed successfully and we can decide
254+
# whether the step has outputs (files) that need to be written to the
255+
# Project directory, while also marking the Step as DONE (successfully).
256+
# We pass the outputs to the DM via a call to the API adapter's realise_outputs().
257+
# In return it copies (links) these files to the Project directory.
258+
#
259+
# We then inspect the Workflow to determine the next step.
260+
261+
# ToDo
256262

257263
self._wapi_adapter.set_running_workflow_step_done(
258264
running_workflow_step_id=r_wfsid,
@@ -319,11 +325,18 @@ def _validate_step_command(
319325
*,
320326
running_workflow_step_id: str,
321327
step: dict[str, Any],
328+
workflow_steps: list[dict[str, Any]],
329+
our_step_index: int,
322330
running_workflow_variables: dict[str, Any] | None = None,
323331
) -> str | dict[str, Any]:
324332
"""Returns an error message if the command isn't valid.
325333
Without a message we return all the variables that were (successfully)
326-
applied to the command."""
334+
applied to the command.
335+
336+
We are also given a list of steps in workflow_steps and out position in
337+
the list with our_step_index."""
338+
assert our_step_index >= 0
339+
327340
# We get the Job from the step specification, which must contain
328341
# the keys "collection", "job", and "version". Here we assume that
329342
# the workflow definition has passed the RUN-level validation
@@ -380,52 +393,46 @@ def _validate_step_command(
380393
if running_workflow_variables:
381394
all_variables |= running_workflow_variables
382395

383-
# This gives all the running workflow and step-specific variables.
384-
# Now we have to inspect the workflow step 'inputs' (and 'options')
385-
# and see if there are further variables that need constructing
386-
# and then adding (merging) into the 'all_variables' dictionary.
387-
388-
wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step(
389-
running_workflow_step_id=running_workflow_step_id,
390-
)
391-
392396
# We must always process the current step's variables
393397
_LOGGER.debug("Validating step %s (%s)", step, running_workflow_step_id)
394398
inputs = step.get("inputs", [])
395399
outputs = step.get("outputs", [])
396400
previous_step_outputs = []
397-
our_index: int = wf_step_data["caller_step_index"]
398-
assert our_index >= 0
399401
_LOGGER.debug(
400-
"We are at workflow step index %d (%s)", our_index, running_workflow_step_id
402+
"We are at workflow step index %d (%s)",
403+
our_step_index,
404+
running_workflow_step_id,
401405
)
402406

403-
if our_index > 0:
407+
if our_step_index > 0:
404408
# resolve all previous steps
405409
previous_step_names = set()
406410
for inp in inputs:
407411
if step_name := inp["from"].get("step", None):
408412
previous_step_names.add(step_name)
409413

410-
for step in wf_step_data["steps"]:
414+
for step in workflow_steps:
411415
if step["name"] in previous_step_names:
412416
previous_step_outputs.extend(step.get("outputs", []))
413417

414418
_LOGGER.debug(
415419
"Index %s (%s) workflow_variables=%s",
416-
our_index,
420+
our_step_index,
417421
running_workflow_step_id,
418422
all_variables,
419423
)
420424
_LOGGER.debug(
421-
"Index %s (%s) inputs=%s", our_index, running_workflow_step_id, inputs
425+
"Index %s (%s) inputs=%s", our_step_index, running_workflow_step_id, inputs
422426
)
423427
_LOGGER.debug(
424-
"Index %s (%s) outputs=%s", our_index, running_workflow_step_id, outputs
428+
"Index %s (%s) outputs=%s",
429+
our_step_index,
430+
running_workflow_step_id,
431+
outputs,
425432
)
426433
_LOGGER.debug(
427434
"Index %s (%s) previous_step_outputs=%s",
428-
our_index,
435+
our_step_index,
429436
running_workflow_step_id,
430437
previous_step_outputs,
431438
)
@@ -452,7 +459,7 @@ def _validate_step_command(
452459
all_variables |= step_vars
453460
_LOGGER.debug(
454461
"Index %s (%s) all_variables=%s",
455-
our_index,
462+
our_step_index,
456463
running_workflow_step_id,
457464
all_variables,
458465
)
@@ -481,6 +488,15 @@ def _launch(
481488

482489
_LOGGER.info("Validating step command: %s (step=%s)...", rwf_id, step_name)
483490

491+
# Get step data - importantly, giving us the sequence of steps in the response.
492+
# Steps will be in wf_step_data["steps"] and our position in the list
493+
# is wf_step_data["caller_step_index"]
494+
wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step(
495+
running_workflow_step_id=rwfs_id,
496+
)
497+
assert wf_step_data["caller_step_index"] >= 0
498+
our_step_index: int = wf_step_data["caller_step_index"]
499+
484500
# Now check the step command can be executed
485501
# (by trying to decoding the Job command).
486502
#
@@ -491,6 +507,8 @@ def _launch(
491507
error_or_variables: str | dict[str, Any] = self._validate_step_command(
492508
running_workflow_step_id=rwfs_id,
493509
step=step,
510+
workflow_steps=wf_step_data["steps"],
511+
our_step_index=our_step_index,
494512
running_workflow_variables=running_workflow_variables,
495513
)
496514
if isinstance(error_or_variables, str):
@@ -514,6 +532,36 @@ def _launch(
514532
variables,
515533
)
516534

535+
# When we launch a step we need to identify all the prior steps in the workflow,
536+
# those we depend on. The DataManager will then link their outputs to
537+
# out instance directory. For simple workflows there is only one prior step,
538+
# and it's the one immediately prior to this one.
539+
#
540+
# We put all the prior step IDs in: -
541+
# 'running_workflow_step_prior_steps'
542+
# A list of step UUID strings.
543+
#
544+
# In this 'simple' linear implementation that is simply the immediately
545+
# preceding step.
546+
prior_steps: list[str] = []
547+
if our_step_index > 0:
548+
# We need the step ID of the prior step.
549+
prior_step_name: str = wf_step_data["steps"][our_step_index - 1]["name"]
550+
step_response, _ = self._wapi_adapter.get_running_workflow_step_by_name(
551+
name=prior_step_name,
552+
running_workflow_id=rwf_id,
553+
)
554+
assert "id" in step_response
555+
prior_steps.append(step_response["id"])
556+
557+
# We must also identify workflow inputs that are required by the step we are
558+
# about to launch and pass those using: -
559+
#
560+
# 'running_workflow_step_inputs'
561+
# A list of string pairs (input filename and output filename)
562+
# (with relative paths if appropriate.
563+
inputs: list[tuple[str, str]] | None = None
564+
517565
lp: LaunchParameters = LaunchParameters(
518566
project_id=project_id,
519567
name=step_name,
@@ -524,6 +572,8 @@ def _launch(
524572
specification_variables=variables,
525573
running_workflow_id=rwf_id,
526574
running_workflow_step_id=rwfs_id,
575+
running_workflow_step_prior_steps=prior_steps,
576+
running_workflow_step_inputs=inputs,
527577
)
528578
lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
529579
if lr.error_num:

0 commit comments

Comments
 (0)