
Commit d3be9ac

roclark and zoeyz101 authored
Add RayCluster support for DGX Cloud Lepton (#389)
* Add Ray support for DGX Cloud Lepton

  Add support for launching a RayCluster on DGX Cloud Lepton and submitting RayJobs to those clusters using the lepton SDK. This uses the new RayCluster feature on DGX Cloud Lepton to dynamically spin clusters up and down via the Python SDK, and jobs can be submitted directly to deployed clusters.

  Signed-Off-By: Robert Clark <[email protected]>

* Make name unique and add resource shape for Lepton RayCluster

  Signed-off-by: Zoey Zhang <[email protected]>

* Add head node reference in RayCluster, support defining secrets in RayCluster, and linting

  Signed-off-by: Zoey Zhang <[email protected]>

* Remove Slurm packager comments from RayCluster

  Removed the placeholder Slurm packager handling comments from the Lepton RayCluster code. For now, the "workdir" parameter should be used for transferring local data to the remote Ray cluster.

  Signed-Off-By: Robert Clark <[email protected]>

* Fix RayCluster head resource shape

  Fix issue to ensure the proper head node resource shape is used if it isn't explicitly given by the user.

  Signed-Off-By: Robert Clark <[email protected]>

* Update LeptonRay comments

  Updated the comments in the LeptonRayCluster and LeptonRayJob classes to accurately reflect the code.

  Signed-Off-By: Robert Clark <[email protected]>

* Fix RayJob logs streaming connection dropping

  The RayJob logs stream would sometimes time out and reset, causing a very long output of logs in the terminal as it continually resets.

  Signed-Off-By: Robert Clark <[email protected]>

* Make RayCluster head resource shape optional

  The head node resource shape for a LeptonRayCluster should be optional. If it isn't specified by the user, it should default to the same shape used for the worker nodes.

  Signed-Off-By: Robert Clark <[email protected]>

* Add doc for DGXC Lepton RayClusters

  Added an example to the Ray quick-start guide on how to use RayClusters and RayJobs with NeMo-Run on DGX Cloud Lepton.

  Signed-Off-By: Robert Clark <[email protected]>

* Update license date

  Signed-Off-By: Robert Clark <[email protected]>

* Fix Ray guide typo

  Signed-Off-By: Robert Clark <[email protected]>

* Make cluster readiness timeout a variable

  Allows users to specify how long to wait for a RayCluster to be created on DGX Cloud Lepton.

  Signed-Off-By: Robert Clark <[email protected]>

* Remove implicit returns

  Signed-Off-By: Robert Clark <[email protected]>

* Remove unused local variable

  Signed-Off-By: Robert Clark <[email protected]>

* Fix linting errors

  Signed-Off-By: Robert Clark <[email protected]>

* Fix formatting errors

  Signed-Off-By: Robert Clark <[email protected]>

* Move LeptonExecutor parameters to definition

  Move the RayCluster-specific settings to the LeptonExecutor class for a more seamless interface for launching and interacting with RayClusters on DGX Cloud Lepton.

  Signed-Off-By: Robert Clark <[email protected]>

* Updated leptonai package version

  Need a newer version of the leptonai SDK to support RayClusters.

  Signed-Off-By: Robert Clark <[email protected]>

* Add Lepton RayCluster tests

  Signed-Off-By: Robert Clark <[email protected]>

---------

Signed-off-by: Robert Clark <[email protected]>
Signed-off-by: Zoey Zhang <[email protected]>
Co-authored-by: Zoey Zhang <[email protected]>
1 parent e6c5a5e commit d3be9ac
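
At a glance, the lifecycle this commit enables on DGX Cloud Lepton is: configure a `LeptonExecutor`, bring up a `RayCluster`, submit a `RayJob` to it, stream its logs, and tear the cluster down. A condensed sketch of that flow (argument values are placeholders; the full example is in the docs/guides/ray.md diff below):

```python
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.run.ray.cluster import RayCluster
from nemo_run.run.ray.job import RayJob

# Placeholder executor config; see the quick-start in docs/guides/ray.md for a complete one.
executor = LeptonExecutor(
    resource_shape="gpu.8xh100",
    container_image="rayproject/ray:2.49.2-gpu",
    nodes=1,
    nprocs_per_node=8,
)

cluster = RayCluster(name="lepton-ray-cluster", executor=executor)
cluster.start(timeout=1800)  # wait up to 30 minutes for the cluster to come up
cluster.status(display=True)

job = RayJob(name="demo-job", executor=executor, cluster_name="lepton-ray-cluster")
job.start(command="python train.py", workdir="./project")  # workdir is synced to the cluster
job.logs(follow=True)

cluster.stop()  # tear down and free the Lepton resources
```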

File tree

8 files changed: +1776, -40 lines changed


docs/guides/ray.md

Lines changed: 76 additions & 4 deletions
@@ -25,17 +25,19 @@
 
 | Object | What it abstracts | Back-ends supported |
 |-------------|-------------------|---------------------|
-| `run.ray.cluster.RayCluster` | Lifecycle of a Ray **cluster** (create ⇒ wait ⇢ status ⇢ port-forward ⇢ delete). | `KubeRayExecutor`, `SlurmExecutor` |
+| `run.ray.cluster.RayCluster` | Lifecycle of a Ray **cluster** (create ⇒ wait ⇢ status ⇢ port-forward ⇢ delete). | `KubeRayExecutor`, `SlurmExecutor`, `LeptonExecutor` |
 | `run.ray.job.RayJob` | Lifecycle of a Ray **job** (submit ⇒ monitor ⇢ logs ⇢ cancel). | same |
 
-The two helpers share a uniform API; the chosen *Executor* decides whether we talk to the **KubeRay** operator (K8s) or a **Slurm** job under the hood.
+The two helpers share a uniform API; the chosen *Executor* decides whether we talk to the **KubeRay** operator (K8s), **DGX Cloud Lepton's RayCluster**, or a **Slurm** job under the hood.
 
 ```mermaid
 classDiagram
     RayCluster <|-- KubeRayCluster
     RayCluster <|-- SlurmRayCluster
+    RayCluster <|-- LeptonRayCluster
     RayJob <|-- KubeRayJob
     RayJob <|-- SlurmRayJob
+    RayJob <|-- LeptonRayJob
 ```
 
 ## 2. KubeRay quick-start
@@ -183,7 +185,77 @@ cluster.stop()
 * `executor.packager = run.GitArchivePackager()` if you prefer packaging a git tree instead of rsync.
 * `cluster.port_forward()` opens an SSH tunnel from *your laptop* to the Ray dashboard running on the head node.
 
-## 4. API reference cheat-sheet
+## 4. DGX Cloud Lepton RayCluster quick-start
+
+```python
+import os
+from pathlib import Path
+
+import nemo_run as run
+from nemo_run.core.execution.lepton import LeptonExecutor
+from nemo_run.run.ray.cluster import RayCluster
+from nemo_run.run.ray.job import RayJob
+
+# 1) Create a LeptonExecutor and tweak defaults
+mounts = [
+    {
+        "path": "/",
+        "mount_path": "/nemo-workspace",
+        "from": "node-nfs:lepton-shared-fs",
+    }
+]
+
+executor = LeptonExecutor(
+    resource_shape="gpu.8xh100",
+    container_image="rayproject/ray:2.49.2-gpu",
+    nemo_run_dir="/nemo-workspace/nemo-run",
+    head_resource_shape="cpu.large",
+    ray_version="2.49.2",
+    mounts=mounts,
+    node_group="my-node-group",
+    nodes=1,
+    nprocs_per_node=8,
+    env_vars={
+        "TORCH_HOME": "/nemo-workspace/.cache",
+    },
+    secret_vars=[
+        {"WANDB_API_KEY": "WANDB_API_KEY"},
+        {"HF_TOKEN": "HUGGING_FACE_HUB_TOKEN"},
+    ],
+    launcher="torchrun",
+    image_pull_secrets=[],
+    pre_launch_commands=[],
+)
+
+# 2) Bring up the RayCluster on DGX Cloud Lepton and show the status
+cluster = RayCluster(
+    name="lepton-ray-cluster",
+    executor=executor,
+)
+cluster.start(timeout=1800)
+cluster.status(display=True)
+
+# 3) Submit a RayJob that runs inside the created RayCluster
+job = RayJob(
+    name="demo-lepton-ray-job",
+    executor=executor,
+    cluster_name="lepton-ray-cluster",
+)
+job.start(
+    command="uv run python train.py --config cfgs/train.yaml cluster.num_nodes=2",
+    workdir="/path/to/project/",  # rsync'ed from local to the RayCluster
+)
+job.status(display=True)  # Display the RayJob status
+job.logs(follow=True)  # Tail the job logs as it runs
+
+# 4) Tear down the RayCluster and free up resources
+cluster.stop()
+```
+
+### Tips for DGX Cloud Lepton users
+* This assumes the [DGX Cloud Lepton CLI](https://docs.nvidia.com/dgx-cloud/lepton/reference/cli/get-started/) is installed and has been authenticated.
+
+## 5. API reference cheat-sheet
 
 ```python
 cluster = RayCluster(name, executor)
@@ -201,7 +273,7 @@ job.stop()
 
 All methods are synchronous and **return immediately** when their work is done; the helpers hide the messy details (kubectl, squeue, ssh, …).
 
-## 5. Rolling your own CLI
+## 6. Rolling your own CLI
 
 Because `RayCluster` and `RayJob` are plain Python, you can compose them inside **argparse**, **Typer**, **Click** – anything. Here is a minimal **argparse** script:
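
The last hunk only renumbers the "Rolling your own CLI" section. As a rough illustration of that idea with the new Lepton backend, here is a hypothetical argparse wrapper (a sketch, not the script from the guide; the executor fields are placeholders copied from the quick-start, and a real deployment may need more, e.g. mounts):

```python
import argparse

from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.run.ray.cluster import RayCluster
from nemo_run.run.ray.job import RayJob


def main() -> None:
    parser = argparse.ArgumentParser(description="Manage a Ray cluster on DGX Cloud Lepton")
    parser.add_argument("action", choices=["up", "submit", "down"])
    parser.add_argument("--name", default="lepton-ray-cluster")
    parser.add_argument("--command", default="python train.py")
    parser.add_argument("--workdir", default=".")
    args = parser.parse_args()

    # Placeholder executor config; adjust shapes, node group, mounts, etc. to your environment.
    executor = LeptonExecutor(
        resource_shape="gpu.8xh100",
        container_image="rayproject/ray:2.49.2-gpu",
        node_group="my-node-group",
        nodes=1,
        nprocs_per_node=8,
    )

    if args.action == "up":
        RayCluster(name=args.name, executor=executor).start(timeout=1800)
    elif args.action == "submit":
        job = RayJob(name=f"{args.name}-job", executor=executor, cluster_name=args.name)
        job.start(command=args.command, workdir=args.workdir)
        job.logs(follow=True)
    else:  # "down"
        RayCluster(name=args.name, executor=executor).stop()


if __name__ == "__main__":
    main()
```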

nemo_run/core/execution/lepton.py

Lines changed: 2 additions & 0 deletions
@@ -81,6 +81,8 @@ class LeptonExecutor(Executor):
     ) # Image pull secrets for container registry authentication
     custom_spec: dict[str, Any] = field(default_factory=dict)
     pre_launch_commands: list[str] = field(default_factory=list) # Custom commands before launch
+    head_resource_shape: Optional[str] = "" # Only used for LeptonRayCluster
+    ray_version: Optional[str] = None # Only used for LeptonRayCluster
 
     def stop_job(self, job_id: str):
         """

nemo_run/run/ray/cluster.py

Lines changed: 3 additions & 0 deletions
@@ -17,8 +17,10 @@
 from typing import Optional, Type
 
 from nemo_run.core.execution.base import Executor
+from nemo_run.core.execution.lepton import LeptonExecutor
 from nemo_run.core.execution.slurm import SlurmExecutor
 from nemo_run.core.frontend.console.api import configure_logging
+from nemo_run.run.ray.lepton import LeptonRayCluster
 from nemo_run.run.ray.slurm import SlurmRayCluster
 
 # Import guard for Kubernetes dependencies
@@ -43,6 +45,7 @@ def __post_init__(self):
         configure_logging(level=self.log_level)
         backend_map: dict[Type[Executor], Type] = {
             SlurmExecutor: SlurmRayCluster,
+            LeptonExecutor: LeptonRayCluster,
         }
 
         if _KUBERAY_AVAILABLE and KubeRayExecutor is not None and KubeRayCluster is not None:
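
`RayCluster.__post_init__` resolves its backend by the executor's concrete class, so Lepton support is one more entry in `backend_map` (KubeRay is still registered conditionally behind the import guard). A self-contained toy sketch of that dispatch pattern, using stand-in classes rather than the real NeMo-Run ones:

```python
from dataclasses import dataclass


# Stand-ins for the real executor / backend classes (illustrative only).
class SlurmExecutor: ...
class LeptonExecutor: ...


class SlurmRayCluster:
    def __init__(self, name: str, executor: object) -> None:
        self.name, self.executor = name, executor


class LeptonRayCluster(SlurmRayCluster):
    """Toy Lepton backend with the same constructor signature."""


@dataclass
class RayClusterSketch:
    name: str
    executor: object

    def __post_init__(self) -> None:
        backend_map = {
            SlurmExecutor: SlurmRayCluster,
            LeptonExecutor: LeptonRayCluster,  # the mapping this commit adds
        }
        # Exact class lookup, mirroring the real __post_init__ dispatch.
        backend_cls = backend_map[self.executor.__class__]
        self.backend = backend_cls(name=self.name, executor=self.executor)


cluster = RayClusterSketch(name="demo", executor=LeptonExecutor())
assert isinstance(cluster.backend, LeptonRayCluster)
```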

nemo_run/run/ray/job.py

Lines changed: 11 additions & 2 deletions
@@ -17,8 +17,10 @@
 from typing import Any, Optional, Type
 
 from nemo_run.core.execution.base import Executor
+from nemo_run.core.execution.lepton import LeptonExecutor
 from nemo_run.core.execution.slurm import SlurmExecutor
 from nemo_run.core.frontend.console.api import configure_logging
+from nemo_run.run.ray.lepton import LeptonRayJob
 from nemo_run.run.ray.slurm import SlurmRayJob
 
 # Import guard for Kubernetes dependencies
@@ -41,10 +43,13 @@ class RayJob:
     executor: Executor
     pre_ray_start_commands: Optional[list[str]] = None
     log_level: str = "INFO"
+    cluster_name: Optional[str] = None # Used to connect to existing RayCluster
+    cluster_ready_timeout: Optional[int] = 1800 # Only used for LeptonRayJob
 
     def __post_init__(self) -> None:  # noqa: D401 – simple implementation
         configure_logging(level=self.log_level)
         backend_map: dict[Type[Executor], Type[Any]] = {
+            LeptonExecutor: LeptonRayJob,
             SlurmExecutor: SlurmRayJob,
         }
 
@@ -57,6 +62,10 @@ def __post_init__(self) -> None:  # noqa: D401 – simple implementation
         backend_cls = backend_map[self.executor.__class__]
         self.backend = backend_cls(name=self.name, executor=self.executor)
 
+        if isinstance(self.executor, LeptonExecutor):
+            self.backend.cluster_name = self.cluster_name
+            self.backend.cluster_ready_timeout = self.cluster_ready_timeout
+
     # ------------------------------------------------------------------
     # Public API
     # ------------------------------------------------------------------
@@ -84,8 +93,8 @@ def start(
             dryrun=dryrun,
         )
 
-    def stop(self) -> None:
-        self.backend.stop()  # type: ignore[attr-defined]
+    def stop(self, wait: bool = False) -> None:
+        self.backend.stop(wait=wait)  # type: ignore[attr-defined]
 
     def status(self, display: bool = True):
         return self.backend.status(display=display)  # type: ignore[attr-defined]
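
Taken together, the new fields let a `RayJob` attach to an already-running Lepton RayCluster and bound how long it waits for that cluster to become ready, while `stop()` can now optionally block until the backend reports the job has stopped. A short usage sketch (values are illustrative and assume the cluster from the quick-start is running):

```python
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.run.ray.job import RayJob

# Placeholder executor config; a real one typically also sets mounts, node_group, etc.
executor = LeptonExecutor(
    resource_shape="gpu.8xh100",
    container_image="rayproject/ray:2.49.2-gpu",
    nodes=1,
    nprocs_per_node=8,
)

job = RayJob(
    name="demo-lepton-ray-job",
    executor=executor,
    cluster_name="lepton-ray-cluster",  # attach to the existing RayCluster
    cluster_ready_timeout=600,          # wait up to 10 minutes for cluster readiness
)
job.start(command="python train.py", workdir="./project")
job.logs(follow=True)

# New in this commit: optionally block until the backend confirms the job is stopped.
job.stop(wait=True)
```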
