1
1
import json
2
2
import os
3
+ import shutil
3
4
import subprocess
4
5
from datetime import datetime , timezone
5
6
from subprocess import CompletedProcess
14
15
from tests .message_dispatcher import UnitTestMessageDispatcher
15
16
from workflow .workflow_abc import InstanceLauncher , LaunchResult
16
17
17
- _JOB_DIRECTORY : str = os .path .join (os .path .dirname (__file__ ), "jobs" )
18
+ # Full path to the 'jobs' directory
19
+ _JOB_PATH : str = os .path .join (os .path .dirname (__file__ ), "jobs" )
20
+ # Relative path to the execution (project) directory
21
+ _EXECUTION_DIRECTORY : str = os .path .join ("tests" , "project-root" , TEST_PROJECT_ID )
22
+
23
+
24
+ def project_file_exists (file_name : str ) -> bool :
25
+ """A convenient test function to verify a file exists
26
+ in the execution (project) directory."""
27
+ return os .path .isfile (os .path .join (_EXECUTION_DIRECTORY , file_name ))
18
28
19
29
20
30
class UnitTestInstanceLauncher (InstanceLauncher ):
@@ -43,6 +53,11 @@ def __init__(
43
53
self ._api_adapter = api_adapter
44
54
self ._msg_dispatcher = msg_dispatcher
45
55
56
+ # Every launcher starts with an empty execution directory...
57
+ print (f"Removing execution directory ({ _EXECUTION_DIRECTORY } )" )
58
+ assert _EXECUTION_DIRECTORY .startswith ("tests/project-root" )
59
+ shutil .rmtree (_EXECUTION_DIRECTORY , ignore_errors = True )
60
+
46
61
def launch (
47
62
self ,
48
63
* ,
@@ -60,6 +75,8 @@ def launch(
60
75
61
76
assert project_id == TEST_PROJECT_ID
62
77
78
+ os .makedirs (_EXECUTION_DIRECTORY , exist_ok = True )
79
+
63
80
# We're passed a RunningWorkflowStep ID but a record is expected to have been
64
81
# created bt the caller, we simply create instance records.
65
82
response = self ._api_adapter .get_running_workflow_step (
@@ -74,10 +91,6 @@ def launch(
74
91
response = self ._api_adapter .create_task (instance_id = instance_id )
75
92
task_id = response ["id" ]
76
93
77
- # Where to run the job (i.e. in the test project directory)
78
- execution_directory = f"tests/project-root/{ project_id } "
79
- os .makedirs (execution_directory , exist_ok = True )
80
-
81
94
# Apply variables to the step's Job command.
82
95
step_specification_map = json .loads (step_specification )
83
96
job = self ._api_adapter .get_job (
@@ -99,15 +112,15 @@ def launch(
99
112
assert status
100
113
101
114
# Now run the decoded command, which will be in the _JOB_DIRECTORY
102
- command = f"{ _JOB_DIRECTORY } /{ decoded_command } "
115
+ command = f"{ _JOB_PATH } /{ decoded_command } "
103
116
command_list = command .split ()
104
117
module = command_list [0 ]
105
118
print (f"Module: { module } " )
106
119
assert os .path .isfile (module )
107
120
subprocess_cmd : List [str ] = ["python" ] + command_list
108
121
print (f"Subprocess command: { subprocess_cmd } " )
109
122
completed_process : CompletedProcess = subprocess .run (
110
- subprocess_cmd , check = False , cwd = execution_directory
123
+ subprocess_cmd , check = False , cwd = _EXECUTION_DIRECTORY
111
124
)
112
125
113
126
# Simulate a PodMessage (that will contain the instance ID),
0 commit comments