1
+ import json
1
2
import os
2
3
import subprocess
3
4
from datetime import datetime , timezone
4
5
from subprocess import CompletedProcess
5
6
from typing import Any , Dict , List
6
7
8
+ from decoder import decoder as job_decoder
9
+ from decoder .decoder import TextEncoding
7
10
from informaticsmatters .protobuf .datamanager .pod_message_pb2 import PodMessage
8
11
9
12
from tests .api_adapter import UnitTestAPIAdapter
@@ -44,14 +47,16 @@ def launch(
44
47
self ,
45
48
* ,
46
49
project_id : str ,
47
- workflow_id : str ,
50
+ running_workflow_id : str ,
48
51
running_workflow_step_id : str ,
49
- workflow_definition : Dict [ str , Any ] ,
50
- step_specification : Dict [str , Any ],
52
+ step_specification : str ,
53
+ variables : Dict [str , Any ],
51
54
) -> LaunchResult :
52
55
assert project_id
53
- assert workflow_id
56
+ assert running_workflow_id
54
57
assert step_specification
58
+ assert isinstance (step_specification , str )
59
+ assert isinstance (variables , dict )
55
60
56
61
assert project_id == TEST_PROJECT_ID
57
62
@@ -73,17 +78,36 @@ def launch(
73
78
execution_directory = f"project-root/{ project_id } "
74
79
os .makedirs (execution_directory , exist_ok = True )
75
80
76
- # Just run the Python module that matched the 'job' in the step specification.
77
- # Don't care about 'version' or 'collection'. It will be relative to the
78
- # execution directory.
79
- job : str = step_specification ["job" ]
80
- job_module = f"{ _JOB_DIRECTORY } /{ job } .py"
81
- assert os .path .isfile (job_module )
82
-
83
- job_cmd : List [str ] = ["python" , job_module ]
84
- print (f"Running job command: { job_module } " )
81
+ # Apply variables to the step's Job command.
82
+ step_specification_map = json .loads (step_specification )
83
+ job = self ._api_adapter .get_job (
84
+ collection = step_specification_map ["collection" ],
85
+ job = step_specification_map ["job" ],
86
+ version = "do-not-care" ,
87
+ )
88
+ assert job
89
+
90
+ # Now apply the variables to the command
91
+ decoded_command , status = job_decoder .decode (
92
+ job ["command" ],
93
+ variables ,
94
+ running_workflow_step_id ,
95
+ TextEncoding .JINJA2_3_0 ,
96
+ )
97
+ print (f"Decoded command: { decoded_command } " )
98
+ print (f"Status: { status } " )
99
+ assert status
100
+
101
+ # Now run the decoded command, which will be in the _JOB_DIRECTORY
102
+ command = f"{ _JOB_DIRECTORY } /{ decoded_command } "
103
+ command_list = command .split ()
104
+ module = command_list [0 ]
105
+ print (f"Module: { module } " )
106
+ assert os .path .isfile (module )
107
+ subprocess_cmd : List [str ] = ["python" ] + command_list
108
+ print (f"Subprocess command: { subprocess_cmd } " )
85
109
completed_process : CompletedProcess = subprocess .run (
86
- job_cmd , check = False , cwd = execution_directory
110
+ subprocess_cmd , check = False , cwd = execution_directory
87
111
)
88
112
89
113
# Simulate a PodMessage (that will contain the instance ID),
@@ -103,5 +127,5 @@ def launch(
103
127
error_msg = None ,
104
128
instance_id = instance_id ,
105
129
task_id = task_id ,
106
- command = " " .join (job_cmd ),
130
+ command = " " .join (subprocess_cmd ),
107
131
)
0 commit comments