forked from NVIDIA-NeMo/RL
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathray.sub
More file actions
303 lines (269 loc) · 11 KB
/
ray.sub
File metadata and controls
303 lines (269 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --exclusive
#SBATCH --account=ACCOUNT
#SBATCH --job-name=JOB_NAME
#SBATCH --partition=PARTITION
#SBATCH --time=1:0:0
#SBATCH --dependency=singleton
#SBATCH --gres=gpu:8
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -eoux pipefail
########################################################
# User defined variables
########################################################
CONTAINER=$CONTAINER
MOUNTS=$MOUNTS
COMMAND=${COMMAND:-} # This is a script relative to the SLURM_SUBMIT_DIR. If left empty, it will leave the cluster idle after it's brought up.
########################################################
# Ports for all nodes (should be odd numbers since we place head/worker[0] on the same node) so all workers get the odd ports, but the head will get +1 the ports
NODE_MANAGER_PORT=${NODE_MANAGER_PORT:-53001}
OBJECT_MANAGER_PORT=${OBJECT_MANAGER_PORT:-53003}
RUNTIME_ENV_AGENT_PORT=${RUNTIME_ENV_AGENT_PORT:-53005}
DASHBOARD_AGENT_GRPC_PORT=${DASHBOARD_AGENT_GRPC_PORT:-53007}
METRICS_EXPORT_PORT=${METRICS_EXPORT_PORT:-53009}
# Ports for the head node
PORT=${PORT:-6379}
RAY_CLIENT_SERVER_PORT=${RAY_CLIENT_SERVER_PORT:-10001}
#REDIT_SHARD_PORTS=${REDIT_SHARD_PORTS:-"random"} ??
DASHBOARD_GRPC_PORT=${DASHBOARD_GRPC_PORT:-52367}
DASHBOARD_PORT=${DASHBOARD_PORT:-8265} # Also used by debugger
DASHBOARD_AGENT_LISTEN_PORT=${DASHBOARD_AGENT_LISTEN_PORT:-52365}
# On our clusters, the largest port range on an idle worker appeared between 52369-64607
# (not including the other ports set by this script). So this range is chosen to be
# somewhere in the middle
MIN_WORKER_PORT=${MIN_WORKER_PORT:-54001}
MAX_WORKER_PORT=${MAX_WORKER_PORT:-54257}
########################################################
# Unset UV_CACHE_DIR to avoid local cache directory interferring with the container cache
unset UV_CACHE_DIR
if [[ -n "${UV_CACHE_DIR_OVERRIDE:-}" ]]; then
mkdir -p "$UV_CACHE_DIR_OVERRIDE"
if [[ -n $MOUNTS ]]; then
MOUNTS+=",$UV_CACHE_DIR_OVERRIDE:/root/.cache/uv"
else
MOUNTS="$UV_CACHE_DIR_OVERRIDE:/root/.cache/uv"
fi
fi
# Create logs directory
BASE_LOG_DIR=${BASE_LOG_DIR:-$SLURM_SUBMIT_DIR}
LOG_DIR="$BASE_LOG_DIR/$SLURM_JOB_ID-logs"
mkdir -p $LOG_DIR
COMMON_SRUN_ARGS=""
COMMON_SRUN_ARGS+=" --no-container-mount-home"
COMMON_SRUN_ARGS+=" --mpi=pmix"
COMMON_SRUN_ARGS+=" --container-mounts=$MOUNTS"
COMMON_SRUN_ARGS+=" --container-image=$CONTAINER"
COMMON_SRUN_ARGS+=" --container-workdir=$SLURM_SUBMIT_DIR"
# TODO: delete these (just for debugging)
COMMON_SRUN_ARGS+=" -p $SLURM_JOB_PARTITION"
COMMON_SRUN_ARGS+=" -A $SLURM_JOB_ACCOUNT"
COMMON_SRUN_ARGS+=" --gres=gpu:8"
# Number of GPUs per worker node
GPUS_PER_NODE=${GPUS_PER_NODE:-8}
# Number of CPUs per worker node
CPUS_PER_WORKER=${CPUS_PER_WORKER:-$((GPUS_PER_NODE * 16))}
num_retries=3
# Getting the node names and IP addresses in the SLURM allocation
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)
ip_addresses_array=()
for node in $nodes; do
ip_address=$(host $node | awk '/has address/ { print $4 }')
# Add the IP address to the array
ip_addresses_array+=("$ip_address")
done
head_node=${nodes_array[0]}
head_node_ip=${ip_addresses_array[0]}
ip_head=$head_node_ip:$PORT
# First we start the head of the ray cluster on one of the physical nodes
# Give the head node actual resources to make it schedulable
head_cmd=$(cat <<EOF
# Touch a file to indicate that the head node has started
# Overlapping srun commands will check this file to determine if we can overlap a container command
touch $LOG_DIR/STARTED_RAY_HEAD
env
exit-dramatically() {
# Use SIGTERM to forcefully terminate the srun process
pkill -P $$ || true
kill -TERM 0 || true
# As a last resort, exit with a non-zero code
exit 1
}
export -f exit-dramatically
# Background process to check for ENDED file
monitor-sidecar() {
set +x
while true; do
sleep 60
if [[ -f "$LOG_DIR/ENDED" ]]; then
echo "Detected ENDED file, terminating..."
exit-dramatically
fi
done
}
monitor-sidecar &
cat <<EOFINNER | tee /launch-head.sh
ray start --head \
--disable-usage-stats \
--resources="{\"worker_units\": $GPUS_PER_NODE, \"slurm_managed_ray_cluster\": 1}" \
--node-ip-address="$head_node_ip" \
--port=${PORT} \
--ray-client-server-port=${RAY_CLIENT_SERVER_PORT} \
--dashboard-grpc-port=${DASHBOARD_GRPC_PORT} \
--dashboard-port=${DASHBOARD_PORT} \
\
--node-manager-port=$((${NODE_MANAGER_PORT} + 1)) \
--object-manager-port=$((${OBJECT_MANAGER_PORT} + 1)) \
--runtime-env-agent-port=$((${RUNTIME_ENV_AGENT_PORT} + 1)) \
--dashboard-agent-grpc-port=$((${DASHBOARD_AGENT_GRPC_PORT} + 1)) \
--dashboard-agent-listen-port=$((${DASHBOARD_AGENT_LISTEN_PORT} + 1)) \
--metrics-export-port=$((${METRICS_EXPORT_PORT} + 1)) \
\
--block
EOFINNER
chmod +x /launch-head.sh
count=0
while [[ \$count -lt $num_retries ]]; do
bash /launch-head.sh
count=\$((count+1))
echo "Head node failed \$count/$num_retries times, restarting in 5 seconds..."
sleep 5
done
touch $LOG_DIR/ENDED
exit 1
EOF
)
srun $COMMON_SRUN_ARGS --container-name=ray-head --nodes=1 --ntasks=1 --cpus-per-task=$CPUS_PER_WORKER -w "$head_node" -o $LOG_DIR/ray-head.log bash -x -c "$head_cmd" &
NUM_ACTORS=$((GPUS_PER_NODE * SLURM_JOB_NUM_NODES))
# Start Ray worker nodes
# We want 1 Ray worker node per physical node (excluding the head node)
# Worker nodes are started with ray start but without the --head flag
# Start from node 1 since node 0 is running the head
for ((i = 1; i < SLURM_JOB_NUM_NODES; i++)); do
node_i=${nodes_array[$i]}
worker_cmd=$(cat <<EOF
env
exit-dramatically() {
# Use SIGTERM to forcefully terminate the srun process
pkill -P $$ || true
kill -TERM 0 || true
# As a last resort, exit with a non-zero code
exit 1
}
# Background process to check for ENDED file
monitor-sidecar() {
set +x
while true; do
sleep 60
if [[ -f "$LOG_DIR/ENDED" ]]; then
echo "Detected ENDED file, terminating..."
exit-dramatically
fi
done
}
monitor-sidecar &
cat <<EOFINNER | tee /launch-worker.sh
ray start --address "$ip_head" \
--disable-usage-stats \
--resources="{\"worker_units\": $GPUS_PER_NODE, \"slurm_managed_ray_cluster\": 1}" \
--min-worker-port=${MIN_WORKER_PORT} \
--max-worker-port=${MAX_WORKER_PORT} \
\
--node-manager-port=${NODE_MANAGER_PORT} \
--object-manager-port=${OBJECT_MANAGER_PORT} \
--runtime-env-agent-port=${RUNTIME_ENV_AGENT_PORT} \
--dashboard-agent-grpc-port=${DASHBOARD_AGENT_GRPC_PORT} \
--dashboard-agent-listen-port=${DASHBOARD_AGENT_LISTEN_PORT} \
--metrics-export-port=${METRICS_EXPORT_PORT} \
\
--block
EOFINNER
count=0
while [[ \$count -lt $num_retries ]]; do
bash /launch-worker.sh
count=\$((count+1))
echo "Worker failed \$count/$num_retries times, restarting in 5 seconds..."
sleep 5
done
touch $LOG_DIR/ENDED
exit 1
EOF
)
srun $COMMON_SRUN_ARGS --container-name=ray-worker-$i --exact --nodes=1 --ntasks=1 --cpus-per-task=$CPUS_PER_WORKER -w "$node_i" -o $LOG_DIR/ray-worker-$i.log bash -x -c "$worker_cmd" &
sleep 3
done
# Then we wait here for the file to be created by the head node container
while ! srun --overlap --nodes=1 --ntasks=1 -w $head_node test -f $LOG_DIR/STARTED_RAY_HEAD; do
echo "[INFO][$(date)] Waiting for head node container to start..."
sleep 2
done
# At this stage the Ray cluster bringup has started on the physical nodes in the allocation
# Before we launch a job on this cluster we need to make sure that the bringup is complete
# We do so by querying the number of worker_units in the ray cluster and asserting = NUM_ACTORS
extract_worker_units() {
status_output=$(srun --overlap --container-name=ray-head --nodes=1 --ntasks=1 -w "$head_node" ray status)
if echo "$status_output" | grep -q "worker_units"; then
worker_units=$(echo "$status_output" | grep "worker_units" | awk -F'[/. ]' '{print $4}')
echo $worker_units
else
echo 0
fi
}
# Poll to make sure that all Ray worker nodes have connected to the head.
# All workers have connected when number of GPUs in ray cluster
# is equal to NUM_ACTORS. We use the utility function above
# to check how many GPUs have come online in the ray cluster
while true; do
worker_units=$(extract_worker_units)
echo "[INFO] Number of actors online: $worker_units/$NUM_ACTORS"
if [ "$worker_units" -eq "$NUM_ACTORS" ]; then
break
fi
sleep 2
done
echo "All workers connected!"
# We can now launch a job on this cluster
# We do so by launching a driver process on the physical node that the head node is on
# This driver process is responsible for launching a job on the Ray cluster
CONTAINER_CWD=$(scontrol show job $SLURM_JOB_ID --json | jq -r '.jobs[].current_working_directory')
if [[ -n "$COMMAND" ]]; then
srun --no-container-mount-home --overlap --container-name=ray-head --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "$head_node" -o $LOG_DIR/ray-driver.log bash -c "$COMMAND"
else
echo "[INFO]: Ray Cluster is idled, run this on the slurm head node to get a shell to the head node:"
cat <<EOF >$SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh
# No args launches on the head node (node 0)
# Args 1-N launch on worker nodes (nodes 1 through N-1)
WORKER_NUM=\${1:-}
if [[ -z "\$WORKER_NUM" ]]; then
# Empty means we are on the head node
srun --no-container-mount-home --gres=gpu:8 -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-head --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "$head_node" --jobid $SLURM_JOB_ID --pty bash
else
# Worker numbers 1 through N-1 correspond to ray-worker-1 through ray-worker-(N-1)
# and use nodes_array[1] through nodes_array[N-1]
if [[ \$WORKER_NUM -lt 1 || \$WORKER_NUM -ge $SLURM_JOB_NUM_NODES ]]; then
echo "Error: WORKER_NUM must be between 1 and $((SLURM_JOB_NUM_NODES-1))"
exit 1
fi
nodes_array=($nodes)
srun --no-container-mount-home --gres=gpu:8 -A $SLURM_JOB_ACCOUNT -p $SLURM_JOB_PARTITION --overlap --container-name=ray-worker-\$WORKER_NUM --container-workdir=$CONTAINER_CWD --nodes=1 --ntasks=1 -w "\${nodes_array[\$WORKER_NUM]}" --jobid $SLURM_JOB_ID --pty bash
fi
EOF
chmod +x $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh
echo " bash $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh # to attach to head node (i.e., 'worker 0')"
echo " bash $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh 1 # to attach to worker 1"
echo " bash $SLURM_SUBMIT_DIR/${SLURM_JOB_ID}-attach.sh 2 # to attach to worker 2, etc."
sleep infinity
fi