Skip to content

Commit cdd936e

Browse files
author
Alan Christie
committed
feat: new _prepare_step_variables function
1 parent c39ddb7 commit cdd936e

File tree

3 files changed

+106
-27
lines changed

3 files changed

+106
-27
lines changed

tests/job-definitions/job-definitions.yaml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,27 @@ jobs:
132132
concatenate:
133133
command: >-
134134
concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}
135+
# Simulate a multiple input files Job (combiner)...
136+
variables:
137+
inputs:
138+
properties:
139+
inputFile:
140+
type: files
141+
outputs:
142+
properties:
143+
outputBase:
144+
creates: '{{ outputFile }}'
145+
type: file
135146

136147
splitsmiles:
137148
command: >-
138149
copyf.py {{ inputFile }}
139-
# Simulate multiple output files...
150+
# Simulate a multiple output files Job (splitter)...
140151
variables:
152+
inputs:
153+
properties:
154+
inputFile:
155+
type: file
141156
outputs:
142157
properties:
143158
outputBase:

tests/workflow-definitions/simple-python-split-combine.yaml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@ steps:
1515
job: splitsmiles
1616
version: "1.0.0"
1717
variables:
18-
name: count
19-
value: "1"
2018
outputBase: chunk
2119
plumbing:
2220
- variable: inputFile
2321
from-workflow:
2422
variable: candidateMolecules
2523

26-
- name: parallel-step
24+
- name: parallel
2725
description: Add some params
2826
specification:
2927
collection: workflow-engine-unit-test-jobs

workflow/workflow_engine.py

Lines changed: 89 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import logging
2626
import sys
27+
from dataclasses import dataclass
2728
from typing import Any, Optional
2829

2930
import decoder.decoder as job_defintion_decoder
@@ -51,6 +52,20 @@
5152
_LOGGER.addHandler(logging.StreamHandler(sys.stdout))
5253

5354

55+
@dataclass
56+
class StepPreparationResponse:
57+
"""Step preparation response object. Iterations is +ve (non-zero) if a step
58+
can be launched - its value indicates how many times. If a step can be launched
59+
'variables' will not be None. If a parallel set of steps can take place
60+
(even just one) 'iteration_variable' will be set and 'iteration_values'
61+
will be a list containing a value for each step."""
62+
63+
iterations: int
64+
variables: dict[str, Any] | None = None
65+
iteration_variable: str | None = None
66+
iteration_values: list[str] | None = None
67+
68+
5469
class WorkflowEngine:
5570
"""The workflow engine."""
5671

@@ -126,10 +141,18 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
126141
# Now find the first step (index 0)...
127142
first_step: dict[str, Any] = wf_response["steps"][0]
128143

144+
sp_resp = self._prepare_step_variables(
145+
wf=wf_response, step_definition=first_step, rwf=rwf_response
146+
)
147+
assert sp_resp.variables is not None
129148
# Launch it.
130149
# If there's a launch problem the step (and running workflow) will have
131150
# an error, stopping it. There will be no Pod event as the launch has failed.
132-
self._launch(wf=wf_response, rwf=rwf_response, step_definition=first_step)
151+
self._launch(
152+
rwf=rwf_response,
153+
step_definition=first_step,
154+
step_preparation_response=sp_resp,
155+
)
133156

134157
def _handle_workflow_stop_message(self, r_wfid: str) -> None:
135158
"""Logic to handle a STOP message."""
@@ -265,8 +288,31 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
265288
# There's another step!
266289
# For this simple logic it is the next step.
267290
next_step = wf_response["steps"][step_index + 1]
291+
292+
# A major piece of work to accomplish is to get ourselves into a position
293+
# that allows us to check the step command can be executed.
294+
# We do this by compiling a map of variables we believe the step needs.
295+
296+
# If the step about to be launched is based on a prior step
297+
# that generates multiple outputs (files) then we have to
298+
# exit unless all of the step instances have completed.
299+
#
300+
# Do we need a 'prepare variables' function?
301+
# One that returns a map of variables or nothing
302+
# (e.g. 'nothing' when a step launch cannot be attempted)
303+
sp_resp = self._prepare_step_variables(
304+
wf=wf_response, step_definition=next_step, rwf=rwf_response
305+
)
306+
if sp_resp.iterations == 0:
307+
# Cannot prepare variables for this step,
308+
# we have to leave.
309+
return
310+
assert sp_resp.variables is not None
311+
268312
self._launch(
269-
wf=wf_response, rwf=rwf_response, step_definition=next_step
313+
rwf=rwf_response,
314+
step_definition=next_step,
315+
step_preparation_response=sp_resp,
270316
)
271317

272318
# Something was started (or there was a launch error and the step
@@ -361,20 +407,18 @@ def _validate_step_command(
361407
)
362408
return all_variables if success else message
363409

364-
def _launch(
410+
def _prepare_step_variables(
365411
self,
366412
*,
367413
wf: dict[str, Any],
368-
rwf: dict[str, Any],
369414
step_definition: dict[str, Any],
370-
) -> None:
415+
rwf: dict[str, Any],
416+
) -> StepPreparationResponse:
417+
"""Attempts to prepare a map of step variables. If variables cannot be
418+
presented to the step we return an object with 'iterations' set to zero."""
419+
371420
step_name: str = step_definition["name"]
372421
rwf_id: str = rwf["id"]
373-
project_id = rwf["project"]["id"]
374-
375-
# A mojor piece of work to accomplish is to get ourselves into a position
376-
# that allows us to check the step command can be executed.
377-
# We do this by compiling a map of variables we belive the step needs.
378422

379423
# We start with all the workflow variables that were provided
380424
# by the user when they "ran" the workflow. We're given a full set of
@@ -390,13 +434,10 @@ def _launch(
390434
msg = f"Failed command validation error_msg={error_msg}"
391435
_LOGGER.warning(msg)
392436
self._set_step_error(step_name, rwf_id, None, 1, msg)
393-
return
437+
return StepPreparationResponse(iterations=0)
394438

395439
variables: dict[str, Any] = error_or_variables
396440

397-
# A step replication number,
398-
# used only for steps expected to run in parallel (even if just once)
399-
step_replication_number: int = 0
400441
# Do we replicate this step (run it more than once)?
401442
# We do if a variable in this step's mapping block
402443
# refers to an output of a prior step whose type is 'files'.
@@ -405,7 +446,7 @@ def _launch(
405446
#
406447
# In this engine we only act on the _first_ match, i.e. there CANNOT
407448
# be more than one prior step variable that is 'files'!
408-
replication_values: list[str] = []
449+
iter_values: list[str] = []
409450
iter_variable: str | None = None
410451
plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
411452
step_definition=step_definition
@@ -435,34 +476,59 @@ def _launch(
435476
output_variable=connector.in_,
436477
)
437478
)
438-
replication_values = result["output"].copy()
479+
iter_values = result["output"].copy()
439480
break
440481
# Stop if we've got an iteration variable
441482
if iter_variable:
442483
break
443484

444-
num_step_instances: int = max(1, len(replication_values))
445-
for iteration in range(num_step_instances):
485+
num_step_instances: int = max(1, len(iter_values))
486+
return StepPreparationResponse(
487+
variables=variables,
488+
iterations=num_step_instances,
489+
iteration_variable=iter_variable,
490+
iteration_values=iter_values,
491+
)
492+
493+
def _launch(
494+
self,
495+
*,
496+
rwf: dict[str, Any],
497+
step_definition: dict[str, Any],
498+
step_preparation_response: StepPreparationResponse,
499+
) -> None:
500+
step_name: str = step_definition["name"]
501+
rwf_id: str = rwf["id"]
502+
project_id = rwf["project"]["id"]
503+
504+
# A step replication number,
505+
# used only for steps expected to run in parallel (even if just once)
506+
step_replication_number: int = 0
507+
508+
variables = step_preparation_response.variables
509+
assert variables is not None
510+
for iteration in range(step_preparation_response.iterations):
446511

447512
# If we are replicating this step then we must replace the step's variable
448513
# with a value expected for this iteration.
449-
if iter_variable:
450-
iter_value: str = replication_values[iteration]
514+
if step_preparation_response.iteration_variable:
515+
assert step_preparation_response.iteration_values
516+
iter_value: str = step_preparation_response.iteration_values[iteration]
451517
_LOGGER.info(
452518
"Replicating step: %s iteration=%s variable=%s value=%s",
453519
step_name,
454520
iteration,
455-
iter_variable,
521+
step_preparation_response.iteration_variable,
456522
iter_value,
457523
)
458524
# Over-write the replicating variable
459525
# and set the replication number to a unique +ve non-zero value...
460-
variables[iter_variable] = iter_value
526+
variables[step_preparation_response.iteration_variable] = iter_value
461527
step_replication_number = iteration + 1
462528

463529
_LOGGER.info(
464530
"Launching step: %s RunningWorkflow=%s (name=%s)"
465-
" variables=%s project=%s",
531+
" step_variables=%s project=%s",
466532
step_name,
467533
rwf_id,
468534
rwf["name"],

0 commit comments

Comments
 (0)