@@ -68,39 +68,64 @@ def __init__(
6868 elif os .path .isdir (file_path ):
6969 shutil .rmtree (file_path )
7070
71- def launch (self , launch_parameters : LaunchParameters ) -> LaunchResult :
71+ def launch (self , * , launch_parameters : LaunchParameters ) -> LaunchResult :
7272 assert launch_parameters
7373 assert launch_parameters .project_id == TEST_PROJECT_ID
7474 assert launch_parameters .specification
7575 assert isinstance (launch_parameters .specification , dict )
7676
7777 os .makedirs (EXECUTION_DIRECTORY , exist_ok = True )
7878
79- # We're passed a RunningWorkflowStep ID but a record is expected to have been
80- # created bt the caller, we simply create instance records.
81- response , _ = self ._api_adapter .get_running_workflow_step (
82- running_workflow_step_id = launch_parameters .running_workflow_step_id
83- )
84- # Now simulate the creation of a Task and Instance record
85- response = self ._api_adapter .create_instance (
86- running_workflow_step_id = launch_parameters .running_workflow_step_id
87- )
79+ if launch_parameters .step_replication_number :
80+ assert (
81+ launch_parameters .step_replication_number
82+ <= launch_parameters .total_number_of_replicas
83+ )
84+
85+ # Create an Instance record (and dummy Task ID)
86+ response = self ._api_adapter .create_instance ()
8887 instance_id = response ["id" ]
8988 task_id = "task-00000000-0000-0000-0000-000000000001"
9089
91- # Apply variables to the step's Job command.
90+ # Create a running workflow step
91+ assert launch_parameters .running_workflow_id
92+ assert launch_parameters .step_name
93+ response , _ = self ._api_adapter .create_running_workflow_step (
94+ running_workflow_id = launch_parameters .running_workflow_id ,
95+ step = launch_parameters .step_name ,
96+ instance_id = instance_id ,
97+ replica = launch_parameters .step_replication_number ,
98+ replicas = launch_parameters .total_number_of_replicas ,
99+ )
100+ assert "id" in response
101+ rwfs_id : str = response ["id" ]
102+ # And add the variables we've been provided with
103+ if launch_parameters .variables :
104+ _ = self ._api_adapter .set_running_workflow_step_variables (
105+ running_workflow_step_id = rwfs_id , variables = launch_parameters .variables
106+ )
107+
108+ # Now add the running workflow ID ot the instance record.
109+ self ._api_adapter .set_instance_running_workflow_step_id (
110+ instance_id = instance_id ,
111+ running_workflow_step_id = rwfs_id ,
112+ )
113+
114+ # Get the job defitnion.
115+ # This is expected to exist in the tests/job-definitions directory.
92116 job , _ = self ._api_adapter .get_job (
93117 collection = launch_parameters .specification ["collection" ],
94118 job = launch_parameters .specification ["job" ],
95119 version = "do-not-care" ,
96120 )
97121 assert job
98122
99- # Now apply the variables to the command
123+ # Now apply the provided variables to the command.
124+ # The command may not need any, but we do the decoding anyway.
100125 decoded_command , status = job_decoder .decode (
101126 job ["command" ],
102- launch_parameters .specification_variables ,
103- launch_parameters . running_workflow_step_id ,
127+ launch_parameters .variables ,
128+ rwfs_id ,
104129 TextEncoding .JINJA2_3_0 ,
105130 )
106131 print (f"Decoded command: { decoded_command } " )
@@ -132,6 +157,7 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
132157 self ._msg_dispatcher .send (pod_message )
133158
134159 return LaunchResult (
160+ running_workflow_step_id = rwfs_id ,
135161 instance_id = instance_id ,
136162 task_id = task_id ,
137163 command = " " .join (subprocess_cmd ),
0 commit comments