Skip to content

Commit 2ca6bfa

Browse files
author
Alan Christie
committed
feat: Work toward use of project directory
1 parent 1479a35 commit 2ca6bfa

File tree

8 files changed

+66
-20
lines changed

8 files changed

+66
-20
lines changed

tests/api_adapter.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,19 @@ def get_workflow_by_name(self, *, name: str, version: str) -> Dict[str, Any]:
9898
item = {"id": wfid, "workflow": value}
9999
return item
100100

101-
def create_running_workflow(self, *, workflow_id: str) -> str:
101+
def create_running_workflow(self, *, workflow_id: str, project_id: str) -> str:
102102
UnitTestAPIAdapter.lock.acquire()
103103
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
104104
running_workflow = Unpickler(pickle_file).load()
105105

106106
next_id: int = len(running_workflow) + 1
107107
running_workflow_id: str = _RUNNING_WORKFLOW_ID_FORMAT.format(id=next_id)
108-
record = {"done": False, "success": False, "workflow": workflow_id}
108+
record = {
109+
"done": False,
110+
"success": False,
111+
"workflow": workflow_id,
112+
"project_id": project_id,
113+
}
109114
running_workflow[running_workflow_id] = record
110115

111116
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "wb") as pickle_file:

tests/instance_launcher.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ def launch(
5252
assert workflow_id
5353
assert step_specification
5454

55+
# Project must be `project-00000000-0000-0000-0000-000000000001`
56+
assert project_id == "project-00000000-0000-0000-0000-000000000001"
57+
5558
# We're passed a RunningWorkflowStep ID but a record is expected to have been
5659
# created bt the caller, we simply create instance records.
5760
response = self._api_adapter.get_running_workflow_step(
@@ -66,15 +69,22 @@ def launch(
6669
response = self._api_adapter.create_task(instance_id=instance_id)
6770
task_id = response["id"]
6871

72+
# Where to run the job (i.e. in the project directory)
73+
execution_directory = f"project-root/{project_id}"
74+
os.makedirs(execution_directory, exist_ok=True)
75+
6976
# Just run the Python module that matched the 'job' in the step specification.
70-
# Don't care about 'version' or 'collection'.
77+
# Don't care about 'version' or 'collection'. It will be relative to the
78+
# execution directory.
7179
job: str = step_specification["job"]
7280
job_module = f"{_JOB_DIRECTORY}/{job}.py"
7381
assert os.path.isfile(job_module)
7482

7583
job_cmd: List[str] = ["python", job_module]
76-
print(f"Running job command: {job_cmd}")
77-
completed_process: CompletedProcess = subprocess.run(job_cmd, check=False)
84+
print(f"Running job command: {job_module}")
85+
completed_process: CompletedProcess = subprocess.run(
86+
job_cmd, check=False, cwd=execution_directory
87+
)
7888

7989
# Simulate a PodMessage (that will contain the instance ID),
8090
# filling-in only the fields that are of use to the Engine.

tests/project-root/README.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
This is the directory used as the working directory for any workflow Job executions.
2+
Any and all files generated by Jobs will be placed in a project directory here.
3+
So expect to find a temporary project directory called
4+
`project-00000000-0000-0000-0000-000000000001` with files in it here.

tests/test_test_api_adapter.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def test_get_nop_job():
1717
)
1818

1919
# Assert
20-
assert jd["command"] == "python --version"
20+
assert jd["command"] == "nop.py"
2121

2222

2323
def test_get_unknown_workflow():
@@ -74,7 +74,10 @@ def test_create_running_workflow():
7474
response = utaa.create_workflow(workflow_definition={"name": "blah"})
7575

7676
# Act
77-
response = utaa.create_running_workflow(workflow_id=response["id"])
77+
response = utaa.create_running_workflow(
78+
workflow_id=response["id"],
79+
project_id="project-00000000-0000-0000-0000-000000000001",
80+
)
7881

7982
# Assert
8083
assert response["id"] == "r-workflow-00000000-0000-0000-0000-000000000001"
@@ -85,7 +88,9 @@ def test_get_running_workflow():
8588
utaa = UnitTestAPIAdapter()
8689
response = utaa.create_workflow(workflow_definition={"name": "blah"})
8790
wfid = response["id"]
88-
response = utaa.create_running_workflow(workflow_id=wfid)
91+
response = utaa.create_running_workflow(
92+
workflow_id=wfid, project_id="project-00000000-0000-0000-0000-000000000001"
93+
)
8994
rwfid = response["id"]
9095

9196
# Act
@@ -101,7 +106,10 @@ def test_set_running_workflow_done_when_success():
101106
# Arrange
102107
utaa = UnitTestAPIAdapter()
103108
response = utaa.create_workflow(workflow_definition={"name": "blah"})
104-
response = utaa.create_running_workflow(workflow_id=response["id"])
109+
response = utaa.create_running_workflow(
110+
workflow_id=response["id"],
111+
project_id="project-00000000-0000-0000-0000-000000000001",
112+
)
105113
rwfid = response["id"]
106114

107115
# Act
@@ -117,7 +125,10 @@ def test_set_running_workflow_done_when_failed():
117125
# Arrange
118126
utaa = UnitTestAPIAdapter()
119127
response = utaa.create_workflow(workflow_definition={"name": "blah"})
120-
response = utaa.create_running_workflow(workflow_id=response["id"])
128+
response = utaa.create_running_workflow(
129+
workflow_id=response["id"],
130+
project_id="project-00000000-0000-0000-0000-000000000001",
131+
)
121132
rwfid = response["id"]
122133

123134
# Act
@@ -133,7 +144,10 @@ def test_create_running_workflow_step():
133144
# Arrange
134145
utaa = UnitTestAPIAdapter()
135146
response = utaa.create_workflow(workflow_definition={"name": "blah"})
136-
response = utaa.create_running_workflow(workflow_id=response["id"])
147+
response = utaa.create_running_workflow(
148+
workflow_id=response["id"],
149+
project_id="project-00000000-0000-0000-0000-000000000001",
150+
)
137151

138152
# Act
139153
response = utaa.create_running_workflow_step(
@@ -149,7 +163,9 @@ def test_get_running_workflow_step():
149163
utaa = UnitTestAPIAdapter()
150164
response = utaa.create_workflow(workflow_definition={"name": "blah"})
151165
wfid = response["id"]
152-
response = utaa.create_running_workflow(workflow_id=wfid)
166+
response = utaa.create_running_workflow(
167+
workflow_id=wfid, project_id="project-00000000-0000-0000-0000-000000000001"
168+
)
153169
rwfid = response["id"]
154170
response = utaa.create_running_workflow_step(
155171
running_workflow_id=rwfid, step="step-1"
@@ -171,7 +187,9 @@ def test_get_running_workflow_steps():
171187
utaa = UnitTestAPIAdapter()
172188
response = utaa.create_workflow(workflow_definition={"name": "blah"})
173189
wfid = response["id"]
174-
response = utaa.create_running_workflow(workflow_id=wfid)
190+
response = utaa.create_running_workflow(
191+
workflow_id=wfid, project_id="project-00000000-0000-0000-0000-000000000001"
192+
)
175193
rwfid = response["id"]
176194
response = utaa.create_running_workflow_step(
177195
running_workflow_id=rwfid, step="step-1"

tests/test_test_instance_launcher.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,19 @@ def test_get_nop_job(basic_launcher):
2626
utaa = basic_launcher[0]
2727
launcher = basic_launcher[1]
2828
response = utaa.create_workflow(workflow_definition={"name": "blah"})
29-
response = utaa.create_running_workflow(workflow_id=response["id"])
29+
response = utaa.create_running_workflow(
30+
workflow_id=response["id"],
31+
project_id="project-00000000-0000-0000-0000-000000000001",
32+
)
3033
response = utaa.create_running_workflow_step(
3134
running_workflow_id=response["id"], step="step-1"
3235
)
3336
rwfsid = response["id"]
3437

3538
# Act
3639
result = launcher.launch(
37-
project_id="project-00000000-0000-0000-0000-000000000000",
38-
workflow_id="workflow-00000000-0000-0000-0000-000000000000",
40+
project_id="project-00000000-0000-0000-0000-000000000001",
41+
workflow_id="workflow-00000000-0000-0000-0000-000000000001",
3942
running_workflow_step_id=rwfsid,
4043
workflow_definition={},
4144
step_specification={"job": "nop", "variables": {"x": 1}},

tests/test_workflow_engine_examples.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ def start_workflow(md, da, workflow_file_name) -> str:
7171
assert wfid
7272
print(f"Created workflow definition {wfid}")
7373
# 2.
74-
response = da.create_running_workflow(workflow_id=wfid)
74+
response = da.create_running_workflow(
75+
workflow_id=wfid, project_id="project-00000000-0000-0000-0000-000000000001"
76+
)
7577
r_wfid = response["id"]
7678
assert r_wfid
7779
print(f"Created running workflow {r_wfid}")

workflow/workflow_abc.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def create_running_workflow(
9696
self,
9797
*,
9898
workflow_id: str,
99+
project_id: str,
99100
) -> Dict[str, Any]:
100101
"""Create a RunningWorkflow Record (from a Workflow)"""
101102
# Should return:
@@ -117,7 +118,8 @@ def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
117118
# "running_workflow": {
118119
# "done": False,
119120
# "success": false,
120-
# "workflow": "workflow-000",
121+
# "workflow": {"id": "workflow-000"},
122+
# "project_id": "project-000",
121123
# }
122124
# }
123125

workflow/workflow_engine.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
7777
assert "running_workflow" in response
7878
running_workflow = response["running_workflow"]
7979
_LOGGER.info("RunningWorkflow: %s", running_workflow)
80+
project_id = running_workflow["project_id"]
8081
workflow_id = running_workflow["workflow"]["id"]
8182
response = self._api_adapter.get_workflow(workflow_id=workflow_id)
8283
assert "workflow" in response
@@ -94,7 +95,7 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
9495
step = workflow["steps"][0]
9596
step_specification: Dict[str, Any] = ast.literal_eval(step["specification"])
9697
self._instance_launcher.launch(
97-
project_id="project-000",
98+
project_id=project_id,
9899
workflow_id=workflow_id,
99100
running_workflow_step_id=running_workflow_step_id,
100101
workflow_definition=workflow,
@@ -162,6 +163,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
162163
response = self._api_adapter.get_running_workflow(
163164
running_workflow_id=running_workflow_id
164165
)
166+
project_id = response["running_workflow"]["project_id"]
165167
workflow_id = response["running_workflow"]["workflow"]["id"]
166168
assert workflow_id
167169
response = self._api_adapter.get_workflow(workflow_id=workflow_id)
@@ -189,7 +191,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
189191
next_step["specification"]
190192
)
191193
self._instance_launcher.launch(
192-
project_id="project-000",
194+
project_id=project_id,
193195
workflow_id=workflow_id,
194196
running_workflow_step_id=running_workflow_step_id,
195197
workflow_definition=workflow,

0 commit comments

Comments
 (0)