2727from six .moves .urllib .parse import urlparse
2828
2929import sagemaker
30- from sagemaker import git_utils , image_uris , vpc_utils
30+ from sagemaker import git_utils , image_uris , vpc_utils , s3
3131from sagemaker .analytics import TrainingJobAnalytics
3232from sagemaker .config import (
3333 TRAINING_JOB_VOLUME_KMS_KEY_ID_PATH ,
@@ -672,6 +672,9 @@ def __init__(
672672 enable_network_isolation = self ._enable_network_isolation ,
673673 )
674674
675+ # Internal flag: True only when output_path was derived from the session's default bucket and prefix
676+ self ._is_output_path_set_from_default_bucket_and_prefix = False
677+
675678 @abstractmethod
676679 def training_image_uri (self ):
677680 """Return the Docker image to use for training.
@@ -772,7 +775,13 @@ def _prepare_for_training(self, job_name=None):
772775 if self .sagemaker_session .local_mode and local_code :
773776 self .output_path = ""
774777 else :
775- self .output_path = "s3://{}/" .format (self .sagemaker_session .default_bucket ())
778+ self .output_path = s3 .s3_path_join (
779+ "s3://" ,
780+ self .sagemaker_session .default_bucket (),
781+ self .sagemaker_session .default_bucket_prefix ,
782+ with_end_slash = True ,
783+ )
784+ self ._is_output_path_set_from_default_bucket_and_prefix = True
776785
777786 if self .git_config :
778787 updated_paths = git_utils .git_clone_repo (
@@ -847,7 +856,8 @@ def _stage_user_code_in_s3(self) -> UploadedCode:
847856 if is_pipeline_variable (self .output_path ):
848857 if self .code_location is None :
849858 code_bucket = self .sagemaker_session .default_bucket ()
850- code_s3_prefix = self ._assign_s3_prefix ()
859+ key_prefix = self .sagemaker_session .default_bucket_prefix
860+ code_s3_prefix = self ._assign_s3_prefix (key_prefix )
851861 kms_key = None
852862 else :
853863 code_bucket , key_prefix = parse_s3_url (self .code_location )
@@ -860,16 +870,30 @@ def _stage_user_code_in_s3(self) -> UploadedCode:
860870 if local_mode :
861871 if self .code_location is None :
862872 code_bucket = self .sagemaker_session .default_bucket ()
863- code_s3_prefix = self ._assign_s3_prefix ()
873+ key_prefix = self .sagemaker_session .default_bucket_prefix
874+ code_s3_prefix = self ._assign_s3_prefix (key_prefix )
864875 kms_key = None
865876 else :
866877 code_bucket , key_prefix = parse_s3_url (self .code_location )
867878 code_s3_prefix = self ._assign_s3_prefix (key_prefix )
868879 kms_key = None
869880 else :
870881 if self .code_location is None :
871- code_bucket , _ = parse_s3_url (self .output_path )
872- code_s3_prefix = self ._assign_s3_prefix ()
882+ code_bucket , possible_key_prefix = parse_s3_url (self .output_path )
883+
884+ if self ._is_output_path_set_from_default_bucket_and_prefix :
885+ # Only include possible_key_prefix if the output_path was created from the
886+ # Session's default bucket and prefix. In that scenario, possible_key_prefix
887+ # will either be "" or Session.default_bucket_prefix.
888+ # Note: We cannot do `if (code_bucket == session.default_bucket() and
889+ # key_prefix == session.default_bucket_prefix)` instead because the user
890+ # could have passed in equivalent values themselves to output_path. And
891+ # including the prefix in that case could result in a potentially backwards
892+ # incompatible behavior change for the end user.
893+ code_s3_prefix = self ._assign_s3_prefix (possible_key_prefix )
894+ else :
895+ code_s3_prefix = self ._assign_s3_prefix ()
896+
873897 kms_key = self .output_kms_key
874898 else :
875899 code_bucket , key_prefix = parse_s3_url (self .code_location )
@@ -905,18 +929,13 @@ def _assign_s3_prefix(self, key_prefix=""):
905929 """
906930 from sagemaker .workflow .utilities import _pipeline_config
907931
908- code_s3_prefix = "/" . join ( filter ( None , [ key_prefix , self ._current_job_name , "source" ]) )
932+ code_s3_prefix = s3 . s3_path_join ( key_prefix , self ._current_job_name , "source" )
909933 if _pipeline_config and _pipeline_config .code_hash :
910- code_s3_prefix = "/" .join (
911- filter (
912- None ,
913- [
914- key_prefix ,
915- _pipeline_config .pipeline_name ,
916- "code" ,
917- _pipeline_config .code_hash ,
918- ],
919- )
934+ code_s3_prefix = s3 .s3_path_join (
935+ key_prefix ,
936+ _pipeline_config .pipeline_name ,
937+ "code" ,
938+ _pipeline_config .code_hash ,
920939 )
921940 return code_s3_prefix
922941
@@ -1060,8 +1079,12 @@ def _set_source_s3_uri(self, rule):
10601079 if "source_s3_uri" in (rule .rule_parameters or {}):
10611080 parse_result = urlparse (rule .rule_parameters ["source_s3_uri" ])
10621081 if parse_result .scheme != "s3" :
1063- desired_s3_uri = os .path .join (
1064- "s3://" , self .sagemaker_session .default_bucket (), rule .name , str (uuid .uuid4 ())
1082+ desired_s3_uri = s3 .s3_path_join (
1083+ "s3://" ,
1084+ self .sagemaker_session .default_bucket (),
1085+ self .sagemaker_session .default_bucket_prefix ,
1086+ rule .name ,
1087+ str (uuid .uuid4 ()),
10651088 )
10661089 s3_uri = S3Uploader .upload (
10671090 local_path = rule .rule_parameters ["source_s3_uri" ],
0 commit comments