Skip to content

Commit cdddc35

Browse files
author
Alan Christie
committed
feat: Add from-link-prefix variables
1 parent 179135a commit cdddc35

File tree

6 files changed

+90
-58
lines changed

6 files changed

+90
-58
lines changed

tests/job-definitions/job-definitions.yaml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,19 @@ jobs:
131131
132132
concatenate:
133133
command: >-
134-
concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}
134+
concatenate.py --inputFile {{ inputFile }} --outputFile {{ outputFile }}
135135
# Simulate a multiple input files Job (combiner)...
136136
variables:
137137
inputs:
138138
properties:
139139
inputFile:
140140
type: files
141+
options:
142+
type: object
143+
properties:
144+
inputDirPrefix:
145+
title: Optional input directory prefix
146+
type: string
141147
outputs:
142148
properties:
143149
outputBase:

tests/jobs/concatenate.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22

33
parser = argparse.ArgumentParser(
44
prog="addcol",
5-
description="Takes a list of files and writes them into single outputfile",
5+
description="Takes an optional directory prefix and a file,"
6+
" and combines all the input files that are found"
7+
" into single outputfile",
68
)
7-
parser.add_argument("inputFile", nargs="+", type=argparse.FileType("r"))
9+
parser.add_argument("--inputDirPrefix")
10+
parser.add_argument("--inputFile", required=True)
811
parser.add_argument("-o", "--outputFile", required=True)
912
args = parser.parse_args()
1013

1114

1215
with open(args.outputFile, "wt", encoding="utf8") as ofile:
13-
for f in args.inputFile:
14-
ofile.write(f.read())
16+
with open(args.inputFile, "rt", encoding="utf8") as ifile:
17+
ofile.write(ifile.read())

tests/workflow-definitions/simple-python-split-combine.yaml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@ steps:
4646
variables:
4747
outputFile: results.smi
4848
plumbing:
49+
- variable: outputFile
50+
from-workflow:
51+
variable: combination
4952
- variable: inputFile
5053
from-step:
5154
name: parallel
5255
variable: outputFile
53-
- variable: outputFile
54-
from-workflow:
55-
variable: combination
56+
- variable: inputDirPrefix
57+
from-link-prefix:
5658
- variable: outputFile
5759
to-project:

workflow/decoder.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def get_step_workflow_variable_connections(
144144
return connections
145145

146146

147-
def get_step_prior_step_plumbing(
147+
def get_step_prior_step_connections(
148148
*, step_definition: dict[str, Any]
149149
) -> dict[str, list[Connector]]:
150150
"""Returns list of variable Connections, indexed by prior step name,
@@ -166,3 +166,14 @@ def get_step_prior_step_plumbing(
166166
Connector(in_=step_variable, out=v_map["variable"])
167167
]
168168
return plumbing
169+
170+
171+
def get_step_link_prefix_variables(*, step_definition: dict[str, Any]) -> set[str]:
172+
"""Returns the set of variables expected to be set to the value
173+
of the instance directory prefix."""
174+
variables: set[str] = set()
175+
if "plumbing" in step_definition:
176+
for v_map in step_definition["plumbing"]:
177+
if "from-link-prefix" in v_map:
178+
variables.add(v_map["variable"])
179+
return variables

workflow/workflow-schema.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,29 @@ definitions:
102102
- variable
103103
- from-workflow
104104

105+
# A Step variable
106+
# (whose value is set to the value of a directory prefix used when the DM
107+
# links the instance directories of prior step instances into this
108+
# step's instance directory)
109+
#
110+
# This _must_ be treated by the step's job as a directory prefix,
111+
# typically '.instance-', that can be used to identify directories in this step's
112+
# execution directory where the execution directory of prior steps
113+
# are hard-linked by the DM. A job can find all the prior step directory names
114+
# using the selected variable (e.g. inspect any directory name
115+
# that starts with "{variable}").
116+
step-variable-from-link-prefix:
117+
type: object
118+
additionalProperties: false
119+
properties:
120+
variable:
121+
$ref: '#/definitions/variable-name'
122+
from-link-prefix:
123+
type: 'null'
124+
required:
125+
- variable
126+
- from-link-prefix
127+
105128
# A Step variable
106129
# (whose value (a file) is to be copied to the project directory)
107130
step-variable-to-project:
@@ -181,6 +204,7 @@ definitions:
181204
anyOf:
182205
- $ref: "#/definitions/step-variable-from-step"
183206
- $ref: "#/definitions/step-variable-from-workflow"
207+
- $ref: "#/definitions/step-variable-from-link-prefix"
184208
- $ref: "#/definitions/step-variable-to-project"
185209
minItems: 1
186210
required:

workflow/workflow_engine.py

Lines changed: 35 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@
4343
from .decoder import (
4444
Connector,
4545
get_step,
46-
get_step_prior_step_plumbing,
46+
get_step_link_prefix_variables,
47+
get_step_prior_step_connections,
4748
get_step_workflow_variable_connections,
4849
)
4950

@@ -77,10 +78,15 @@ def __init__(
7778
*,
7879
wapi_adapter: WorkflowAPIAdapter,
7980
instance_launcher: InstanceLauncher,
81+
step_link_prefix: str = ".instance-",
8082
):
83+
"""Initialiser, given a Workflow API adapter, Instance launcher,
84+
and a step (directory) link prefix (the directory prefix the DM uses to hard-link
85+
prior step instances into the next step, typically '.instance-')"""
8186
# Keep the dependent objects
8287
self._wapi_adapter = wapi_adapter
8388
self._instance_launcher = instance_launcher
89+
self._step_link_prefix = step_link_prefix
8490

8591
def handle_message(self, msg: Message) -> None:
8692
"""Expect Workflow and Pod messages.
@@ -393,7 +399,7 @@ def _prepare_step(
393399
our_inputs: dict[str, Any] = job_defintion_decoder.get_inputs(
394400
our_job_definition
395401
)
396-
our_plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
402+
our_plumbing: dict[str, list[Connector]] = get_step_prior_step_connections(
397403
step_definition=step_definition
398404
)
399405
step_is_combiner: bool = False
@@ -483,60 +489,40 @@ def _prepare_step(
483489
assert connector.in_ in rwf_variables
484490
variables[connector.out] = rwf_variables[connector.in_]
485491

492+
# Process the step's "plumbing" relating to link-prefix variables.
493+
#
494+
# This will be a set of variable names. We just set each one
495+
# to the built-in step link prefix.
496+
for link_variable in get_step_link_prefix_variables(
497+
step_definition=step_definition
498+
):
499+
variables[link_variable] = self._step_link_prefix
500+
486501
# Now process variables (in the "plumbing" block)
487502
# that relate to values used in prior steps.
488503
#
489504
# The decoder gives us a map indexed by prior step name that's a list of
490-
# "in" "out" connectors as above. If this is a combiner step remember
505+
# "in"/"out" connectors as above. If this is a combiner step remember
491506
# that the combiner_input_variable is used as a list.
492-
prior_step_plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
493-
step_definition=step_definition
507+
prior_step_plumbing: dict[str, list[Connector]] = (
508+
get_step_prior_step_connections(step_definition=step_definition)
494509
)
495510
for prior_step_name, connections in prior_step_plumbing.items():
496-
if step_is_combiner and prior_step_name == step_name_being_combined:
497-
assert combiner_input_variable
498-
input_source_list: list[str] = []
499-
for replica in range(num_step_recplicas_being_combined):
500-
prior_step, _ = (
501-
self._wapi_adapter.get_running_workflow_step_by_name(
502-
name=prior_step_name,
503-
replica=replica,
504-
running_workflow_id=rwf_id,
505-
)
506-
)
507-
# Copy "in" value to "out"...
508-
# accumulating thiose for the 'combining' variable,
509-
# which will be set as a list when we're done.
510-
for connector in connections:
511-
assert connector.in_ in prior_step["variables"]
512-
if connector.out == combiner_input_variable:
513-
# Each instance may have a different value
514-
input_source_list.append(
515-
prior_step["variables"][connector.in_]
516-
)
517-
elif replica == 0:
518-
# Only the first instance value are of interest,
519-
# the rest wil be the same - only one variable
520-
# is a list of different values.
521-
variables[connector.out] = prior_step["variables"][
522-
connector.in_
523-
]
524-
# Now we have accumulated the prior steps values (files)
525-
# set the combiner's corresponding input variable...
526-
variables[combiner_input_variable] = input_source_list
527-
else:
528-
# Not a preior step for a combiner,
529-
# or not a step being combined in a combiner.
530-
#
531-
# Retrieve the prior "running" step
532-
# in order to get the variables that were set there...
533-
prior_step, _ = self._wapi_adapter.get_running_workflow_step_by_name(
534-
name=prior_step_name, running_workflow_id=rwf_id
535-
)
536-
# Copy "in" value to "out"...
537-
for connector in connections:
538-
assert connector.in_ in prior_step["variables"]
539-
variables[connector.out] = prior_step["variables"][connector.in_]
511+
# Retrieve the first prior "running" step in order to get the variables
512+
# that were used for it.
513+
#
514+
# For a combiner step we only need to inspect the first instance of
515+
# the prior step (the default replica value is '0').
516+
# We assume all the combiner's prior (parallel) instances
517+
# have the same variables and values.
518+
prior_step, _ = self._wapi_adapter.get_running_workflow_step_by_name(
519+
name=prior_step_name,
520+
running_workflow_id=rwf_id,
521+
)
522+
# Copy "in" value to "out"...
523+
for connector in connections:
524+
assert connector.in_ in prior_step["variables"]
525+
variables[connector.out] = prior_step["variables"][connector.in_]
540526

541527
# All variables are set ...
542528
# is this enough to satisfy the step's Job command?

0 commit comments

Comments
 (0)