26
26
import sys
27
27
from typing import Any , Optional
28
28
29
- from decoder .decoder import TextEncoding , decode
29
+ import decoder .decoder as job_defintion_decoder
30
+ from decoder .decoder import TextEncoding
30
31
from google .protobuf .message import Message
31
32
from informaticsmatters .protobuf .datamanager .pod_message_pb2 import PodMessage
32
33
from informaticsmatters .protobuf .datamanager .workflow_message_pb2 import WorkflowMessage
40
41
41
42
from .decoder import (
42
43
Translation ,
44
+ get_step ,
43
45
get_step_prior_step_variable_mapping ,
44
46
get_step_workflow_variable_mapping ,
45
47
)
@@ -127,7 +129,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
127
129
# Launch it.
128
130
# If there's a launch problem the step (and running workflow) will have
129
131
# and error, stopping it. There will be no Pod event as the launch has failed.
130
- self ._launch (rwf = rwf_response , step = first_step )
132
+ self ._launch (wf = wf_response , rwf = rwf_response , step = first_step )
131
133
132
134
def _handle_workflow_stop_message (self , r_wfid : str ) -> None :
133
135
"""Logic to handle a STOP message."""
@@ -263,7 +265,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
263
265
# There's another step!
264
266
# For this simple logic it is the next step.
265
267
next_step = wf_response ["steps" ][step_index + 1 ]
266
- self ._launch (rwf = rwf_response , step = next_step )
268
+ self ._launch (wf = wf_response , rwf = rwf_response , step = next_step )
267
269
268
270
# Something was started (or there was a launch error and the step
269
271
# and running workflow error will have been set).
@@ -278,28 +280,21 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
278
280
success = True ,
279
281
)
280
282
281
- def _validate_step_command (
282
- self ,
283
- * ,
284
- running_workflow_id : str ,
285
- step : dict [str , Any ],
286
- running_workflow_variables : dict [str , Any ],
287
- ) -> str | dict [str , Any ]:
288
- """Returns an error message if the command isn't valid.
289
- Without a message we return all the variables that were (successfully)
290
- applied to the command."""
291
-
283
+ def _get_step_job (self , * , step : dict [str , Any ]) -> dict [str , Any ]:
284
+ """Gets the Job definition for a given Step."""
292
285
# We get the Job from the step specification, which must contain
293
286
# the keys "collection", "job", and "version". Here we assume that
294
287
# the workflow definition has passed the RUN-level validation
295
288
# which means we can get these values.
289
+ assert "specification" in step
296
290
step_spec : dict [str , Any ] = step ["specification" ]
297
291
job_collection : str = step_spec ["collection" ]
298
292
job_job : str = step_spec ["job" ]
299
293
job_version : str = step_spec ["version" ]
300
294
job , _ = self ._wapi_adapter .get_job (
301
295
collection = job_collection , job = job_job , version = job_version
302
296
)
297
+
303
298
_LOGGER .debug (
304
299
"API.get_job(%s, %s, %s) returned: -\n %s" ,
305
300
job_collection ,
@@ -308,6 +303,19 @@ def _validate_step_command(
308
303
str (job ),
309
304
)
310
305
306
+ return job
307
+
308
+ def _validate_step_command (
309
+ self ,
310
+ * ,
311
+ running_workflow_id : str ,
312
+ step : dict [str , Any ],
313
+ running_workflow_variables : dict [str , Any ],
314
+ ) -> str | dict [str , Any ]:
315
+ """Returns an error message if the command isn't valid.
316
+ Without a message we return all the variables that were (successfully)
317
+ applied to the command."""
318
+
311
319
# Start with any variables provided in the step's specification.
312
320
# This will be ou t"all variables" map for this step,
313
321
# whcih we will add to (and maybe even over-write)...
@@ -345,12 +353,15 @@ def _validate_step_command(
345
353
all_variables [tr .out ] = prior_step ["variables" ][tr .in_ ]
346
354
347
355
# Now ... can the command be compiled!?
348
- message , success = decode (
356
+ job : dict [str , Any ] = self ._get_step_job (step = step )
357
+ message , success = job_defintion_decoder .decode (
349
358
job ["command" ], all_variables , "command" , TextEncoding .JINJA2_3_0
350
359
)
351
360
return all_variables if success else message
352
361
353
- def _launch (self , * , rwf : dict [str , Any ], step : dict [str , Any ]) -> None :
362
+ def _launch (
363
+ self , * , wf : dict [str , Any ], rwf : dict [str , Any ], step : dict [str , Any ]
364
+ ) -> None :
354
365
step_name : str = step ["name" ]
355
366
rwf_id : str = rwf ["id" ]
356
367
project_id = rwf ["project" ]["id" ]
@@ -380,17 +391,53 @@ def _launch(self, *, rwf: dict[str, Any], step: dict[str, Any]) -> None:
380
391
# A step replication number,
381
392
# used only for steps expected to run in parallel (even if just once)
382
393
step_replication_number : int = 0
394
+ # Do we replicate this step (run it more than once)?
395
+ # We do if a variable in this step's mapping block
396
+ # refers to an output of a prior step whose type is 'files'.
397
+ # If the prior step is a 'splitter' we populate the 'replication_values' array
398
+ # with the list of files the prior step genrated for its output.
383
399
replication_values : list [str ] = []
384
- source_is_splitter : bool = False
385
400
iter_variable : str | None = None
401
+ tr_map : dict [str , list [Translation ]] = get_step_prior_step_variable_mapping (
402
+ step = step
403
+ )
404
+ for p_step_name , tr_list in tr_map .items ():
405
+ # We need to get the Job definition for each step
406
+ # and then check whether the (ouptu) variable is of type 'files'...
407
+ wf_step : dict [str , Any ] = get_step (wf , p_step_name )
408
+ assert wf_step
409
+ job_definition : dict [str , Any ] = self ._get_step_job (step = wf_step )
410
+ jd_outputs : dict [str , Any ] = job_defintion_decoder .get_outputs (
411
+ job_definition
412
+ )
413
+ for tr in tr_list :
414
+ if jd_outputs .get (tr .in_ , {}).get ("type" ) == "files" :
415
+ iter_variable = tr .out
416
+ # Get the prior running step's output values
417
+ response , _ = self ._wapi_adapter .get_running_workflow_step_by_name (
418
+ name = p_step_name ,
419
+ running_workflow_id = rwf_id ,
420
+ )
421
+ rwfs_id = response ["id" ]
422
+ assert rwfs_id
423
+ result , _ = (
424
+ self ._wapi_adapter .get_running_workflow_step_output_values_for_output (
425
+ running_workflow_step_id = rwfs_id ,
426
+ output_variable = tr .in_ ,
427
+ )
428
+ )
429
+ replication_values = result ["output" ].copy ()
430
+ break
431
+ # Stop if we've got an iteration variable
432
+ if iter_variable :
433
+ break
386
434
387
435
num_step_instances : int = max (1 , len (replication_values ))
388
436
for iteration in range (num_step_instances ):
389
437
390
438
# If we are replicating this step then we must replace the step's variable
391
439
# with a value expected for this iteration.
392
- if source_is_splitter :
393
- assert iter_variable
440
+ if iter_variable :
394
441
iter_value : str = replication_values [iteration ]
395
442
_LOGGER .info (
396
443
"Replicating step: %s iteration=%s variable=%s value=%s" ,
@@ -427,7 +474,7 @@ def _launch(self, *, rwf: dict[str, Any], step: dict[str, Any]) -> None:
427
474
step_replication_number = step_replication_number ,
428
475
)
429
476
lr : LaunchResult = self ._instance_launcher .launch (launch_parameters = lp )
430
- rwfs_id : str | None = lr .running_workflow_step_id
477
+ rwfs_id = lr .running_workflow_step_id
431
478
assert rwfs_id
432
479
433
480
if lr .error_num :
0 commit comments