Skip to content

Commit 7651331

Browse files
authored
[ML] Avoid using a temporary zip file during local job invocation (Azure#30386)
* pylint: Prevent unnecessary parameter shadowing
* refactor: Unzip directly from bytes instead of intermediary file
1 parent 8994be5 commit 7651331

File tree

2 files changed

+18
-29
lines changed

2 files changed

+18
-29
lines changed

sdk/ml/azure-ai-ml/azure/ai/ml/constants/_common.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -137,7 +137,6 @@
137137
ANONYMOUS_ENV_NAME = "CliV2AnonymousEnvironment"
138138
SKIP_VALIDATION_MESSAGE = "To skip this validation use the --skip-validation param"
139139
MLTABLE_METADATA_SCHEMA_URL_FALLBACK = "https://azuremlschemasprod.azureedge.net/latest/MLTable.schema.json"
140-
INVOCATION_ZIP_FILE = "invocation.zip"
141140
INVOCATION_BAT_FILE = "Invocation.bat"
142141
INVOCATION_BASH_FILE = "Invocation.sh"
143142
AZUREML_RUN_SETUP_DIR = "azureml-setup"

sdk/ml/azure-ai-ml/azure/ai/ml/operations/_local_job_invoker.py

Lines changed: 18 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
# ---------------------------------------------------------
44

55
import base64
6+
import io
67
import json
78
import logging
89
import os
@@ -18,6 +19,9 @@
1819
from threading import Thread
1920
from typing import Dict, Optional, Tuple
2021

22+
from azure.core.credentials import TokenCredential
23+
from azure.core.exceptions import AzureError
24+
2125
from azure.ai.ml._restclient.v2022_02_01_preview.models import JobBaseData
2226
from azure.ai.ml._utils._http_utils import HttpPipeline
2327
from azure.ai.ml._utils.utils import DockerProxy
@@ -27,12 +31,9 @@
2731
EXECUTION_SERVICE_URL_KEY,
2832
INVOCATION_BASH_FILE,
2933
INVOCATION_BAT_FILE,
30-
INVOCATION_ZIP_FILE,
3134
LOCAL_JOB_FAILURE_MSG,
3235
)
3336
from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, JobException
34-
from azure.core.credentials import TokenCredential
35-
from azure.core.exceptions import AzureError
3637

3738
docker = DockerProxy()
3839
module_logger = logging.getLogger(__name__)
@@ -41,12 +42,8 @@
4142
def unzip_to_temporary_file(job_definition: JobBaseData, zip_content: bytes) -> Path:
4243
temp_dir = Path(tempfile.gettempdir(), AZUREML_RUNS_DIR, job_definition.name)
4344
temp_dir.mkdir(parents=True, exist_ok=True)
44-
zip_path = temp_dir / INVOCATION_ZIP_FILE
45-
with zip_path.open(mode="wb") as file:
46-
file.write(zip_content)
47-
with zipfile.ZipFile(zip_path, "r") as zip_ref:
45+
with zipfile.ZipFile(io.BytesIO(zip_content)) as zip_ref:
4846
zip_ref.extractall(temp_dir)
49-
zip_path.unlink()
5047
return temp_dir
5148

5249

@@ -273,23 +270,16 @@ def get_common_runtime_info_from_response(self, response: Dict[str, str]) -> Tup
273270
:rtype: Tuple[Dict[str, str], str]
274271
"""
275272

276-
with tempfile.TemporaryDirectory() as tempdir:
277-
invocation_zip_path = os.path.join(tempdir, INVOCATION_ZIP_FILE)
278-
with open(invocation_zip_path, "wb") as file:
279-
file.write(response)
280-
281-
with zipfile.ZipFile(invocation_zip_path, "r") as zip_ref:
282-
bootstrapper_path = f"{AZUREML_RUN_SETUP_DIR}/{self.COMMON_RUNTIME_BOOTSTRAPPER_INFO}"
283-
job_spec_path = f"{AZUREML_RUN_SETUP_DIR}/{self.COMMON_RUNTIME_JOB_SPEC}"
284-
if not all(file_path in zip_ref.namelist() for file_path in [bootstrapper_path, job_spec_path]):
285-
raise RuntimeError(
286-
f"{bootstrapper_path}, {job_spec_path} are not in the execution service response."
287-
)
273+
with zipfile.ZipFile(io.BytesIO(response)) as zip_ref:
274+
bootstrapper_path = f"{AZUREML_RUN_SETUP_DIR}/{self.COMMON_RUNTIME_BOOTSTRAPPER_INFO}"
275+
job_spec_path = f"{AZUREML_RUN_SETUP_DIR}/{self.COMMON_RUNTIME_JOB_SPEC}"
276+
if not all(file_path in zip_ref.namelist() for file_path in [bootstrapper_path, job_spec_path]):
277+
raise RuntimeError(f"{bootstrapper_path}, {job_spec_path} are not in the execution service response.")
288278

289-
with zip_ref.open(bootstrapper_path, "r") as bootstrapper_file:
290-
bootstrapper_json = json.loads(base64.b64decode(bootstrapper_file.read()))
291-
with zip_ref.open(job_spec_path, "r") as job_spec_file:
292-
job_spec = job_spec_file.read().decode("utf-8")
279+
with zip_ref.open(bootstrapper_path, "r") as bootstrapper_file:
280+
bootstrapper_json = json.loads(base64.b64decode(bootstrapper_file.read()))
281+
with zip_ref.open(job_spec_path, "r") as job_spec_file:
282+
job_spec = job_spec_file.read().decode("utf-8")
293283

294284
return bootstrapper_json, job_spec
295285

@@ -427,13 +417,13 @@ def start_run_if_local(
427417
return snapshot_id
428418

429419

430-
def _log_subprocess(io, file, show_in_console=False):
431-
def log_subprocess(io, file, show_in_console):
432-
for line in iter(io.readline, ""):
420+
def _log_subprocess(output_io, file, show_in_console=False):
421+
def log_subprocess():
422+
for line in iter(output_io.readline, ""):
433423
if show_in_console:
434424
print(line, end="")
435425
file.write(line)
436426

437-
thread = Thread(target=log_subprocess, args=(io, file, show_in_console))
427+
thread = Thread(target=log_subprocess)
438428
thread.daemon = True
439429
thread.start()

0 commit comments

Comments
 (0)