
Commit 0d271a9

zozhang/dgxc executor data mover (#206)
* adding APIs for data movement
* adding todos and questions
* support data movement into PVC before launching job
* updating comments
* refactored data movement to custom move_data function and modified code to all use status to get workload status
* comment back commented out code
* formatting and adding back PVC checker
* fixing DGXC Executor tests for data mover redesign
* cleanup
* test package configs
* fix formatting
* removing unreachable code
* add flag for launching from inside the cluster
* fix format
* fixing nsys prefix

Signed-off-by: Zoey Zhang <[email protected]>
1 parent: fb550a3

4 files changed (+498, -105 lines)

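At a glance, this commit teaches DGXCloudExecutor to stage the job directory into a PVC via a short-lived busybox workload before submitting the distributed job. A minimal configuration sketch follows; the field names are taken from the diff below, while every value (and the shape of the PVC dicts beyond the "path" key) is an illustrative assumption:

```python
from nemo_run.core.execution.dgxcloud import DGXCloudExecutor

# All values here are hypothetical; only the field names come from the diff below.
executor = DGXCloudExecutor(
    base_url="https://dgxc.example.com/api/v1",  # assumed endpoint value
    app_secret="<app-secret>",
    project_name="my-project",
    container_image="nvcr.io/nvidia/nemo:25.02",  # assumed image
    nodes=2,
    gpus_per_node=8,
    pvcs=[{"path": "/workspace"}],  # assign() checks each dict's "path" key
    pvc_nemo_run_dir="/workspace/nemo_run",  # new: must sit under a mounted PVC path
    launched_from_cluster=False,  # new: True skips the data-mover workload
)
```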

nemo_run/core/execution/dgxcloud.py

Lines changed: 162 additions & 18 deletions
@@ -1,10 +1,13 @@
+import base64
 import json
 import logging
 import os
 import subprocess
 from dataclasses import dataclass, field
 from enum import Enum
 from pathlib import Path
+import time
+import tempfile
 from typing import Any, Optional, Type
 
 import requests
@@ -13,6 +16,7 @@
 from nemo_run.core.execution.base import Executor, ExecutorMacros
 from nemo_run.core.packaging.base import Packager
 from nemo_run.core.packaging.git import GitArchivePackager
+from nemo_run.config import get_nemorun_home
 
 logger = logging.getLogger(__name__)
 
@@ -50,9 +54,12 @@ class DGXCloudExecutor(Executor):
     app_secret: str
     project_name: str
     container_image: str
+    pvc_nemo_run_dir: str
+    launched_from_cluster: bool = False
     nodes: int = 1
     gpus_per_node: int = 0
     nprocs_per_node: int = 1
+    pvc_job_dir: str = field(init=False, default="")
     pvcs: list[dict[str, Any]] = field(default_factory=list)
     distributed_framework: str = "PyTorch"
     custom_spec: dict[str, Any] = field(default_factory=dict)
@@ -90,29 +97,121 @@ def get_project_and_cluster_id(self, token: str) -> tuple[Optional[str], Optiona
                 break
         return project_id, cluster_id
 
-    def create_distributed_job(
-        self, token: str, project_id: str, cluster_id: str, name: str, cmd: list[str]
-    ):
+    def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> str:
+        with tempfile.TemporaryDirectory() as temp_dir:
+            tarball_path = os.path.join(temp_dir, "archive.tar.gz")
+            subprocess.run(f"tar -czf {tarball_path} -C {local_dir_path} .", shell=True, check=True)
+            with open(tarball_path, "rb") as file:
+                file_data = file.read()
+                encoded_data = base64.b64encode(file_data).decode("utf-8")
+
+        # Delete and recreate the directory if it already exists; the command decodes the base64 data, saves it to a file, and extracts it inside the pod
+        cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz"
+        return cmd
+
+    def create_data_mover_workload(self, token: str, project_id: str, cluster_id: str):
+        """
+        Creates a CPU-only workload to move the job directory into the PVC using the provided project/cluster IDs.
+        """
+
+        cmd = self.copy_directory_data_command(self.job_dir, self.pvc_job_dir)
+
+        url = f"{self.base_url}/workloads/workspaces"
+        headers = self._default_headers(token=token)
+
+        payload = {
+            "name": "data-mover",
+            "useGivenNameAsPrefix": True,
+            "projectId": project_id,
+            "clusterId": cluster_id,
+            "spec": {
+                "command": "sh -c",
+                "args": f"'{cmd}'",
+                "image": "busybox:1.37.0",
+                "storage": {"pvc": self.pvcs},
+            },
+        }
+
+        response = requests.post(url, json=payload, headers=headers)
+
+        logger.debug(
+            "Created workload; response code=%s, content=%s",
+            response.status_code,
+            response.text.strip(),
+        )
+
+        return response
+
+    def delete_workload(self, token: str, workload_id: str):
+        url = f"{self.base_url}/workloads/workspaces/{workload_id}"
+        headers = self._default_headers(token=token)
+
+        response = requests.delete(url, headers=headers)
+
+        logger.debug(
+            "Delete interactive workspace; response code=%s, content=%s",
+            response.status_code,
+            response.text.strip(),
+        )
+        return response
+
+    def move_data(self, token: str, project_id: str, cluster_id: str, sleep: float = 10) -> None:
+        """
+        Moves the job directory into the PVC and deletes the workload after completion.
+        """
+
+        resp = self.create_data_mover_workload(token, project_id, cluster_id)
+        if resp.status_code not in [200, 202]:
+            raise RuntimeError(
+                f"Failed to create data mover workload, status_code={resp.status_code}"
+            )
+
+        resp_json = resp.json()
+        workload_id = resp_json["workloadId"]
+        status = DGXCloudState(resp_json["actualPhase"])
+
+        while status in [
+            DGXCloudState.PENDING,
+            DGXCloudState.CREATING,
+            DGXCloudState.INITIALIZING,
+            DGXCloudState.RUNNING,
+        ]:
+            time.sleep(sleep)
+            status = self.status(workload_id)
+
+        if status is not DGXCloudState.COMPLETED:
+            raise RuntimeError("Failed to move data to PVC")
+
+        resp = self.delete_workload(token, workload_id)
+        if resp.status_code >= 200 and resp.status_code < 300:
+            logger.info(
+                "Successfully deleted data movement workload %s on DGXCloud with response code %d",
+                workload_id,
+                resp.status_code,
+            )
+        else:
+            logger.error(
+                "Failed to delete data movement workload %s, response code=%d, reason=%s",
+                workload_id,
+                resp.status_code,
+                resp.text,
+            )
+
+    def create_distributed_job(self, token: str, project_id: str, cluster_id: str, name: str):
         """
         Creates a distributed PyTorch job using the provided project/cluster IDs.
         """
+
         url = f"{self.base_url}/workloads/distributed"
         headers = self._default_headers(token=token)
-        launch_script = f"""
-ln -s {self.job_dir} /nemo_run
-cd /nemo_run/code
-{" ".join(cmd)}
-"""
-        with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f:
-            f.write(launch_script)
 
         payload = {
             "name": name,
             "useGivenNameAsPrefix": True,
             "projectId": project_id,
             "clusterId": cluster_id,
             "spec": {
-                "command": f"/bin/bash {self.job_dir}/launch_script.sh",
+                "command": f"/bin/bash {self.pvc_job_dir}/launch_script.sh",
                 "image": self.container_image,
                 "distributedFramework": self.distributed_framework,
                 "minReplicas": self.nodes,
@@ -145,7 +244,21 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]:
         if not project_id or not cluster_id:
             raise RuntimeError("Unable to determine project/cluster IDs for job submission")
 
-        resp = self.create_distributed_job(token, project_id, cluster_id, name, cmd)
+        # Prepare the launch script and move data to the PVC
+        launch_script = f"""
+ln -s {self.pvc_job_dir}/ /nemo_run
+cd /nemo_run/code
+{" ".join(cmd)}
+"""
+        with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f:
+            f.write(launch_script)
+
+        if not self.launched_from_cluster:
+            logger.info("Creating data movement workload")
+            self.move_data(token, project_id, cluster_id)
+
+        logger.info("Creating distributed workload")
+        resp = self.create_distributed_job(token, project_id, cluster_id, name)
         if resp.status_code not in [200, 202]:
             raise RuntimeError(f"Failed to create job, status_code={resp.status_code}")
 
@@ -167,10 +280,10 @@ def nproc_per_node(self) -> int:
         return 1
 
     def status(self, job_id: str) -> Optional[DGXCloudState]:
-        url = f"{self.base_url}/workloads/distributed/{job_id}"
+        url = f"{self.base_url}/workloads/{job_id}"
         token = self.get_auth_token()
         if not token:
-            logger.error("Failed to retrieve auth token for cancellation request.")
+            logger.error("Failed to retrieve auth token for status request.")
             return None
 
         headers = self._default_headers(token=token)
@@ -179,7 +292,7 @@ def status(self, job_id: str) -> Optional[DGXCloudState]:
             return DGXCloudState("Unknown")
 
         r_json = response.json()
-        return DGXCloudState(r_json["actualPhase"])
+        return DGXCloudState(r_json["phase"])
 
     def cancel(self, job_id: str):
         # Retrieve the authentication token for the REST calls
@@ -225,18 +338,49 @@ def assign(
         self.job_name = task_id
         self.experiment_dir = exp_dir
         self.job_dir = os.path.join(exp_dir, task_dir)
-        self.experiment_id = exp_id
         assert any(
             map(
                 lambda x: os.path.commonpath(
-                    [os.path.abspath(x["path"]), os.path.abspath(self.job_dir)]
+                    [os.path.abspath(x["path"]), os.path.abspath(self.pvc_nemo_run_dir)]
                 )
                 == os.path.abspath(x["path"]),
                 self.pvcs,
             )
         ), (
-            f"Need to specify atleast one PVC containing {self.job_dir}.\nTo update job dir to a PVC path, you can use set_nemorun_home() or the NEMORUN_HOME env var."
+            f"Need to specify at least one PVC containing {self.pvc_nemo_run_dir}. Update your PVC path or pvc_nemo_run_dir."
+        )
+
+        # Set the linked PVC job directory
+        nemo_run_home = get_nemorun_home()
+        job_subdir = self.job_dir[len(nemo_run_home) + 1 :]  # +1 to drop the leading path separator
+        self.pvc_job_dir = os.path.join(self.pvc_nemo_run_dir, job_subdir)
+
+        logger.info(
+            "PVC job directory set as: %s",
+            self.pvc_job_dir,
         )
+        self.experiment_id = exp_id
+
+    def get_launcher_prefix(self) -> Optional[list[str]]:
+        launcher = self.get_launcher()
+        if launcher.nsys_profile:
+            return launcher.get_nsys_prefix(profile_dir="/nemo_run")
+
+    def package_configs(self, *cfgs: tuple[str, str]) -> list[str]:
+        filenames = []
+        basepath = os.path.join(self.job_dir, "configs")
+        for name, cfg in cfgs:
+            filename = os.path.join(basepath, name)
+            os.makedirs(os.path.dirname(filename), exist_ok=True)
+            with open(filename, "w") as f:
+                f.write(cfg)
+            filenames.append(
+                os.path.join(
+                    "/nemo_run/configs",
+                    name,
+                )
+            )
+        return filenames
 
     def package(self, packager: Packager, job_name: str):
         assert self.experiment_id, "Executor not assigned to an experiment."
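Worth noting from the hunks above: status() now queries the generic /workloads/{job_id} endpoint and reads "phase" instead of "actualPhase", so the same method can poll both the busybox data mover (a workspace) and the distributed job; move_data relies on exactly this. A hypothetical helper mirroring that polling loop (executor construction and workload IDs are assumed):

```python
import time

from nemo_run.core.execution.dgxcloud import DGXCloudState

# States that move_data treats as "still in flight".
IN_FLIGHT = {
    DGXCloudState.PENDING,
    DGXCloudState.CREATING,
    DGXCloudState.INITIALIZING,
    DGXCloudState.RUNNING,
}


def wait_until_done(executor, workload_id: str, sleep: float = 10.0):
    """Poll executor.status() until the workload leaves an in-flight state.

    Returns the terminal DGXCloudState, or None if the status call itself
    failed (status() returns None when no auth token can be retrieved).
    """
    state = executor.status(workload_id)
    while state in IN_FLIGHT:
        time.sleep(sleep)
        state = executor.status(workload_id)
    return state
```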

nemo_run/run/torchx_backend/schedulers/dgxcloud.py

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ def _submit_dryrun( # type: ignore
         cfg: Executor,
     ) -> AppDryRunInfo[DGXRequest]:
         assert isinstance(cfg, DGXCloudExecutor), (
-            f"{cfg.__class__} not supported for skypilot scheduler."
+            f"{cfg.__class__} not supported for DGXCloud scheduler."
         )
         executor = cfg
 