Skip to content

Commit c39ddb7

Browse files
author
Alan Christie
committed
refactor: Better function and variable naming (plumbing)
1 parent 5e2c8bc commit c39ddb7

File tree

2 files changed

+43
-31
lines changed

2 files changed

+43
-31
lines changed

workflow/decoder.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -127,38 +127,42 @@ def get_step_input_variable_names(
127127
return variable_names
128128

129129

130-
def get_step_workflow_plumbing(*, step: dict[str, Any]) -> list[Connector]:
131-
"""Returns a list of workflow vaiable name to step variable name
132-
Translation objects for the given step."""
133-
variable_mapping: list[Connector] = []
134-
if "plumbing" in step:
135-
for v_map in step["plumbing"]:
130+
def get_step_workflow_variable_connections(
131+
*, step_definition: dict[str, Any]
132+
) -> list[Connector]:
133+
"""Returns a list of connectors that connect a workflow variable name
134+
to a step variable name for the given step definition."""
135+
connections: list[Connector] = []
136+
if "plumbing" in step_definition:
137+
for v_map in step_definition["plumbing"]:
136138
if "from-workflow" in v_map:
137-
variable_mapping.append(
139+
connections.append(
138140
Connector(
139141
in_=v_map["from-workflow"]["variable"], out=v_map["variable"]
140142
)
141143
)
142-
return variable_mapping
143-
144-
145-
def get_step_prior_step_plumbing(*, step: dict[str, Any]) -> dict[str, list[Connector]]:
146-
"""Returns list of Translation objects, indexed by prior step name,
147-
that identify source step (output) variable name to this step's (input)
148-
variable name."""
149-
variable_mapping: dict[str, list[Connector]] = {}
150-
if "plumbing" in step:
151-
for v_map in step["plumbing"]:
144+
return connections
145+
146+
147+
def get_step_prior_step_plumbing(
148+
*, step_definition: dict[str, Any]
149+
) -> dict[str, list[Connector]]:
150+
"""Returns list of variable Connections, indexed by prior step name,
151+
that identify a source step variable name (an output) to an input variable in this
152+
step (an input)."""
153+
plumbing: dict[str, list[Connector]] = {}
154+
if "plumbing" in step_definition:
155+
for v_map in step_definition["plumbing"]:
152156
if "from-step" in v_map:
153157
step_name = v_map["from-step"]["name"]
154158
step_variable = v_map["from-step"]["variable"]
155159
# Tuple is "from" -> "to"
156-
if step_name in variable_mapping:
157-
variable_mapping[step_name].append(
160+
if step_name in plumbing:
161+
plumbing[step_name].append(
158162
Connector(in_=step_variable, out=v_map["variable"])
159163
)
160164
else:
161-
variable_mapping[step_name] = [
165+
plumbing[step_name] = [
162166
Connector(in_=step_variable, out=v_map["variable"])
163167
]
164-
return variable_mapping
168+
return plumbing

workflow/workflow_engine.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
Connector,
4444
get_step,
4545
get_step_prior_step_plumbing,
46-
get_step_workflow_plumbing,
46+
get_step_workflow_variable_connections,
4747
)
4848

4949
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -129,7 +129,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
129129
# Launch it.
130130
# If there's a launch problem the step (and running workflow) will have
131131
# and error, stopping it. There will be no Pod event as the launch has failed.
132-
self._launch(wf=wf_response, rwf=rwf_response, step=first_step)
132+
self._launch(wf=wf_response, rwf=rwf_response, step_definition=first_step)
133133

134134
def _handle_workflow_stop_message(self, r_wfid: str) -> None:
135135
"""Logic to handle a STOP message."""
@@ -265,7 +265,9 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
265265
# There's another step!
266266
# For this simple logic it is the next step.
267267
next_step = wf_response["steps"][step_index + 1]
268-
self._launch(wf=wf_response, rwf=rwf_response, step=next_step)
268+
self._launch(
269+
wf=wf_response, rwf=rwf_response, step_definition=next_step
270+
)
269271

270272
# Something was started (or there was a launch error and the step
271273
# and running workflow error will have been set).
@@ -330,7 +332,7 @@ def _validate_step_command(
330332
# "in" variables are worklfow variables, and "out" variables
331333
# are expected Job variables. We use this to add variables
332334
# to the "all variables" map.
333-
for connector in get_step_workflow_plumbing(step=step):
335+
for connector in get_step_workflow_variable_connections(step_definition=step):
334336
assert connector.in_ in running_workflow_variables
335337
all_variables[connector.out] = running_workflow_variables[connector.in_]
336338

@@ -339,7 +341,7 @@ def _validate_step_command(
339341
# us a map indexed by prior step name that's a list of "in" "out"
340342
# tuples as above.
341343
prior_step_plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
342-
step=step
344+
step_definition=step
343345
)
344346
for prior_step_name, connections in prior_step_plumbing.items():
345347
# Retrieve the prior "running" step
@@ -360,9 +362,13 @@ def _validate_step_command(
360362
return all_variables if success else message
361363

362364
def _launch(
363-
self, *, wf: dict[str, Any], rwf: dict[str, Any], step: dict[str, Any]
365+
self,
366+
*,
367+
wf: dict[str, Any],
368+
rwf: dict[str, Any],
369+
step_definition: dict[str, Any],
364370
) -> None:
365-
step_name: str = step["name"]
371+
step_name: str = step_definition["name"]
366372
rwf_id: str = rwf["id"]
367373
project_id = rwf["project"]["id"]
368374

@@ -376,7 +382,7 @@ def _launch(
376382
rwf_variables: dict[str, Any] = rwf.get("variables", {})
377383
error_or_variables: str | dict[str, Any] = self._validate_step_command(
378384
running_workflow_id=rwf_id,
379-
step=step,
385+
step=step_definition,
380386
running_workflow_variables=rwf_variables,
381387
)
382388
if isinstance(error_or_variables, str):
@@ -401,7 +407,9 @@ def _launch(
401407
# be more than one prior step variable that is 'files'!
402408
replication_values: list[str] = []
403409
iter_variable: str | None = None
404-
plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(step=step)
410+
plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
411+
step_definition=step_definition
412+
)
405413
for p_step_name, connections in plumbing.items():
406414
# We need to get the Job definition for each step
407415
# and then check whether the (ouptu) variable is of type 'files'...
@@ -468,7 +476,7 @@ def _launch(
468476
debug=rwf.get("debug"),
469477
launching_user_name=rwf["running_user"],
470478
launching_user_api_token=rwf["running_user_api_token"],
471-
specification=step["specification"],
479+
specification=step_definition["specification"],
472480
variables=variables,
473481
running_workflow_id=rwf_id,
474482
step_name=step_name,

0 commit comments

Comments
 (0)