Skip to content

Commit 243c3ae

Browse files
aoguo64Ao Guo
authored andcommitted
feature: local mode executor implementation
Co-authored-by: Ao Guo <[email protected]>
1 parent 87b4058 commit 243c3ae

File tree

6 files changed

+1211
-47
lines changed

6 files changed

+1211
-47
lines changed

src/sagemaker/local/entities.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -687,8 +687,10 @@ def __init__(
687687
self.status = _LocalExecutionStatus.EXECUTING.value
688688
self.failure_reason = None
689689
self.creation_time = datetime.datetime.now()
690-
self.step_execution = self._initialize_step_execution()
690+
self.step_execution = {}
691+
self._initialize_step_execution(self.pipeline.steps)
691692
self.pipeline_parameters = self._initialize_and_validate_parameters(PipelineParameters)
693+
self.blockout_steps = {}
692694

693695
def describe(self):
694696
"""Placeholder docstring"""
@@ -709,21 +711,29 @@ def list_steps(self):
709711
"""Placeholder docstring"""
710712
# TODO
711713

714+
def update_execution_success(self):
715+
"""Mark execution as succeeded."""
716+
self.status = _LocalExecutionStatus.SUCCEEDED.value
717+
712718
def update_execution_failure(self, step_name, failure_message):
713719
"""Mark execution as failed."""
714720
self.status = _LocalExecutionStatus.FAILED.value
715721
self.failure_reason = f"Step {step_name} failed with message: {failure_message}"
716722
logger.error("Pipeline execution failed because step %s failed.", step_name)
717723

724+
def update_step_properties(self, step_name, step_properties):
725+
"""Update pipeline step execution output properties."""
726+
self.step_execution.get(step_name).update_step_properties(step_properties)
727+
718728
def update_step_failure(self, step_name, failure_message):
719729
"""Mark step_name as failed."""
720730
self.step_execution.get(step_name).update_step_failure(failure_message)
721731

722-
def mark_step_starting(self, step_name):
732+
def mark_step_executing(self, step_name):
723733
"""Update step's status to EXECUTING"""
724-
self.step_execution.get(step_name).status = _LocalExecutionStatus.EXECUTING
734+
self.step_execution.get(step_name).status = _LocalExecutionStatus.EXECUTING.value
725735

726-
def _initialize_step_execution(self):
736+
def _initialize_step_execution(self, steps):
727737
"""Initialize step_execution dict."""
728738
from sagemaker.workflow.steps import StepTypeEnum
729739

@@ -735,15 +745,15 @@ def _initialize_step_execution(self):
735745
StepTypeEnum.FAIL,
736746
)
737747

738-
step_execution = {}
739-
for step in self.pipeline.steps:
748+
for step in steps:
740749
if step.step_type not in supported_steps_types:
741750
error_msg = self._construct_validation_exception_message(
742751
"Step type {} is not supported in local mode.".format(step.step_type.value)
743752
)
744753
raise ClientError(error_msg, "start_pipeline_execution")
745-
step_execution[step.name] = _LocalPipelineStepExecution(step.name, step.step_type)
746-
return step_execution
754+
self.step_execution[step.name] = _LocalPipelineStepExecution(step.name, step.step_type)
755+
if step.step_type == StepTypeEnum.CONDITION:
756+
self._initialize_step_execution(step.if_steps + step.else_steps)
747757

748758
def _initialize_and_validate_parameters(self, overridden_parameters):
749759
"""Initialize and validate pipeline parameters."""
@@ -794,7 +804,7 @@ def __init__(
794804
):
795805
self.step_name = step_name
796806
self.step_type = step_type
797-
self.status = status or _LocalExecutionStatus.STARTING
807+
self.status = status or _LocalExecutionStatus.STARTING.value
798808
self.failure_reason = failure_reason
799809
self.properties = properties or {}
800810
self.creation_time = datetime.datetime.now()

0 commit comments

Comments
 (0)