Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ warn_unused_ignores = true

[tool.pylint]
disable = [
"fixme",
"R",
"too-few-public-methods",
"too-many-arguments",
Expand Down
74 changes: 74 additions & 0 deletions tests/test_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,77 @@ def test_set_variables_from_options_for_step_for_simnple_python_molprops_with_op
assert "value" in new_variables
assert new_variables["name"] == "propertyName"
assert new_variables["value"] == "propertyValue"


def test_get_workflow_inputs_for_step_with_name_step1():
# Arrange

# Act
inputs = decoder.get_workflow_input_names_for_step(
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step1"
)

# Assert
assert len(inputs) == 1
assert "candidateMolecules" in inputs


def test_get_workflow_inputs_for_step_with_name_step2():
# Arrange

# Act
inputs = decoder.get_workflow_input_names_for_step(
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step2"
)

# Assert
assert not inputs


def test_get_workflow_inputs_for_step_with_unkown_step_name():
# Arrange

# Act
inputs = decoder.get_workflow_input_names_for_step(
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "unknown"
)

# Assert
assert not inputs


def test_get_workflow_outputs_for_step_with_name_step1():
# Arrange

# Act
outputs = decoder.get_workflow_output_values_for_step(
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step1"
)

# Assert
assert not outputs


def test_get_workflow_outputs_for_step_with_name_step2():
# Arrange

# Act
outputs = decoder.get_workflow_output_values_for_step(
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step2"
)

# Assert
assert len(outputs) == 1
assert "clustered-molecules.smi" in outputs


def test_get_workflow_outputs_for_step_with_unkown_step_name():
# Arrange

# Act
outputs = decoder.get_workflow_output_values_for_step(
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "unknown"
)

# Assert
assert not outputs
46 changes: 46 additions & 0 deletions tests/test_test_api_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,49 @@ def test_get_workflow_steps_driving_this_step_when_2nd_step():
assert response["steps"][0]["name"] == "step-1"
assert response["steps"][1]["name"] == "step-2"
assert response["steps"][2]["name"] == "step-3"


def test_get_running_workflow_step_by_name():
# Arrange
utaa = UnitTestWorkflowAPIAdapter()
response = utaa.create_workflow(
workflow_definition={
"name": "blah",
"steps": [{"name": "step-1"}, {"name": "step-2"}, {"name": "step-3"}],
}
)
response = utaa.create_running_workflow(
user_id="dlister",
workflow_id=response["id"],
project_id=TEST_PROJECT_ID,
variables={},
)
rwf_id = response["id"]
response, _ = utaa.create_running_workflow_step(
running_workflow_id=rwf_id, step="step-2"
)
rwfs_id = response["id"]

# Act
response, _ = utaa.get_running_workflow_step_by_name(
name="step-2", running_workflow_id=rwf_id
)

# Assert
assert response["running_workflow"]["id"] == rwf_id
assert response["name"] == "step-2"
assert response["id"] == rwfs_id


def test_basic_realise():
# Arrange
utaa = UnitTestWorkflowAPIAdapter()

# Act
response, _ = utaa.realise_outputs(
running_workflow_step_id="r-workflow-step-00000000-0000-0000-0000-000000000001",
outputs=["a.txt"],
)

# Assert
assert not response
23 changes: 21 additions & 2 deletions tests/wapi_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,25 @@ def get_running_workflow_step(
response["id"] = running_workflow_step_id
return response, 0

def get_running_workflow_step_by_name(
self, *, name: str, running_workflow_id: str
) -> dict[str, Any]:
UnitTestWorkflowAPIAdapter.lock.acquire()
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
running_workflow_step = Unpickler(pickle_file).load()
UnitTestWorkflowAPIAdapter.lock.release()

print(f"name={name} running_workflow_id={running_workflow_id}")
for rwfs_id, record in running_workflow_step.items():
print(f"rwfs_id={rwfs_id} record={record}")
if record["running_workflow"]["id"] != running_workflow_id:
continue
if record["name"] == name:
response = record
response["id"] = rwfs_id
return response, 0
return {}, 0

def set_running_workflow_step_variables(
self,
*,
Expand Down Expand Up @@ -362,8 +381,8 @@ def get_running_workflow_steps(self, *, running_workflow_id: str) -> dict[str, A
return {"count": len(steps), "running_workflow_steps": steps}

def realise_outputs(
self, *, running_workflow_step_id: str, variables: dict[str, str]
self, *, running_workflow_step_id: str, outputs: list[str, str]
) -> tuple[dict[str, Any], int]:
del running_workflow_step_id
del variables
del outputs
return {}, 0
41 changes: 41 additions & 0 deletions workflow/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,47 @@ def get_variable_names(definition: dict[str, Any]) -> list[str]:
return wf_variable_names


def get_workflow_input_names_for_step(
definition: dict[str, Any], name: str
) -> list[str]:
"""Given a Workflow definition and a step name we return a list of workflow
input variable names the step expects. To do this we iterate through the step's
inputs to find those that are declared 'from->workflow-input'.

To get the input (a filename) the caller simply looks these names up
in the variable map."""
inputs: list[str] = []
for step in definition.get("steps", {}):
if step["name"] == name and "inputs" in step:
# Find all the workflow inputs.
# This gives us the name of the workflow input variable
# and the name of the step input (Job) variable.
inputs.extend(
step_input["from"]["workflow-input"]
for step_input in step["inputs"]
if "from" in step_input and "workflow-input" in step_input["from"]
)
return inputs


def get_workflow_output_values_for_step(
definition: dict[str, Any], name: str
) -> list[str]:
"""Given a Workflow definition and a step name we return a list of workflow
out variable names the step creates. To do this we iterate through the workflows's
outputs to find those that are declared 'from' our step."""
wf_outputs = definition.get("variable-mapping", {}).get("outputs", {})
outputs: list[str] = []
outputs.extend(
output["as"]
for output in wf_outputs
if "from" in output
and "step" in output["from"]
and output["from"]["step"] == name
)
return outputs


def set_variables_from_options_for_step(
definition: dict[str, Any], variables: dict[str, Any], step_name: str
) -> dict[str, Any]:
Expand Down
40 changes: 35 additions & 5 deletions workflow/workflow_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,37 @@ def get_running_workflow_step(
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
# },

@abstractmethod
def get_running_workflow_step_by_name(
self, *, name: str, running_workflow_id: str
) -> tuple[dict[str, Any], int]:
"""Get a RunningWorkflowStep Record given a step name
(and its RUnningWorkflow ID)"""
# Should return:
# {
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
# "name:": "step-1234",
# "done": False,
# "success": false,
# "error_num": 0,
# "error_msg": "",
# "variables": {
# "x": 1,
# "y": 2,
# },
# "running_workflow": {
# "id": "r-workflow-00000000-0000-0000-0000-000000000001",
# },
# }
# If not present an empty dictionary should be returned.
#
# For steps that are not the first in a workflow the following field
# can be expected in the response: -
#
# "prior_running_workflow_step": {
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
# },

@abstractmethod
def set_running_workflow_step_variables(
self,
Expand Down Expand Up @@ -312,12 +343,11 @@ def get_job(

@abstractmethod
def realise_outputs(
self, *, running_workflow_step_id: str, outputs: list[tuple[str, str]]
self, *, running_workflow_step_id: str, outputs: list[str]
) -> tuple[dict[str, Any], int]:
"""Copy (link) the step's files as outputs into the Project directory,
while also renaming the file. A step ID is provided, along with a list of
outputs (files in the instance directory) and the required counterpart file
in the Project directory."""
"""Copy (link) the step's files as outputs into the Project directory.
A step ID is provided, along with a list of outputs
(files that will be in the step's instance directory)."""
# Should return an empty map or:
# {
# "error": "<error message>",
Expand Down
Loading