Skip to content

Commit d32681f

Browse files
authored
Add storage mount options to LeptonExecutor (#237)
Allow users to specify the source that storage is mounted from in DGX Cloud Lepton jobs, such as an NFS share attached to every node in the node group. That storage can then be mounted in jobs as shared storage. Signed-off-by: Robert Clark <[email protected]>
1 parent dd0354d commit d32681f

File tree

2 files changed

+61
-6
lines changed

2 files changed

+61
-6
lines changed

nemo_run/core/execution/lepton.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,7 @@ def move_data(self, sleep: float = 10) -> None:
9999
image="busybox:1.37.0", # Use a very low resource container
100100
command=cmd,
101101
),
102-
mounts=[
103-
Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts
104-
],
102+
mounts=[Mount(**mount) for mount in self.mounts],
105103
)
106104
spec.resource_requirement = ResourceRequirement(
107105
resource_shape="cpu.small",
@@ -161,6 +159,15 @@ def _valid_node_ids(self, node_group_id: DedicatedNodeGroup, client: APIClient)
161159

162160
return valid_node_ids
163161

162+
def _validate_mounts(self):
163+
"""
164+
Ensure the required arguments are specified for mounts.
165+
"""
166+
for mount in self.mounts:
167+
# Verify that both the 'path' and 'mount_path' keys are present in every mount entry
168+
if not all(key in mount for key in ["path", "mount_path"]):
169+
raise RuntimeError("Must specify a 'path' and 'mount_path' for all mounts")
170+
164171
def create_lepton_job(self, name: str):
165172
"""
166173
Creates a distributed PyTorch job using the provided project/cluster IDs.
@@ -192,9 +199,7 @@ def create_lepton_job(self, name: str):
192199
max_failure_retry=None,
193200
max_job_failure_retry=None,
194201
envs=envs,
195-
mounts=[
196-
Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts
197-
],
202+
mounts=[Mount(**mount) for mount in self.mounts],
198203
image_pull_secrets=[],
199204
ttl_seconds_after_finished=None,
200205
intra_job_communication=True,
@@ -211,6 +216,7 @@ def create_lepton_job(self, name: str):
211216
return created_job
212217

213218
def launch(self, name: str, cmd: list[str]) -> tuple[str, str]:
219+
self._validate_mounts()
214220
name = name.replace("_", "-").replace(".", "-") # to meet K8s requirements
215221
launch_script = f"""
216222
wget -O init.sh https://raw.githubusercontent.com/leptonai/scripts/main/lepton_env_to_pytorch.sh

test/core/execution/test_lepton.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,55 @@ def test_nproc_per_node_default(self):
396396

397397
assert executor.nproc_per_node() == 1
398398

399+
def test_valid_storage_mounts(self):
400+
executor = LeptonExecutor(
401+
container_image="nvcr.io/nvidia/test:latest",
402+
nemo_run_dir="/workspace/nemo_run",
403+
mounts=[{"path": "/workspace", "mount_path": "/workspace"}],
404+
)
405+
406+
assert executor._validate_mounts() is None
407+
408+
def test_valid_storage_mounts_with_mount_from(self):
409+
executor = LeptonExecutor(
410+
container_image="nvcr.io/nvidia/test:latest",
411+
nemo_run_dir="/workspace/nemo_run",
412+
mounts=[
413+
{"path": "/workspace", "mount_path": "/workspace", "from": "local-storage:nfs"}
414+
],
415+
)
416+
417+
assert executor._validate_mounts() is None
418+
419+
def test_missing_storage_mount_options(self):
420+
executor = LeptonExecutor(
421+
container_image="nvcr.io/nvidia/test:latest",
422+
nemo_run_dir="/workspace/nemo_run",
423+
mounts=[{"path": "/workspace"}],
424+
)
425+
426+
with pytest.raises(RuntimeError):
427+
executor._validate_mounts()
428+
429+
def test_missing_storage_mount_options_mount_path(self):
430+
executor = LeptonExecutor(
431+
container_image="nvcr.io/nvidia/test:latest",
432+
nemo_run_dir="/workspace/nemo_run",
433+
mounts=[{"mount_path": "/workspace"}],
434+
)
435+
436+
with pytest.raises(RuntimeError):
437+
executor._validate_mounts()
438+
439+
def test_valid_storage_mounts_with_random_args(self):
440+
executor = LeptonExecutor(
441+
container_image="nvcr.io/nvidia/test:latest",
442+
nemo_run_dir="/workspace/nemo_run",
443+
mounts=[{"path": "/workspace", "mount_path": "/workspace", "random": True}],
444+
)
445+
446+
assert executor._validate_mounts() is None
447+
399448
@patch("nemo_run.core.execution.lepton.APIClient")
400449
def test_status_running_and_ready(self, mock_APIClient):
401450
mock_instance = MagicMock()

0 commit comments

Comments
 (0)