 
 import pandas as pd
 
+from sagemaker.utils import retry_with_backoff
 from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution
 from tests.integ.s3_utils import extract_files_from_s3
 from sagemaker.workflow.model_step import (
@@ -1002,7 +1003,7 @@ def test_create_and_update_with_parallelism_config(
     assert response["ParallelismConfiguration"]["MaxParallelExecutionSteps"] == 50
 
     pipeline.parameters = [ParameterInteger(name="InstanceCount", default_value=1)]
-    response = pipeline.update(role, parallelism_config={"MaxParallelExecutionSteps": 55})
+    response = pipeline.upsert(role, parallelism_config={"MaxParallelExecutionSteps": 55})
     update_arn = response["PipelineArn"]
     assert re.match(
         rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
@@ -1019,6 +1020,99 @@ def test_create_and_update_with_parallelism_config(
             pass
 
 
+def test_create_and_start_without_parallelism_config_override(
+    pipeline_session, role, pipeline_name, script_dir
+):
+    sklearn_train = SKLearn(
+        framework_version="0.20.0",
+        entry_point=os.path.join(script_dir, "train.py"),
+        instance_type="ml.m5.xlarge",
+        sagemaker_session=pipeline_session,
+        role=role,
+    )
+
+    train_steps = [
+        TrainingStep(
+            name=f"my-train-{count}",
+            display_name="TrainingStep",
+            description="description for Training step",
+            step_args=sklearn_train.fit(),
+        )
+        for count in range(2)
+    ]
+    pipeline = Pipeline(
+        name=pipeline_name,
+        steps=train_steps,
+        sagemaker_session=pipeline_session,
+    )
+
+    try:
+        pipeline.create(role, parallelism_config=dict(MaxParallelExecutionSteps=1))
+        # No ParallelismConfiguration given to pipeline.start, so it won't override the one set in pipeline.create
+        execution = pipeline.start()
+
+        def validate():
+            # Only one step should be scheduled initially
+            assert len(execution.list_steps()) == 1
+
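+        # list_steps() may take a moment to reflect the newly scheduled step, so retry before asserting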
+        retry_with_backoff(validate, num_attempts=4)
+
+        wait_pipeline_execution(execution=execution)
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
+def test_create_and_start_with_parallelism_config_override(
+    pipeline_session, role, pipeline_name, script_dir
+):
+    sklearn_train = SKLearn(
+        framework_version="0.20.0",
+        entry_point=os.path.join(script_dir, "train.py"),
+        instance_type="ml.m5.xlarge",
+        sagemaker_session=pipeline_session,
+        role=role,
+    )
+
+    train_steps = [
+        TrainingStep(
+            name=f"my-train-{count}",
+            display_name="TrainingStep",
+            description="description for Training step",
+            step_args=sklearn_train.fit(),
+        )
+        for count in range(2)
+    ]
+    pipeline = Pipeline(
+        name=pipeline_name,
+        steps=train_steps,
+        sagemaker_session=pipeline_session,
+    )
+
+    try:
+        pipeline.create(role, parallelism_config=dict(MaxParallelExecutionSteps=1))
+        # Override ParallelismConfiguration in pipeline.start
+        execution = pipeline.start(parallelism_config=dict(MaxParallelExecutionSteps=2))
+
+        def validate():
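+            # With the limit raised to 2, both independent training steps should be running concurrently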
+            assert len(execution.list_steps()) == 2
+            for step in execution.list_steps():
+                assert step["StepStatus"] == "Executing"
+
+        retry_with_backoff(validate, num_attempts=4)
+
+        wait_pipeline_execution(execution=execution)
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
 def test_model_registration_with_tuning_model(
     pipeline_session,
     role,