From 18ea484a21c78b6ea4e22ebbaf461478b4062f8a Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Thu, 20 Mar 2025 00:38:35 +0000 Subject: [PATCH 1/2] aligned disable_output_compression for remote decorator and function with estimator --- src/sagemaker/remote_function/client.py | 14 +++++- src/sagemaker/remote_function/job.py | 9 +++- .../test_feature_scheduler.py | 1 + .../sagemaker/remote_function/test_job.py | 50 +++++++++++++++++-- 4 files changed, 68 insertions(+), 6 deletions(-) diff --git a/src/sagemaker/remote_function/client.py b/src/sagemaker/remote_function/client.py index 76a8443fba..55b4654aa9 100644 --- a/src/sagemaker/remote_function/client.py +++ b/src/sagemaker/remote_function/client.py @@ -90,6 +90,7 @@ def remote( spark_config: SparkConfig = None, use_spot_instances=False, max_wait_time_in_seconds=None, + disable_output_compression: bool = False, use_torchrun: bool = False, use_mpirun: bool = False, nproc_per_node: Optional[int] = None, @@ -283,13 +284,16 @@ def remote( After this amount of time Amazon SageMaker will stop waiting for managed spot training job to complete. Defaults to ``None``. + disable_output_compression (bool): Optional. When set to true, Model is uploaded to + Amazon S3 without compression after training finishes. + use_torchrun (bool): Specifies whether to use torchrun for distributed training. Defaults to ``False``. use_mpirun (bool): Specifies whether to use mpirun for distributed training. Defaults to ``False``. - nproc_per_node (Optional int): Specifies the number of processes per node for + nproc_per_node (int): Optional. Specifies the number of processes per node for distributed training. Defaults to ``None``. This is defined automatically configured on the instance type. """ @@ -324,6 +328,7 @@ def _remote(func): spark_config=spark_config, use_spot_instances=use_spot_instances, max_wait_time_in_seconds=max_wait_time_in_seconds, + disable_output_compression=disable_output_compression, use_torchrun=use_torchrun, use_mpirun=use_mpirun, nproc_per_node=nproc_per_node, @@ -543,6 +548,7 @@ def __init__( spark_config: SparkConfig = None, use_spot_instances=False, max_wait_time_in_seconds=None, + disable_output_compression: bool = False, use_torchrun: bool = False, use_mpirun: bool = False, nproc_per_node: Optional[int] = None, @@ -736,13 +742,16 @@ def __init__( After this amount of time Amazon SageMaker will stop waiting for managed spot training job to complete. Defaults to ``None``. + disable_output_compression (bool): Optional. When set to true, Model is uploaded to + Amazon S3 without compression after training finishes. + use_torchrun (bool): Specifies whether to use torchrun for distributed training. Defaults to ``False``. use_mpirun (bool): Specifies whether to use mpirun for distributed training. Defaults to ``False``. - nproc_per_node (Optional int): Specifies the number of processes per node for + nproc_per_node (int): Optional. Specifies the number of processes per node for distributed training. Defaults to ``None``. This is defined automatically configured on the instance type. """ @@ -790,6 +799,7 @@ def __init__( spark_config=spark_config, use_spot_instances=use_spot_instances, max_wait_time_in_seconds=max_wait_time_in_seconds, + disable_output_compression=disable_output_compression, use_torchrun=use_torchrun, use_mpirun=use_mpirun, nproc_per_node=nproc_per_node, diff --git a/src/sagemaker/remote_function/job.py b/src/sagemaker/remote_function/job.py index 52cb0ff04f..9000ccda08 100644 --- a/src/sagemaker/remote_function/job.py +++ b/src/sagemaker/remote_function/job.py @@ -373,6 +373,7 @@ def __init__( spark_config: SparkConfig = None, use_spot_instances=False, max_wait_time_in_seconds=None, + disable_output_compression: bool = False, use_torchrun: bool = False, use_mpirun: bool = False, nproc_per_node: Optional[int] = None, @@ -558,13 +559,16 @@ def __init__( After this amount of time Amazon SageMaker will stop waiting for managed spot training job to complete. Defaults to ``None``. + disable_output_compression (bool): Optional. When set to true, Model is uploaded to + Amazon S3 without compression after training finishes. + use_torchrun (bool): Specifies whether to use torchrun for distributed training. Defaults to ``False``. use_mpirun (bool): Specifies whether to use mpirun for distributed training. Defaults to ``False``. - nproc_per_node (Optional int): Specifies the number of processes per node for + nproc_per_node (int): Optional. Specifies the number of processes per node for distributed training. Defaults to ``None``. This is defined automatically configured on the instance type. """ @@ -725,6 +729,7 @@ def __init__( tags = format_tags(tags) self.tags = self.sagemaker_session._append_sagemaker_config_tags(tags, REMOTE_FUNCTION_TAGS) + self.disable_output_compression = disable_output_compression self.use_torchrun = use_torchrun self.use_mpirun = use_mpirun self.nproc_per_node = nproc_per_node @@ -954,6 +959,8 @@ def compile( output_config = {"S3OutputPath": s3_base_uri} if job_settings.s3_kms_key is not None: output_config["KmsKeyId"] = job_settings.s3_kms_key + if job_settings.disable_output_compression: + output_config["CompressionType"] = "NONE" request_dict["OutputDataConfig"] = output_config container_args = ["--s3_base_uri", s3_base_uri] diff --git a/tests/unit/sagemaker/feature_store/feature_processor/test_feature_scheduler.py b/tests/unit/sagemaker/feature_store/feature_processor/test_feature_scheduler.py index 00bd3ca090..7b35174940 100644 --- a/tests/unit/sagemaker/feature_store/feature_processor/test_feature_scheduler.py +++ b/tests/unit/sagemaker/feature_store/feature_processor/test_feature_scheduler.py @@ -907,6 +907,7 @@ def test_remote_decorator_fields_consistency(get_execution_role, session): "use_spot_instances", "max_wait_time_in_seconds", "custom_file_filter", + "disable_output_compression", "use_torchrun", "use_mpirun", "nproc_per_node", diff --git a/tests/unit/sagemaker/remote_function/test_job.py b/tests/unit/sagemaker/remote_function/test_job.py index 671f091d02..5be84fe5ba 100644 --- a/tests/unit/sagemaker/remote_function/test_job.py +++ b/tests/unit/sagemaker/remote_function/test_job.py @@ -291,8 +291,8 @@ def mock_get_current_run(): return current_run -def describe_training_job_response(job_status): - return { +def describe_training_job_response(job_status, disable_output_compression=False): + job_response = { "TrainingJobArn": TRAINING_JOB_ARN, "TrainingJobStatus": job_status, "ResourceConfig": { @@ -300,15 +300,38 @@ def describe_training_job_response(job_status): "InstanceType": "ml.c4.xlarge", "VolumeSizeInGB": 30, }, - "OutputDataConfig": {"S3OutputPath": "s3://sagemaker-123/image_uri/output"}, } + if disable_output_compression: + output_config = { + "S3OutputPath": "s3://sagemaker-123/image_uri/output", + "CompressionType": "NONE", + } + else: + output_config = { + "S3OutputPath": "s3://sagemaker-123/image_uri/output", + "CompressionType": "NONE", + } + + job_response["OutputDataConfig"] = output_config + + return job_response + COMPLETED_TRAINING_JOB = describe_training_job_response("Completed") INPROGRESS_TRAINING_JOB = describe_training_job_response("InProgress") CANCELLED_TRAINING_JOB = describe_training_job_response("Stopped") FAILED_TRAINING_JOB = describe_training_job_response("Failed") +COMPLETED_TRAINING_JOB_DISABLE_OUTPUT_COMPRESSION = describe_training_job_response( + "Completed", True +) +INPROGRESS_TRAINING_JOB_DISABLE_OUTPUT_COMPRESSION = describe_training_job_response( + "InProgress", True +) +CANCELLED_TRAINING_JOB_DISABLE_OUTPUT_COMPRESSION = describe_training_job_response("Stopped", True) +FAILED_TRAINING_JOB_DISABLE_OUTPUT_COMPRESSION = describe_training_job_response("Failed", True) + def mock_session(): session = Mock() @@ -1303,6 +1326,27 @@ def test_describe(session, *args): session().sagemaker_client.describe_training_job.assert_called_once() +@patch("sagemaker.remote_function.job._prepare_and_upload_runtime_scripts") +@patch("sagemaker.remote_function.job._prepare_and_upload_workspace") +@patch("sagemaker.remote_function.job.StoredFunction") +@patch("sagemaker.remote_function.job.Session", return_value=mock_session()) +def test_describe_disable_output_compression(session, *args): + + job_settings = _JobSettings( + image_uri=IMAGE, + s3_root_uri=S3_URI, + role=ROLE_ARN, + instance_type="ml.m5.large", + disable_output_compression=True, + ) + job = _Job.start(job_settings, job_function, func_args=(1, 2), func_kwargs={"c": 3, "d": 4}) + + job.describe() + assert job.describe() == COMPLETED_TRAINING_JOB_DISABLE_OUTPUT_COMPRESSION + + session().sagemaker_client.describe_training_job.assert_called_once() + + @patch("sagemaker.remote_function.job._prepare_and_upload_runtime_scripts") @patch("sagemaker.remote_function.job._prepare_and_upload_workspace") @patch("sagemaker.remote_function.job.StoredFunction") From 37778ffbe6d29d4e7cac22cae1cf915274151ff9 Mon Sep 17 00:00:00 2001 From: brunopistone Date: Thu, 20 Mar 2025 09:46:52 +0000 Subject: [PATCH 2/2] fixed compatibility tests with step decorator --- tests/unit/sagemaker/remote_function/test_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/sagemaker/remote_function/test_client.py b/tests/unit/sagemaker/remote_function/test_client.py index 6c2a373dbc..de8758bfad 100644 --- a/tests/unit/sagemaker/remote_function/test_client.py +++ b/tests/unit/sagemaker/remote_function/test_client.py @@ -1504,6 +1504,7 @@ def test_consistency_between_remote_and_step_decorator(): "s3_kms_key", "s3_root_uri", "sagemaker_session", + "disable_output_compression", "use_torchrun", "use_mpirun", "nproc_per_node",