@@ -125,6 +125,15 @@ def __init__(
125125 self .sagemaker_session .boto_session .client ("scheduler" ),
126126 )
127127
128+ @property
129+ def latest_pipeline_version_id (self ):
130+ """Retrieves the latest version id of this pipeline"""
131+ summaries = self .list_pipeline_versions (max_results = 1 )["PipelineVersionSummaries" ]
132+ if not summaries :
133+ return None
134+ else :
135+ return summaries [0 ].get ("PipelineVersionId" )
136+
128137 def create (
129138 self ,
130139 role_arn : str = None ,
@@ -166,7 +175,8 @@ def create(
166175 kwargs ,
167176 Tags = tags ,
168177 )
169- return self .sagemaker_session .sagemaker_client .create_pipeline (** kwargs )
178+ response = self .sagemaker_session .sagemaker_client .create_pipeline (** kwargs )
179+ return response
170180
171181 def _create_args (
172182 self , role_arn : str , description : str , parallelism_config : ParallelismConfiguration
@@ -214,15 +224,21 @@ def _create_args(
214224 )
215225 return kwargs
216226
217- def describe (self ) -> Dict [str , Any ]:
227+ def describe (self , pipeline_version_id : int = None ) -> Dict [str , Any ]:
218228 """Describes a Pipeline in the Workflow service.
219229
230+ Args:
231+ pipeline_version_id (Optional[str]): version ID of the pipeline to describe.
232+
220233 Returns:
221234 Response dict from the service. See `boto3 client documentation
222235 <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/\
223236 sagemaker.html#SageMaker.Client.describe_pipeline>`_
224237 """
225- return self .sagemaker_session .sagemaker_client .describe_pipeline (PipelineName = self .name )
238+ kwargs = dict (PipelineName = self .name )
239+ if pipeline_version_id :
240+ kwargs ["PipelineVersionId" ] = pipeline_version_id
241+ return self .sagemaker_session .sagemaker_client .describe_pipeline (** kwargs )
226242
227243 def update (
228244 self ,
@@ -257,7 +273,8 @@ def update(
257273 return self .sagemaker_session .sagemaker_client .update_pipeline (self , description )
258274
259275 kwargs = self ._create_args (role_arn , description , parallelism_config )
260- return self .sagemaker_session .sagemaker_client .update_pipeline (** kwargs )
276+ response = self .sagemaker_session .sagemaker_client .update_pipeline (** kwargs )
277+ return response
261278
262279 def upsert (
263280 self ,
@@ -332,6 +349,7 @@ def start(
332349 execution_description : str = None ,
333350 parallelism_config : ParallelismConfiguration = None ,
334351 selective_execution_config : SelectiveExecutionConfig = None ,
352+ pipeline_version_id : int = None ,
335353 ):
336354 """Starts a Pipeline execution in the Workflow service.
337355
@@ -345,6 +363,8 @@ def start(
345363 over the parallelism configuration of the parent pipeline.
346364 selective_execution_config (Optional[SelectiveExecutionConfig]): The configuration for
347365 selective step execution.
366+ pipeline_version_id (Optional[str]): version ID of the pipeline to start the execution from. If not
367+ specified, uses the latest version ID.
348368
349369 Returns:
350370 A `_PipelineExecution` instance, if successful.
@@ -366,6 +386,7 @@ def start(
366386 PipelineExecutionDisplayName = execution_display_name ,
367387 ParallelismConfiguration = parallelism_config ,
368388 SelectiveExecutionConfig = selective_execution_config ,
389+ PipelineVersionId = pipeline_version_id ,
369390 )
370391 if self .sagemaker_session .local_mode :
371392 update_args (kwargs , PipelineParameters = parameters )
@@ -461,6 +482,32 @@ def list_executions(
461482 if key in response
462483 }
463484
485+ def list_pipeline_versions (
486+ self , sort_order : str = None , max_results : int = None , next_token : str = None
487+ ) -> str :
488+ """Lists a pipeline's versions.
489+
490+ Args:
491+ sort_order (str): The sort order for results (Ascending/Descending).
492+ max_results (int): The maximum number of pipeline executions to return in the response.
493+ next_token (str): If the result of the previous `ListPipelineExecutions` request was
494+ truncated, the response includes a `NextToken`. To retrieve the next set of pipeline
495+ executions, use the token in the next request.
496+
497+ Returns:
498+ List of Pipeline Version Summaries. See
499+ boto3 client list_pipeline_versions
500+ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/list_pipeline_versions.html#
501+ """
502+ kwargs = dict (PipelineName = self .name )
503+ update_args (
504+ kwargs ,
505+ SortOrder = sort_order ,
506+ NextToken = next_token ,
507+ MaxResults = max_results ,
508+ )
509+ return self .sagemaker_session .sagemaker_client .list_pipeline_versions (** kwargs )
510+
464511 def _get_latest_execution_arn (self ):
465512 """Retrieves the latest execution of this pipeline"""
466513 response = self .list_executions (
@@ -855,7 +902,7 @@ def describe(self):
855902 sagemaker.html#SageMaker.Client.describe_pipeline_execution>`_.
856903 """
857904 return self .sagemaker_session .sagemaker_client .describe_pipeline_execution (
858- PipelineExecutionArn = self .arn ,
905+ PipelineExecutionArn = self .arn
859906 )
860907
861908 def list_steps (self ):
0 commit comments