@@ -350,38 +350,46 @@ def _validate_step_command(
350
350
# Now we have to inspect the workflow step 'inputs' (and 'options')
351
351
# and see if there are further variables that need constructing
352
352
# and then adding (merging) into the 'all_variables' dictionary.
353
- #
354
- # TBD
355
353
356
354
wf_step_data , _ = self ._wapi_adapter .get_workflow_steps_driving_this_step (
357
355
running_workflow_step_id = running_workflow_step_id ,
358
356
)
359
357
360
- wf_steps = wf_step_data .get ("steps" , [])
361
- try :
362
- previous_step = wf_steps [wf_step_data ["caller_step_index" ] - 1 ]
363
- except IndexError :
364
- previous_step = {}
365
-
358
+ # We must always process the current step's variables
359
+ _LOGGER .debug ("Validating step %s" , step )
366
360
inputs = step .get ("inputs" , [])
367
361
outputs = step .get ("outputs" , [])
368
- previous_step_outputs = previous_step .get ("outputs" , [])
369
-
362
+ previous_step_outputs = []
363
+ our_index : int = wf_step_data ["caller_step_index" ]
364
+ assert our_index >= 0
365
+ _LOGGER .debug ("We are at workflow step index %d" , our_index )
366
+
367
+ if our_index > 0 :
368
+ previous_step = wf_step_data ["steps" ][our_index - 1 ]
369
+ previous_step_outputs = previous_step .get ("outputs" , [])
370
+
371
+ _LOGGER .debug ("Index %s workflow_variables=%s" , our_index , all_variables )
372
+ _LOGGER .debug ("Index %s inputs=%s" , our_index , inputs )
373
+ _LOGGER .debug ("Index %s outputs=%s" , our_index , outputs )
374
+ _LOGGER .debug (
375
+ "Index %s previous_step_outputs=%s" , our_index , previous_step_outputs
376
+ )
370
377
step_vars = self ._set_step_variables (
378
+ workflow_variables = all_variables ,
371
379
inputs = inputs ,
372
380
outputs = outputs ,
373
381
previous_step_outputs = previous_step_outputs ,
374
- workflow_variables = all_variables ,
375
382
)
376
-
377
383
all_variables |= step_vars
378
- print ( " all_variables" , all_variables )
384
+ _LOGGER . debug ( "Index %s all_variables=%s " , our_index , previous_step_outputs )
379
385
386
+ # Set the variables for this step (so they can be inspected on error)
380
387
self ._wapi_adapter .set_running_workflow_step_variables (
381
388
running_workflow_step_id = running_workflow_step_id ,
382
389
variables = all_variables ,
383
390
)
384
391
392
+ # Now ... can the command be compiled!?
385
393
message , success = decode (
386
394
job ["command" ], all_variables , "command" , TextEncoding .JINJA2_3_0
387
395
)
@@ -413,17 +421,16 @@ def _launch(
413
421
)
414
422
if isinstance (error_or_variables , str ):
415
423
error_msg = error_or_variables
416
- _LOGGER .warning (
417
- "First step '%s' failed command validation (%s)" , step_name , error_msg
418
- )
419
- self ._set_step_error (step_name , rwf_id , rwfs_id , 1 , error_msg )
424
+ msg = f"Failed command validation error_msg={ error_msg } "
425
+ _LOGGER .warning (msg )
426
+ self ._set_step_error (step_name , rwf_id , rwfs_id , 1 , msg )
420
427
return
421
428
422
429
project_id = rwf ["project" ]["id" ]
423
430
variables : dict [str , Any ] = error_or_variables
424
431
425
432
_LOGGER .info (
426
- "Launching first step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
433
+ "Launching step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
427
434
" (name=%s project=%s, variables=%s)" ,
428
435
rwf_id ,
429
436
rwfs_id ,
@@ -455,33 +462,35 @@ def _set_step_error(
455
462
step_name : str ,
456
463
r_wfid : str ,
457
464
r_wfsid : str ,
458
- error : Optional [int ],
465
+ error_num : Optional [int ],
459
466
error_msg : Optional [str ],
460
467
) -> None :
461
468
"""Set the error state for a running workflow step (and the running workflow).
462
469
Calling this method essentially 'ends' the running workflow."""
463
470
_LOGGER .warning (
464
- "Failed to launch step '%s' (error =%d error_msg=%s)" ,
471
+ "Failed to launch step '%s' (error_num =%d error_msg=%s)" ,
465
472
step_name ,
466
- error ,
473
+ error_num ,
467
474
error_msg ,
468
475
)
476
+ r_wf_error : str = f"Step '{ step_name } ' ERROR({ error_num } ): { error_msg } "
469
477
self ._wapi_adapter .set_running_workflow_step_done (
470
478
running_workflow_step_id = r_wfsid ,
471
479
success = False ,
472
- error_num = error ,
473
- error_msg = error_msg ,
480
+ error_num = error_num ,
481
+ error_msg = r_wf_error ,
474
482
)
475
483
# We must also set the running workflow as done (failed)
476
484
self ._wapi_adapter .set_running_workflow_done (
477
485
running_workflow_id = r_wfid ,
478
486
success = False ,
479
- error_num = error ,
480
- error_msg = error_msg ,
487
+ error_num = error_num ,
488
+ error_msg = r_wf_error ,
481
489
)
482
490
483
491
def _set_step_variables (
484
492
self ,
493
+ * ,
485
494
inputs : list [dict [str , Any ]],
486
495
outputs : list [dict [str , Any ]],
487
496
previous_step_outputs : list [dict [str , Any ]],
0 commit comments