@@ -65,6 +65,7 @@ class StepPreparationResponse:
65
65
variables : dict [str , Any ] | None = None
66
66
iteration_variable : str | None = None
67
67
iteration_values : list [str ] | None = None
68
+ error_num : int = 0
68
69
error_msg : str | None = None
69
70
70
71
@@ -306,12 +307,20 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
306
307
sp_resp = self ._prepare_step (
307
308
wf = wf_response , step_definition = next_step , rwf = rwf_response
308
309
)
309
- if sp_resp .iterations == 0 or sp_resp . error_msg :
310
+ if sp_resp .iterations == 0 :
310
311
# Cannot prepare variables for this step,
311
- # we have to leave.
312
+ # it might be a combiner step and some prior steps may still
313
+ # be running ... or something's gone wrong.
314
+ if sp_resp .error_num :
315
+ self ._wapi_adapter .set_running_workflow_done (
316
+ running_workflow_id = r_wfid ,
317
+ success = False ,
318
+ error_num = sp_resp .error_num ,
319
+ error_msg = sp_resp .error_msg ,
320
+ )
312
321
return
313
- assert sp_resp .variables is not None
314
322
323
+ assert sp_resp .variables is not None
315
324
self ._launch (
316
325
rwf = rwf_response ,
317
326
step_definition = next_step ,
@@ -359,17 +368,19 @@ def _get_step_job(self, *, step: dict[str, Any]) -> dict[str, Any]:
359
368
def _prepare_step (
360
369
self ,
361
370
* ,
362
- wf : dict [str , Any ],
363
371
step_definition : dict [str , Any ],
372
+ wf : dict [str , Any ],
364
373
rwf : dict [str , Any ],
365
374
) -> StepPreparationResponse :
366
375
"""Attempts to prepare a map of step variables. If variables cannot be
367
- presented to the step we return an object with 'iterations' set to zero."""
376
+ presented to the step we return an object with 'iterations' set to zero.
377
+ If there's a problem that means we should be able to proceed but cannot,
378
+ we set 'error_num' and 'error_msg'."""
368
379
369
380
step_name : str = step_definition ["name" ]
370
381
rwf_id : str = rwf ["id" ]
371
382
372
- # Before we move on, are we combiner?
383
+ # Before we move on, are we a combiner?
373
384
#
374
385
# We are if a variable in our step's plumbing refers to an input of ours
375
386
# that is of type 'files'. If we are a combiner then we use the name of the
@@ -397,17 +408,24 @@ def _prepare_step(
397
408
break
398
409
if step_name_being_combined :
399
410
break
400
- if step_name_being_combined :
411
+
412
+ if step_is_combiner :
413
+ assert step_name_being_combined
414
+ assert combiner_input_variable
415
+
416
+ # Are all the step instances we're combining done?
417
+
401
418
response , _ = self ._wapi_adapter .get_status_of_all_step_instances_by_name (
402
419
name = step_name_being_combined ,
403
420
running_workflow_id = rwf_id ,
404
421
)
405
- # Assume succes...
406
422
assert "count" in response
407
423
num_step_recplicas_being_combined = response ["count" ]
408
424
assert num_step_recplicas_being_combined > 0
409
425
assert "status" in response
410
426
427
+ # Assume they're all done
428
+ # and undo our assumption if not...
411
429
all_step_instances_done : bool = True
412
430
all_step_instances_successful : bool = True
413
431
for status in response ["status" ]:
@@ -418,7 +436,7 @@ def _prepare_step(
418
436
all_step_instances_successful = False
419
437
break
420
438
if not all_step_instances_done :
421
- # Can't move on - but other steps need to finish.
439
+ # Can't move on - other steps need to finish.
422
440
_LOGGER .debug (
423
441
"Assessing start of combiner step (%s)"
424
442
" but not all steps (%s) to be combined are done" ,
@@ -428,15 +446,16 @@ def _prepare_step(
428
446
return StepPreparationResponse (iterations = 0 )
429
447
elif not all_step_instances_successful :
430
448
# Can't move on - all prior steps are done,
431
- # but at least one was in error .
432
- _LOGGER .debug (
449
+ # but at least one was not successful .
450
+ _LOGGER .warning (
433
451
"Assessing start of combiner step (%s)"
434
452
" but at least one step (%s) to be combined failed" ,
435
453
step_name ,
436
454
step_name_being_combined ,
437
455
)
438
456
return StepPreparationResponse (
439
457
iterations = 0 ,
458
+ error_num = 1 ,
440
459
error_msg = f"Prior instance of step '{ step_name_being_combined } ' has failed" ,
441
460
)
442
461
@@ -448,11 +467,11 @@ def _prepare_step(
448
467
variables : dict [str , Any ] = step_definition ["specification" ].get (
449
468
"variables" , {}
450
469
)
451
-
452
- # All the running workflow variables
470
+ # ...and the running workflow variables
453
471
rwf_variables : dict [str , Any ] = rwf .get ("variables" , {})
454
472
455
- # Process the step's plumbing realting to workflow variables.
473
+ # Process the step's "plumbing" relating to workflow variables.
474
+ #
456
475
# This will be a list of Connectors of "in" and "out" variable names.
457
476
# "in" variables are worklfow variables, and "out" variables
458
477
# are expected Job variables. We use this to add variables
@@ -463,13 +482,12 @@ def _prepare_step(
463
482
assert connector .in_ in rwf_variables
464
483
variables [connector .out ] = rwf_variables [connector .in_ ]
465
484
466
- # Now we apply variables from the "plumbing" block
467
- # related to values used in prior steps. The decoder gives
468
- # us a map indexed by prior step name that's a list of "in" "out"
469
- # tuples as above.
485
+ # Now process variables (from the "plumbing" block)
486
+ # that relate to values used in prior steps.
470
487
#
471
- # If this is a combiner step remember that we need to inspect
472
- # variables from all the prior steps.
488
+ # The decoder gives us a map indexed by prior step name that's a list of
489
+ # "in" "out" connectors as above. If this is a combiner step remember
490
+ # that we need to inspect variables from all the prior steps.
473
491
prior_step_plumbing : dict [str , list [Connector ]] = get_step_prior_step_plumbing (
474
492
step_definition = step_definition
475
493
)
@@ -486,6 +504,8 @@ def _prepare_step(
486
504
)
487
505
)
488
506
# Copy "in" value to "out"...
507
+ # accumulating thiose for the 'combining' variable,
508
+ # which will be set as a list when we're done.
489
509
for connector in connections :
490
510
assert connector .in_ in prior_step ["variables" ]
491
511
if connector .out == combiner_input_variable :
@@ -508,16 +528,17 @@ def _prepare_step(
508
528
assert connector .in_ in prior_step ["variables" ]
509
529
variables [connector .out ] = prior_step ["variables" ][connector .in_ ]
510
530
511
- # Now ... can the command be compiled!?
531
+ # All variables are set ...
532
+ # is this enough to satisfy the setp's Job command?
533
+
512
534
job : dict [str , Any ] = self ._get_step_job (step = step_definition )
513
535
message , success = job_defintion_decoder .decode (
514
536
job ["command" ], variables , "command" , TextEncoding .JINJA2_3_0
515
537
)
516
538
if not success :
517
- msg = f"Failed command validation error_msg={ message } "
539
+ msg = f"Failed command validation for step { step_name } error_msg={ message } "
518
540
_LOGGER .warning (msg )
519
- self ._set_step_error (step_name , rwf_id , None , 1 , msg )
520
- return StepPreparationResponse (iterations = 0 )
541
+ return StepPreparationResponse (iterations = 0 , error_num = 2 , error_msg = msg )
521
542
522
543
# Do we replicate this step (run it more than once)?
523
544
#
0 commit comments