# Ray Clusters & Jobs

---

> **Audience**: You already know how to configure executors with NeMo-Run and want distributed *Ray* on either Kubernetes **or** Slurm.
>
> **TL;DR**: `RayCluster` manages the _cluster_; `RayJob` submits a job with an ephemeral cluster. Everything else is syntactic sugar.

## RayCluster vs. RayJob – which one do I need?

| Aspect | RayCluster (interactive) | RayJob (batch) |
|--------|--------------------------|----------------|
| Cluster lifetime | Remains until you call `.stop()` | Ephemeral – cluster auto-deletes after the job finishes |
| Spin-up cost per run | Paid **once** (reuse for many jobs) | Paid **per** submission |
| Multiple jobs on same cluster | Yes | No (one job per submission) |
| Dashboard access | `.port_forward()` to open | Not exposed by default |
| Best for | Debugging, notebooks, iterative dev, hyper-param sweeps that reuse workers | CI/CD pipelines, scheduled training/eval, one-off runs |
| Resource efficiency | Great when you launch many jobs interactively | Great when you just need results & want resources freed asap |

**Rules of thumb**

• Pick **RayCluster** when you want a long-lived playground: start it once, poke around with the Ray CLI or a Jupyter notebook, submit multiple Ray Jobs yourself, and tear it down when you're done.

• Pick **RayJob** when you simply need *"run this script with N GPUs and tell me when it's done"* – the backend spins up a transient cluster, runs the entrypoint, collects logs/status, and cleans everything up automatically (see the sketch just below).
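
A minimal sketch of that batch path, assuming a `KubeRayExecutor` configured roughly as in the quick-start below (names, image, and resource values are illustrative):

```python
from nemo_run.core.execution.kuberay import KubeRayExecutor, KubeRayWorkerGroup
from nemo_run.run.ray.job import RayJob

# Illustrative executor – see the KubeRay quick-start below for the full set of options.
executor = KubeRayExecutor(
    namespace="my-k8s-namespace",
    ray_version="2.43.0",
    image="anyscale/ray:2.43.0-py312-cu125",
    worker_groups=[KubeRayWorkerGroup(group_name="worker", replicas=1, gpus_per_worker=8)],
)

# No RayCluster involved: the backend brings up a transient cluster, runs the
# entrypoint, and cleans everything up once the job reaches a terminal state.
job = RayJob(name="one-off-train", executor=executor)
job.start(command="python train.py", workdir="./")
job.logs(follow=True)  # blocks until the job finishes
```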

## 1. Mental model

| Object | What it abstracts | Back-ends supported |
|-------------|-------------------|---------------------|
| `run.ray.cluster.RayCluster` | Lifecycle of a Ray **cluster** (create → wait → status → port-forward → delete). | `KubeRayExecutor`, `SlurmExecutor` |
| `run.ray.job.RayJob` | Lifecycle of a Ray **job** (submit → monitor → logs → cancel). | same |

The two helpers share a uniform API; the chosen *Executor* decides whether we talk to the **KubeRay** operator (K8s) or a **Slurm** job under the hood.

```mermaid
classDiagram
    RayCluster <|-- KubeRayCluster
    RayCluster <|-- SlurmRayCluster
    RayJob <|-- KubeRayJob
    RayJob <|-- SlurmRayJob
```

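Because the API is uniform, the same driver code can target either backend; the executor you pass in decides which one. A rough sketch (executor construction omitted – see the quick-starts below for realistic arguments):

```python
from nemo_run.run.ray.cluster import RayCluster
from nemo_run.run.ray.job import RayJob


def run_on(executor, name: str, command: str, workdir: str) -> None:
    """Same lifecycle whether `executor` is a KubeRayExecutor or a SlurmExecutor."""
    cluster = RayCluster(name=f"{name}-cluster", executor=executor)
    cluster.start()
    try:
        job = RayJob(name=f"{name}-job", executor=executor)
        job.start(command=command, workdir=workdir)
        job.logs(follow=True)  # stream logs until the job finishes
    finally:
        cluster.stop()  # always release the cluster, even if the job fails
```
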
## 2. KubeRay quick-start

```python
from nemo_run.core.execution.kuberay import KubeRayExecutor, KubeRayWorkerGroup
from nemo_run.run.ray.cluster import RayCluster
from nemo_run.run.ray.job import RayJob

# 1) Configure a KubeRay executor (resources + cluster policy)
executor = KubeRayExecutor(
    namespace="my-k8s-namespace",
    ray_version="2.43.0",
    image="anyscale/ray:2.43.0-py312-cu125",
    head_cpu="4",
    head_memory="12Gi",
    worker_groups=[
        KubeRayWorkerGroup(
            group_name="worker",  # arbitrary string
            replicas=2,           # two worker pods
            gpus_per_worker=8,
        )
    ],
    # Optional tweaks ----------------------------------------------------
    reuse_volumes_in_worker_groups=True,  # mount PVCs on workers too
    spec_kwargs={"schedulerName": "runai-scheduler"},  # e.g. Run:ai
    volume_mounts=[{"name": "workspace", "mountPath": "/workspace"}],
    volumes=[{
        "name": "workspace",
        "persistentVolumeClaim": {"claimName": "my-workspace-pvc"},
    }],
    env_vars={
        "UV_PROJECT_ENVIRONMENT": "/home/ray/venvs/driver",
        "NEMO_RL_VENV_DIR": "/home/ray/venvs",
        "HF_HOME": "/workspace/hf_cache",
    },
    container_kwargs={
        "securityContext": {
            "allowPrivilegeEscalation": False,
            "runAsUser": 0,
        }
    },
)

# 2) Commands executed in EVERY Ray container before the daemon starts
pre_ray_start = [
    "pip install uv",
    "echo 'unset RAY_RUNTIME_ENV_HOOK' >> /home/ray/.bashrc",
]

# 3) Spin up the cluster & expose the dashboard
cluster = RayCluster(name="demo-kuberay-cluster", executor=executor)
cluster.start(timeout=900, pre_ray_start_commands=pre_ray_start)
cluster.port_forward(port=8265, target_port=8265, wait=False)  # dashboard → http://localhost:8265

# 4) Submit a Ray Job that runs inside that cluster
job = RayJob(name="demo-kuberay-job", executor=executor)
job.start(
    command="uv run python examples/train.py --config cfgs/train.yaml",
    workdir="/path/to/project/",                   # synced to the PVC automatically
    runtime_env_yaml="/path/to/runtime_env.yaml",  # optional
    pre_ray_start_commands=pre_ray_start,
)
job.logs(follow=True)

# 5) Clean up
cluster.stop()
```

### Notes
1. `workdir` is rsync'ed into the first volume declared in the executor's `volume_mounts`, so relative imports *just work*.
2. Add `pre_ray_start_commands=["apt-get update && …"]` to inject shell snippets that run inside the **head** and **worker** containers **before** Ray starts.
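
Because the cluster outlives any single job, you can keep submitting jobs against it – e.g. a small hyper-parameter sweep. A sketch that reuses `cluster`, `executor`, and `pre_ray_start` from the quick-start above (the `--lr` flag is illustrative and assumes your training script accepts it):

```python
for lr in ["1e-4", "3e-4", "1e-3"]:
    sweep_job = RayJob(name=f"demo-sweep-lr{lr.replace('-', '')}", executor=executor)
    sweep_job.start(
        command=f"uv run python examples/train.py --config cfgs/train.yaml --lr {lr}",
        workdir="/path/to/project/",
        pre_ray_start_commands=pre_ray_start,
    )
    sweep_job.logs(follow=True)  # stream logs; returns once this job is done

cluster.stop()  # tear the shared cluster down after the whole sweep
```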

## 3. Slurm quick-start

```python
from pathlib import Path

import nemo_run as run
from nemo_run.core.execution.slurm import SlurmExecutor, SlurmJobDetails, SSHTunnel
from nemo_run.run.ray.cluster import RayCluster
from nemo_run.run.ray.job import RayJob

# 1) SSH tunnel to the Slurm login node so we can launch remotely
ssh = SSHTunnel(
    host="login.my-hpc.com",       # public hostname of login node
    user="jdoe",                   # your cluster username
    job_dir="/scratch/jdoe/runs",  # where NeMo-Run stores Ray artefacts (logs, code, etc.)
    identity="~/.ssh/id_ed25519",  # optional SSH key
)

# 2) Create a Slurm executor and tweak defaults
executor = SlurmExecutor(
    account="gpu-dept",
    partition="a100",
    nodes=2,
    gpus_per_node=8,
    gres="gpu:8",
    time="04:00:00",
    container_image="nvcr.io/nvidia/pytorch:24.05-py3",
    container_mounts=["/scratch:/scratch"],
    env_vars={"HF_HOME": "/scratch/hf_cache"},
    tunnel=ssh,
)

# Optional: customise where Slurm writes stdout/stderr
class CustomJobDetails(SlurmJobDetails):
    @property
    def stdout(self) -> Path:  # noqa: D401 – illustrative only
        assert self.folder
        return Path(self.folder) / "sbatch_stdout.out"  # sbatch output goes here

    @property
    def stderr(self) -> Path:  # noqa: D401
        assert self.folder
        return Path(self.folder) / "sbatch_stderr.err"

executor.job_details = CustomJobDetails()

# 3) Commands executed on every node right before Ray starts
pre_ray_start = [
    "pip install uv",
]

# 4) Bring up the Ray cluster (Slurm array job under the hood)
cluster = RayCluster(name="demo-slurm-ray", executor=executor)
cluster.start(timeout=1800, pre_ray_start_commands=pre_ray_start)
cluster.port_forward(port=8265, target_port=8265)  # dashboard → http://localhost:8265

# 5) Submit a Ray job that runs inside the cluster
job = RayJob(name="demo-slurm-job", executor=executor)
job.start(
    command="uv run python train.py --config cfgs/train.yaml cluster.num_nodes=2",
    workdir="/path/to/project/",  # rsync'ed via SSH to <cluster_dir>/code/
    pre_ray_start_commands=pre_ray_start,
)
job.logs(follow=True)

# 6) Tear everything down (or just let the wall-time expire)
cluster.stop()
```

### Tips for Slurm users
* Set `executor.packager = run.GitArchivePackager()` if you prefer packaging a git tree instead of rsync – sketched below.
* `cluster.port_forward()` opens an SSH tunnel from *your laptop* to the Ray dashboard running on the head node.
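
A minimal sketch of both tips, reusing `executor` and `cluster` from the quick-start above (it assumes your project is a git repository and that the default `GitArchivePackager` settings suit your layout):

```python
import nemo_run as run

# Package the current git tree instead of rsync'ing the working directory;
# everything else in the quick-start stays the same.
executor.packager = run.GitArchivePackager()

# Re-open the dashboard tunnel from your laptop if you closed it earlier.
cluster.port_forward(port=8265, target_port=8265)
```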

## 4. API reference cheat-sheet

```python
cluster = RayCluster(name, executor)
cluster.start(wait_until_ready=True, timeout=600, pre_ray_start_commands=["pip install -r …"])
cluster.status(display=True)
cluster.port_forward(port=8265, target_port=8265, wait=False)
cluster.stop()

job = RayJob(name, executor)
job.start(command, workdir, runtime_env_yaml=None, pre_ray_start_commands=None)
job.status()
job.logs(follow=True)
job.stop()
```

All methods are synchronous – each call **blocks until its work is done** before returning; the helpers hide the messy details (kubectl, squeue, ssh, …).
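
For instance, a small sketch combining the calls above – `status()` as a quick health check while the job spins up, then `logs(follow=True)` as the actual wait (what `status()` returns is version-dependent, so it is only printed here; the names and command are illustrative):

```python
import time

job = RayJob(name="cheatsheet-demo", executor=executor)
job.start(command="python train.py", workdir="./")

for _ in range(3):
    print(job.status())  # peek at the job state while it spins up
    time.sleep(30)

job.logs(follow=True)    # blocks and streams output until the job finishes
# job.stop()             # uncomment to cancel a job that is still running
```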

## 5. Rolling your own CLI

Because `RayCluster` and `RayJob` are plain Python, you can compose them inside **argparse**, **Typer**, **Click** – anything. Here is a minimal **argparse** script:

```python
import argparse

from nemo_run.core.execution.kuberay import KubeRayExecutor, KubeRayWorkerGroup
from nemo_run.run.ray.cluster import RayCluster
from nemo_run.run.ray.job import RayJob


def main() -> None:
    parser = argparse.ArgumentParser(description="Submit a Ray job via NeMo-Run")
    parser.add_argument("--name", default="demo", help="Base name for cluster + job")
    parser.add_argument(
        "--image",
        default="anyscale/ray:2.43.0-py312-cu125",
        help="Ray container image",
    )
    parser.add_argument(
        "--command",
        default="python script.py",
        help="Entrypoint to execute inside the Ray job",
    )
    args = parser.parse_args()

    # 1) Build the executor programmatically
    executor = KubeRayExecutor(
        namespace="ml-team",
        ray_version="2.43.0",
        image=args.image,
        worker_groups=[KubeRayWorkerGroup(group_name="worker", replicas=1, gpus_per_worker=8)],
    )

    # 2) Spin up a cluster and keep it for the lifetime of the script
    cluster = RayCluster(name=f"{args.name}-cluster", executor=executor)
    cluster.start()

    # 3) Submit a job against that cluster
    job = RayJob(name=f"{args.name}-job", executor=executor)
    job.start(command=args.command, workdir="./")

    # 4) Stream logs and block until completion
    job.logs(follow=True)

    # 5) Tidy up
    cluster.stop()


if __name__ == "__main__":
    main()
```

From there you can wrap the script with `uvx`, bake it into a Docker image, or integrate it into a larger orchestration system – the underlying NeMo-Run APIs stay the same.

---

Happy distributed hacking! 🚀