diff --git a/pyproject.toml b/pyproject.toml index c6508f54ad..17dfab3571 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,7 +55,8 @@ dependencies = [ "tblib>=1.7.0,<4", "tqdm", "urllib3>=1.26.8,<3.0.0", - "uvicorn" + "uvicorn", + "graphene>=3,<4" ] [project.scripts] diff --git a/src/sagemaker/modules/configs.py b/src/sagemaker/modules/configs.py index ac54e2ad0b..3739c73c5d 100644 --- a/src/sagemaker/modules/configs.py +++ b/src/sagemaker/modules/configs.py @@ -30,7 +30,6 @@ from sagemaker_core.shapes import ( StoppingCondition, RetryStrategy, - OutputDataConfig, Channel, ShuffleConfig, DataSource, @@ -43,8 +42,6 @@ RemoteDebugConfig, SessionChainingConfig, InstanceGroup, - TensorBoardOutputConfig, - CheckpointConfig, ) from sagemaker.modules.utils import convert_unassigned_to_none @@ -131,6 +128,8 @@ class Compute(shapes.ResourceConfig): subsequent training jobs. instance_groups (Optional[List[InstanceGroup]]): A list of instance groups for heterogeneous clusters to be used in the training job. + training_plan_arn (Optional[str]): + The Amazon Resource Name (ARN) of the training plan to use for this resource configuration. enable_managed_spot_training (Optional[bool]): To train models using managed spot training, choose True. Managed spot training provides a fully managed and scalable infrastructure for training machine learning @@ -151,8 +150,12 @@ def _to_resource_config(self) -> shapes.ResourceConfig: compute_config_dict = self.model_dump() resource_config_fields = set(shapes.ResourceConfig.__annotations__.keys()) filtered_dict = { - k: v for k, v in compute_config_dict.items() if k in resource_config_fields + k: v + for k, v in compute_config_dict.items() + if k in resource_config_fields and v is not None } + if not filtered_dict: + return None return shapes.ResourceConfig(**filtered_dict) @@ -194,10 +197,12 @@ def _model_validator(self) -> "Networking": def _to_vpc_config(self) -> shapes.VpcConfig: """Convert to a sagemaker_core.shapes.VpcConfig object.""" compute_config_dict = self.model_dump() - resource_config_fields = set(shapes.VpcConfig.__annotations__.keys()) + vpc_config_fields = set(shapes.VpcConfig.__annotations__.keys()) filtered_dict = { - k: v for k, v in compute_config_dict.items() if k in resource_config_fields + k: v for k, v in compute_config_dict.items() if k in vpc_config_fields and v is not None } + if not filtered_dict: + return None return shapes.VpcConfig(**filtered_dict) @@ -224,3 +229,66 @@ class InputData(BaseConfig): channel_name: str = None data_source: Union[str, FileSystemDataSource, S3DataSource] = None + + +class OutputDataConfig(shapes.OutputDataConfig): + """OutputDataConfig. + + The OutputDataConfig class is a subclass of ``sagemaker_core.shapes.OutputDataConfig`` + and allows the user to specify the output data configuration for the training job. + + Parameters: + s3_output_path (Optional[str]): + The S3 URI where the output data will be stored. This is the location where the + training job will save its output data, such as model artifacts and logs. + kms_key_id (Optional[str]): + The Amazon Web Services Key Management Service (Amazon Web Services KMS) key that + SageMaker uses to encrypt the model artifacts at rest using Amazon S3 server-side + encryption. + compression_type (Optional[str]): + The model output compression type. Select `NONE` to output an uncompressed model, + recommended for large model outputs. Defaults to `GZIP`. + """ + + s3_output_path: Optional[str] = None + kms_key_id: Optional[str] = None + compression_type: Optional[str] = None + + +class TensorBoardOutputConfig(shapes.TensorBoardOutputConfig): + """TensorBoardOutputConfig. + + The TensorBoardOutputConfig class is a subclass of ``sagemaker_core.shapes.TensorBoardOutputConfig`` + and allows the user to specify the storage locations for the Amazon SageMaker + Debugger TensorBoard. + + Parameters: + s3_output_path (Optional[str]): + Path to Amazon S3 storage location for TensorBoard output. If not specified, will + default to + ``s3://////tensorboard-output`` + local_path (Optional[str]): + Path to local storage location for tensorBoard output. Defaults to /opt/ml/output/tensorboard. + """ + + s3_output_path: Optional[str] = None + local_path: Optional[str] = "/opt/ml/output/tensorboard" + + +class CheckpointConfig(shapes.CheckpointConfig): + """CheckpointConfig. + + The CheckpointConfig class is a subclass of ``sagemaker_core.shapes.CheckpointConfig`` + and allows the user to specify the checkpoint configuration for the training job. + + Parameters: + s3_uri (Optional[str]): + Path to Amazon S3 storage location for the Checkpoint data. If not specified, will + default to + ``s3://////checkpoints`` + local_path (Optional[str]): + The local directory where checkpoints are written. The default directory is /opt/ml/checkpoints. + """ + + s3_uri: Optional[str] = None + local_path: Optional[str] = "/opt/ml/checkpoints" diff --git a/src/sagemaker/modules/train/model_trainer.py b/src/sagemaker/modules/train/model_trainer.py index 96078d1aeb..58ae724074 100644 --- a/src/sagemaker/modules/train/model_trainer.py +++ b/src/sagemaker/modules/train/model_trainer.py @@ -25,6 +25,7 @@ from sagemaker_core.main import resources from sagemaker_core.resources import TrainingJob +from sagemaker_core import shapes from sagemaker_core.shapes import AlgorithmSpecification from pydantic import BaseModel, ConfigDict, PrivateAttr, validate_call @@ -48,11 +49,11 @@ from sagemaker.utils import resolve_value_from_config from sagemaker.modules import Session, get_execution_role +from sagemaker.modules import configs from sagemaker.modules.configs import ( Compute, StoppingCondition, RetryStrategy, - OutputDataConfig, SourceCode, TrainingImageConfig, Channel, @@ -64,8 +65,6 @@ InfraCheckConfig, RemoteDebugConfig, SessionChainingConfig, - TensorBoardOutputConfig, - CheckpointConfig, InputData, ) @@ -221,9 +220,9 @@ class ModelTrainer(BaseModel): training_image: Optional[str] = None training_image_config: Optional[TrainingImageConfig] = None algorithm_name: Optional[str] = None - output_data_config: Optional[OutputDataConfig] = None + output_data_config: Optional[shapes.OutputDataConfig] = None input_data_config: Optional[List[Union[Channel, InputData]]] = None - checkpoint_config: Optional[CheckpointConfig] = None + checkpoint_config: Optional[shapes.CheckpointConfig] = None training_input_mode: Optional[str] = "File" environment: Optional[Dict[str, str]] = {} hyperparameters: Optional[Union[Dict[str, Any], str]] = {} @@ -234,7 +233,7 @@ class ModelTrainer(BaseModel): _latest_training_job: Optional[resources.TrainingJob] = PrivateAttr(default=None) # Private TrainingJob Parameters - _tensorboard_output_config: Optional[TensorBoardOutputConfig] = PrivateAttr(default=None) + _tensorboard_output_config: Optional[shapes.TensorBoardOutputConfig] = PrivateAttr(default=None) _retry_strategy: Optional[RetryStrategy] = PrivateAttr(default=None) _infra_check_config: Optional[InfraCheckConfig] = PrivateAttr(default=None) _session_chaining_config: Optional[SessionChainingConfig] = PrivateAttr(default=None) @@ -265,8 +264,8 @@ class ModelTrainer(BaseModel): "networking": Networking, "stopping_condition": StoppingCondition, "training_image_config": TrainingImageConfig, - "output_data_config": OutputDataConfig, - "checkpoint_config": CheckpointConfig, + "output_data_config": configs.OutputDataConfig, + "checkpoint_config": configs.CheckpointConfig, } def _populate_intelligent_defaults(self): @@ -318,7 +317,7 @@ def _populate_intelligent_defaults_from_training_job_space(self): config_path=TRAINING_JOB_OUTPUT_DATA_CONFIG_PATH ) if default_output_data_config: - self.output_data_config = OutputDataConfig( + self.output_data_config = configs.OutputDataConfig( **self._convert_keys_to_snake(default_output_data_config) ) @@ -477,6 +476,20 @@ def model_post_init(self, __context: Any): ) logger.warning(f"Compute not provided. Using default:\n{self.compute}") + if self.compute.instance_type is None: + self.compute.instance_type = DEFAULT_INSTANCE_TYPE + logger.warning(f"Instance type not provided. Using default:\n{DEFAULT_INSTANCE_TYPE}") + if self.compute.instance_count is None: + self.compute.instance_count = 1 + logger.warning( + f"Instance count not provided. Using default:\n{self.compute.instance_count}" + ) + if self.compute.volume_size_in_gb is None: + self.compute.volume_size_in_gb = 30 + logger.warning( + f"Volume size not provided. Using default:\n{self.compute.volume_size_in_gb}" + ) + if self.stopping_condition is None: self.stopping_condition = StoppingCondition( max_runtime_in_seconds=3600, @@ -486,6 +499,12 @@ def model_post_init(self, __context: Any): logger.warning( f"StoppingCondition not provided. Using default:\n{self.stopping_condition}" ) + if self.stopping_condition.max_runtime_in_seconds is None: + self.stopping_condition.max_runtime_in_seconds = 3600 + logger.info( + "Max runtime not provided. Using default:\n" + f"{self.stopping_condition.max_runtime_in_seconds}" + ) if self.hyperparameters and isinstance(self.hyperparameters, str): if not os.path.exists(self.hyperparameters): @@ -510,24 +529,41 @@ def model_post_init(self, __context: Any): "Must be a valid JSON or YAML file." ) - if self.training_mode == Mode.SAGEMAKER_TRAINING_JOB and self.output_data_config is None: - session = self.sagemaker_session - base_job_name = self.base_job_name - self.output_data_config = OutputDataConfig( - s3_output_path=f"s3://{self._fetch_bucket_name_and_prefix(session)}" - f"/{base_job_name}", - compression_type="GZIP", - kms_key_id=None, - ) - logger.warning( - f"OutputDataConfig not provided. Using default:\n{self.output_data_config}" - ) + if self.training_mode == Mode.SAGEMAKER_TRAINING_JOB: + if self.output_data_config is None: + session = self.sagemaker_session + base_job_name = self.base_job_name + self.output_data_config = configs.OutputDataConfig( + s3_output_path=f"s3://{self._fetch_bucket_name_and_prefix(session)}" + f"/{base_job_name}", + compression_type="GZIP", + kms_key_id=None, + ) + logger.warning( + f"OutputDataConfig not provided. Using default:\n{self.output_data_config}" + ) + if self.output_data_config.s3_output_path is None: + session = self.sagemaker_session + base_job_name = self.base_job_name + self.output_data_config.s3_output_path = ( + f"s3://{self._fetch_bucket_name_and_prefix(session)}/{base_job_name}" + ) + logger.warning( + f"OutputDataConfig s3_output_path not provided. Using default:\n" + f"{self.output_data_config.s3_output_path}" + ) + if self.output_data_config.compression_type is None: + self.output_data_config.compression_type = "GZIP" + logger.warning( + f"OutputDataConfig compression type not provided. Using default:\n" + f"{self.output_data_config.compression_type}" + ) - # TODO: Autodetect which image to use if source_code is provided if self.training_image: logger.info(f"Training image URI: {self.training_image}") - def _fetch_bucket_name_and_prefix(self, session: Session) -> str: + @staticmethod + def _fetch_bucket_name_and_prefix(session: Session) -> str: """Helper function to get the bucket name with the corresponding prefix if applicable""" if session.default_bucket_prefix is not None: return f"{session.default_bucket()}/{session.default_bucket_prefix}" @@ -559,15 +595,25 @@ def train( self._populate_intelligent_defaults() current_training_job_name = _get_unique_name(self.base_job_name) input_data_key_prefix = f"{self.base_job_name}/{current_training_job_name}/input" - if input_data_config: - self.input_data_config = input_data_config - input_data_config = [] + self.input_data_config = input_data_config or self.input_data_config or [] + if self.input_data_config: - input_data_config = self._get_input_data_config( + self.input_data_config = self._get_input_data_config( self.input_data_config, input_data_key_prefix ) + if self.checkpoint_config and not self.checkpoint_config.s3_uri: + self.checkpoint_config.s3_uri = ( + f"s3://{self._fetch_bucket_name_and_prefix(self.sagemaker_session)}/" + f"{self.base_job_name}/{current_training_job_name}/checkpoints" + ) + if self._tensorboard_output_config and not self._tensorboard_output_config.s3_output_path: + self._tensorboard_output_config.s3_output_path = ( + f"s3://{self._fetch_bucket_name_and_prefix(self.sagemaker_session)}/" + f"{self.base_job_name}" + ) + string_hyper_parameters = {} if self.hyperparameters: for hyper_parameter, value in self.hyperparameters.items(): @@ -597,7 +643,7 @@ def train( data_source=self.source_code.source_dir, key_prefix=input_data_key_prefix, ) - input_data_config.append(source_code_channel) + self.input_data_config.append(source_code_channel) self._prepare_train_script( tmp_dir=tmp_dir, @@ -618,7 +664,7 @@ def train( data_source=tmp_dir.name, key_prefix=input_data_key_prefix, ) - input_data_config.append(sm_drivers_channel) + self.input_data_config.append(sm_drivers_channel) # If source_code is provided, we will always use # the default container entrypoint and arguments @@ -645,7 +691,7 @@ def train( training_job_name=current_training_job_name, algorithm_specification=algorithm_specification, hyper_parameters=string_hyper_parameters, - input_data_config=input_data_config, + input_data_config=self.input_data_config, resource_config=resource_config, vpc_config=vpc_config, # Public Instance Attributes @@ -690,7 +736,7 @@ def train( sagemaker_session=self.sagemaker_session, container_entrypoint=algorithm_specification.container_entrypoint, container_arguments=algorithm_specification.container_arguments, - input_data_config=input_data_config, + input_data_config=self.input_data_config, hyper_parameters=string_hyper_parameters, environment=self.environment, ) @@ -909,22 +955,55 @@ def from_recipe( requirements: Optional[str] = None, training_image: Optional[str] = None, training_image_config: Optional[TrainingImageConfig] = None, - output_data_config: Optional[OutputDataConfig] = None, + output_data_config: Optional[shapes.OutputDataConfig] = None, input_data_config: Optional[List[Union[Channel, InputData]]] = None, - checkpoint_config: Optional[CheckpointConfig] = None, + checkpoint_config: Optional[shapes.CheckpointConfig] = None, training_input_mode: Optional[str] = "File", environment: Optional[Dict[str, str]] = None, tags: Optional[List[Tag]] = None, sagemaker_session: Optional[Session] = None, role: Optional[str] = None, base_job_name: Optional[str] = None, - ) -> "ModelTrainer": + ) -> "ModelTrainer": # noqa: D412 """Create a ModelTrainer from a training recipe. + Example: + + .. code:: python + + from sagemaker.modules.train import ModelTrainer + from sagemaker.modules.configs import Compute + + recipe_overrides = { + "run": { + "results_dir": "/opt/ml/model", + }, + "model": { + "data": { + "use_synthetic_data": True + } + } + } + + compute = Compute( + instance_type="ml.p5.48xlarge", + keep_alive_period_in_seconds=3600 + ) + + model_trainer = ModelTrainer.from_recipe( + training_recipe="fine-tuning/deepseek/hf_deepseek_r1_distilled_llama_8b_seq8k_gpu_fine_tuning", + recipe_overrides=recipe_overrides, + compute=compute, + ) + + model_trainer.train(wait=False) + + Args: training_recipe (str): The training recipe to use for training the model. This must be the name of a sagemaker training recipe or a path to a local training recipe .yaml file. + For available training recipes, see: https://github.com/aws/sagemaker-hyperpod-recipes/ compute (Compute): The compute configuration. This is used to specify the compute resources for the training job. If not specified, will default to 1 instance of ml.m5.xlarge. @@ -1032,55 +1111,140 @@ def from_recipe( return model_trainer def with_tensorboard_output_config( - self, tensorboard_output_config: TensorBoardOutputConfig - ) -> "ModelTrainer": + self, tensorboard_output_config: Optional[shapes.TensorBoardOutputConfig] = None + ) -> "ModelTrainer": # noqa: D412 """Set the TensorBoard output configuration. + Example: + + .. code:: python + + from sagemaker.modules.train import ModelTrainer + + model_trainer = ModelTrainer( + ... + ).with_tensorboard_output_config() + Args: tensorboard_output_config (sagemaker.modules.configs.TensorBoardOutputConfig): The TensorBoard output configuration. """ - self._tensorboard_output_config = tensorboard_output_config + self._tensorboard_output_config = ( + tensorboard_output_config or configs.TensorBoardOutputConfig() + ) return self - def with_retry_strategy(self, retry_strategy: RetryStrategy) -> "ModelTrainer": + def with_retry_strategy(self, retry_strategy: RetryStrategy) -> "ModelTrainer": # noqa: D412 """Set the retry strategy for the training job. + Example: + + .. code:: python + + from sagemaker.modules.train import ModelTrainer + from sagemaker.modules.configs import RetryStrategy + + retry_strategy = RetryStrategy(maximum_retry_attempts=3) + + model_trainer = ModelTrainer( + ... + ).with_retry_strategy(retry_strategy) + Args: - retry_strategy (RetryStrategy): + retry_strategy (sagemaker.modules.configs.RetryStrategy): The retry strategy for the training job. """ self._retry_strategy = retry_strategy return self - def with_infra_check_config(self, infra_check_config: InfraCheckConfig) -> "ModelTrainer": + def with_infra_check_config( + self, infra_check_config: Optional[InfraCheckConfig] = None + ) -> "ModelTrainer": # noqa: D412 """Set the infra check configuration for the training job. + Example: + + .. code:: python + + from sagemaker.modules.train import ModelTrainer + + model_trainer = ModelTrainer( + ... + ).with_infra_check_config() + Args: - infra_check_config (InfraCheckConfig): + infra_check_config (sagemaker.modules.configs.InfraCheckConfig): The infra check configuration for the training job. """ - self._infra_check_config = infra_check_config + self._infra_check_config = infra_check_config or InfraCheckConfig(enable_infra_check=True) return self def with_session_chaining_config( - self, session_chaining_config: SessionChainingConfig - ) -> "ModelTrainer": + self, session_chaining_config: Optional[SessionChainingConfig] = None + ) -> "ModelTrainer": # noqa: D412 """Set the session chaining configuration for the training job. + Example: + + .. code:: python + + from sagemaker.modules.train import ModelTrainer + + model_trainer = ModelTrainer( + ... + ).with_session_chaining_config() + Args: - session_chaining_config (SessionChainingConfig): + session_chaining_config (sagemaker.modules.configs.SessionChainingConfig): The session chaining configuration for the training job. """ - self._session_chaining_config = session_chaining_config + self._session_chaining_config = session_chaining_config or SessionChainingConfig( + enable_session_tag_chaining=True + ) return self - def with_remote_debug_config(self, remote_debug_config: RemoteDebugConfig) -> "ModelTrainer": + def with_remote_debug_config( + self, remote_debug_config: Optional[RemoteDebugConfig] = None + ) -> "ModelTrainer": # noqa: D412 """Set the remote debug configuration for the training job. + Example: + + .. code:: python + + from sagemaker.modules.train import ModelTrainer + + model_trainer = ModelTrainer( + ... + ).with_remote_debug_config() + Args: - remote_debug_config (RemoteDebugConfig): + remote_debug_config (sagemaker.modules.configs.RemoteDebugConfig): The remote debug configuration for the training job. """ - self._remote_debug_config = remote_debug_config + self._remote_debug_config = remote_debug_config or RemoteDebugConfig( + enable_remote_debug=True + ) + return self + + def with_checkpoint_config( + self, checkpoint_config: Optional[shapes.CheckpointConfig] = None + ) -> "ModelTrainer": # noqa: D412 + """Set the checkpoint configuration for the training job. + + Example: + + .. code:: python + + from sagemaker.modules.train import ModelTrainer + + model_trainer = ModelTrainer( + ... + ).with_checkpoint_config() + + Args: + checkpoint_config (sagemaker.modules.configs.CheckpointConfig): + The checkpoint configuration for the training job. + """ + self.checkpoint_config = checkpoint_config or configs.CheckpointConfig() return self diff --git a/tests/unit/sagemaker/modules/train/test_model_trainer.py b/tests/unit/sagemaker/modules/train/test_model_trainer.py index 6001c5db36..b1348b5ac9 100644 --- a/tests/unit/sagemaker/modules/train/test_model_trainer.py +++ b/tests/unit/sagemaker/modules/train/test_model_trainer.py @@ -324,13 +324,7 @@ def test_train_with_intelligent_defaults_training_job_space( hyper_parameters={}, input_data_config=[], resource_config=ResourceConfig( - volume_size_in_gb=30, - instance_type="ml.m5.xlarge", - instance_count=1, - volume_kms_key_id=None, - keep_alive_period_in_seconds=None, - instance_groups=None, - training_plan_arn=None, + volume_size_in_gb=30, instance_type="ml.m5.xlarge", instance_count=1 ), vpc_config=None, session=ANY, @@ -870,8 +864,6 @@ def mock_upload_data(path, bucket, key_prefix): volume_size_in_gb=compute.volume_size_in_gb, volume_kms_key_id=compute.volume_kms_key_id, keep_alive_period_in_seconds=compute.keep_alive_period_in_seconds, - instance_groups=None, - training_plan_arn=None, ), vpc_config=VpcConfig( security_group_ids=networking.security_group_ids, @@ -1228,3 +1220,41 @@ def test_hyperparameters_invalid(mock_exists, modules_session): compute=DEFAULT_COMPUTE_CONFIG, hyperparameters="hyperparameters.yaml", ) + + +@patch("sagemaker.modules.train.model_trainer._get_unique_name") +@patch("sagemaker.modules.train.model_trainer.TrainingJob") +def test_model_trainer_default_paths(mock_training_job, mock_unique_name, modules_session): + def mock_upload_data(path, bucket, key_prefix): + return f"s3://{bucket}/{key_prefix}" + + unique_name = "base-job-0123456789" + base_name = "base-job" + + modules_session.upload_data.side_effect = mock_upload_data + mock_unique_name.return_value = unique_name + + model_trainer = ( + ModelTrainer( + training_image=DEFAULT_IMAGE, + sagemaker_session=modules_session, + base_job_name=base_name, + ) + .with_tensorboard_output_config() + .with_checkpoint_config() + ) + + model_trainer.train() + + _, kwargs = mock_training_job.create.call_args + + default_base_path = f"s3://{DEFAULT_BUCKET}/{DEFAULT_BUCKET_PREFIX}/{base_name}" + + assert kwargs["output_data_config"].s3_output_path == default_base_path + assert kwargs["output_data_config"].compression_type == "GZIP" + + assert kwargs["checkpoint_config"].s3_uri == f"{default_base_path}/{unique_name}/checkpoints" + assert kwargs["checkpoint_config"].local_path == "/opt/ml/checkpoints" + + assert kwargs["tensor_board_output_config"].s3_output_path == default_base_path + assert kwargs["tensor_board_output_config"].local_path == "/opt/ml/output/tensorboard"