@@ -669,10 +669,8 @@ def start(self, **kwargs):
669669 execution = _LocalPipelineExecution (execution_id , self .pipeline , ** kwargs )
670670
671671 self ._executions [execution_id ] = execution
672- logger .info (
673- "Starting execution for pipeline %s. Execution ID is %s" ,
674- self .pipeline .name ,
675- execution_id ,
672+ print (
673+ f"Starting execution for pipeline { self .pipeline .name } . Execution ID is { execution_id } "
676674 )
677675 self .last_modified_time = datetime .datetime .now ().timestamp ()
678676
@@ -690,6 +688,8 @@ def __init__(
690688 PipelineExecutionDescription = None ,
691689 PipelineExecutionDisplayName = None ,
692690 ):
691+ from sagemaker .workflow .pipeline import PipelineGraph
692+
693693 self .pipeline = pipeline
694694 self .pipeline_execution_name = execution_id
695695 self .pipeline_execution_description = PipelineExecutionDescription
@@ -699,7 +699,8 @@ def __init__(
699699 self .creation_time = datetime .datetime .now ().timestamp ()
700700 self .last_modified_time = self .creation_time
701701 self .step_execution = {}
702- self ._initialize_step_execution (self .pipeline .steps )
702+ self .pipeline_dag = PipelineGraph .from_pipeline (self .pipeline )
703+ self ._initialize_step_execution (self .pipeline_dag .step_map .values ())
703704 self .pipeline_parameters = self ._initialize_and_validate_parameters (PipelineParameters )
704705 self ._blocked_steps = {}
705706
@@ -732,37 +733,36 @@ def update_execution_success(self):
732733 """Mark execution as succeeded."""
733734 self .status = _LocalExecutionStatus .SUCCEEDED .value
734735 self .last_modified_time = datetime .datetime .now ().timestamp ()
735- logger . info ( "Pipeline execution %s SUCCEEDED" , self .pipeline_execution_name )
736+ print ( f "Pipeline execution { self .pipeline_execution_name } SUCCEEDED" )
736737
737738 def update_execution_failure (self , step_name , failure_message ):
738739 """Mark execution as failed."""
739740 self .status = _LocalExecutionStatus .FAILED .value
740- self .failure_reason = f"Step { step_name } failed with message: { failure_message } "
741+ self .failure_reason = f"Step ' { step_name } ' failed with message: { failure_message } "
741742 self .last_modified_time = datetime .datetime .now ().timestamp ()
742- logger .info (
743- "Pipeline execution %s FAILED because step %s failed." ,
744- self .pipeline_execution_name ,
745- step_name ,
743+ print (
744+ f"Pipeline execution { self .pipeline_execution_name } FAILED because step "
745+ f"'{ step_name } ' failed."
746746 )
747747
748748 def update_step_properties (self , step_name , step_properties ):
749749 """Update pipeline step execution output properties."""
750750 self .step_execution .get (step_name ).update_step_properties (step_properties )
751- logger . info ( "Pipeline step %s SUCCEEDED." , step_name )
751+ print ( f "Pipeline step ' { step_name } ' SUCCEEDED." )
752752
753753 def update_step_failure (self , step_name , failure_message ):
754754 """Mark step_name as failed."""
755+ print (f"Pipeline step '{ step_name } ' FAILED. Failure message is: { failure_message } " )
755756 self .step_execution .get (step_name ).update_step_failure (failure_message )
756- logger .info ("Pipeline step %s FAILED. Failure message is: %s" , step_name , failure_message )
757757
758758 def mark_step_executing (self , step_name ):
759759 """Update pipelines step's status to EXECUTING and start_time to now."""
760- logger . info ( "Starting pipeline step: %s" , step_name )
760+ print ( f "Starting pipeline step: ' { step_name } '" )
761761 self .step_execution .get (step_name ).mark_step_executing ()
762762
763763 def _initialize_step_execution (self , steps ):
764764 """Initialize step_execution dict."""
765- from sagemaker .workflow .steps import StepTypeEnum
765+ from sagemaker .workflow .steps import StepTypeEnum , Step
766766
767767 supported_steps_types = (
768768 StepTypeEnum .TRAINING ,
@@ -774,16 +774,17 @@ def _initialize_step_execution(self, steps):
774774 )
775775
776776 for step in steps :
777- if step .step_type not in supported_steps_types :
778- error_msg = self ._construct_validation_exception_message (
779- "Step type {} is not supported in local mode." .format (step .step_type .value )
777+ if isinstance (step , Step ):
778+ if step .step_type not in supported_steps_types :
779+ error_msg = self ._construct_validation_exception_message (
780+ "Step type {} is not supported in local mode." .format (step .step_type .value )
781+ )
782+ raise ClientError (error_msg , "start_pipeline_execution" )
783+ self .step_execution [step .name ] = _LocalPipelineExecutionStep (
784+ step .name , step .step_type , step .description , step .display_name
780785 )
781- raise ClientError (error_msg , "start_pipeline_execution" )
782- self .step_execution [step .name ] = _LocalPipelineExecutionStep (
783- step .name , step .step_type , step .description , step .display_name
784- )
785- if step .step_type == StepTypeEnum .CONDITION :
786- self ._initialize_step_execution (step .if_steps + step .else_steps )
786+ if step .step_type == StepTypeEnum .CONDITION :
787+ self ._initialize_step_execution (step .if_steps + step .else_steps )
787788
788789 def _initialize_and_validate_parameters (self , overridden_parameters ):
789790 """Initialize and validate pipeline parameters."""
0 commit comments