## Summary
### General Motivation

Support running Ray applications on a Spark cluster / Databricks runtime.
Spark is a popular and widely used distributed computing framework. If Ray
applications can run on a Spark cluster, users don't need to set up a
standalone Ray cluster, and it allows Ray workloads and Spark workloads to
run together.

#### Key requirements:
- Ray application resource (CPU / GPU / memory) allocation must respect the Spark job's resource allocation.
- Low overhead compared with a native Ray cluster.
- (Optional) Ray UI portal support.

### Should this change be within `ray` or outside?

Within `ray`, for better code maintenance.

## Stewardship
### Required Reviewers

- @jjyao
- @ericl

### Shepherd of the Proposal (should be a senior committer)

- @jjyao

## Design and Architecture

### Prerequisites

- The user has an active Spark cluster (apache/spark >= 3.3).

- The user must install Ray packages on every node of the Spark cluster, e.g. via:

  ```
  pip install ray
  ```

  If the user's workload requires Ray add-ons, they must also be installed, e.g. via:

  ```
  pip install "ray[debug,dashboard,tune,rllib,serve]"
  ```

### How to set up a Ray cluster over a Spark cluster?

The architecture of a Spark cluster is as follows:


We intend to represent each Ray worker node as a long-running Spark
task, where each Ray worker node receives the same set of resources as the Spark task that
initiates it. Multiple tasks can run on a single Spark worker node, meaning that
multiple Ray worker nodes can run on a single Spark worker node.
Finally, we intend to run the Ray head node on the Spark driver node with a fixed resource allocation.

Note that if issues arise from running multiple Ray nodes per Spark worker (whether due to a shared
object store location for workers, dashboard visualization confusion, or other unforeseen issues),
the implementation can be modified to only permit a single Ray cluster per Spark cluster.

To clarify further, the following example demonstrates how a Ray cluster with 16 total worker CPU
cores can be launched on a Spark cluster, assuming that each Ray node requires 4 CPU cores and
10GB of memory:

1. On the Spark driver node, launch the Ray head node as follows:

```
ray start --head --num-cpus=0
```

This ensures that CPU processing tasks will not execute on the head node.

2. Create a Spark job whose tasks run in standard map-reduce mode. Each
task is allocated a fixed number of CPUs and a fixed amount of memory. In this example, we will
allocate 4 CPU cores to each Spark task and at least 10GB of memory (by ensuring that each Spark
worker node has at least 10GB of memory reserved for every set of 4 CPU cores, as computed by
``SPARK_WORKER_MEMORY`` / ``TASK_SLOTS_PER_WORKER`` under the assumption that the
``spark.task.cpus`` configuration is set to ``4``).

3. In each constituent Spark task, launch one Ray worker node and allocate to it
the full set of resources available to the Spark task (4 CPUs, 10GB of memory). Keep the Spark task
running until the Ray cluster is destroyed. The command used to start each Ray worker node is as
follows:

```
ray start --num-cpus=X --num-gpus=Y --memory=Z --object-store-memory=M --address={head_ip}:{port}
```

4. After the Ray cluster is launched, the user's Ray application(s) can be submitted to the Ray
cluster via the Ray head node address / port.

5. To shut down the Ray cluster, cancel the Spark job and terminate all Ray node
services that are running on the Spark driver and Spark workers.

Finally, by adjusting the resource allocation for Ray nodes and Spark tasks, this approach enables
users to run multiple Ray clusters in isolation from each other on a single Spark cluster, if desired.

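To make the flow concrete, the following is a minimal sketch of steps 1-3 using PySpark RDD APIs.
The values (4 CPUs and 10GB per Ray node, 4 worker nodes, default head port 6379) are example
assumptions, and a real implementation would run the blocking Spark job in a background thread
rather than on the caller's thread:

```
import socket
import subprocess
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Step 1: start the Ray head node on the Spark driver, with no CPU slots.
subprocess.check_call(["ray", "start", "--head", "--num-cpus=0"])
head_ip = socket.gethostbyname(socket.gethostname())

num_ray_nodes = 4  # 16 total worker CPUs / 4 CPUs per Ray node

def start_ray_node(_):
    # Step 3: each Spark task launches one Ray worker node and blocks,
    # keeping the task (and its resources) alive until cluster shutdown.
    subprocess.check_call([
        "ray", "start", "--block",
        "--num-cpus=4", "--memory=10000000000",
        f"--address={head_ip}:6379",
    ])
    return []

# Step 2: a Spark job with one task per Ray worker node, in standard
# map-reduce mode; cancelling this job tears down the Ray worker nodes.
sc.parallelize(range(num_ray_nodes), num_ray_nodes) \
    .mapPartitions(start_ray_node) \
    .collect()
```
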


### Key questions


#### Launch the Ray head node on the Spark driver node or on the Spark task side?

The head node will be initialized on the Spark driver. To ensure that tasks will not be submitted
to this node, we will configure `num-cpus=0` for the head node.
Additionally, to ensure that the head node runs on hardware sufficient for the requisite Ray
system processes, a validation of minimum hardware configuration requirements (minimum CPU cores,
memory) will be performed prior to initialization of the Ray cluster.


#### Shall we make every Ray worker node the same shape (assigned the same amount of resources)?
Yes. Otherwise, the Ray cluster setup will be nondeterministic,
and you could get very strange results with bad luck on the node sizing.


#### Do Ray nodes have a 1:1 mapping with Spark tasks or with Spark workers?
In each Spark task, we will launch one Ray worker node. All Spark tasks are allocated the same
amount of resources. As a result, all Ray worker nodes will have homogeneous resources, because
Spark task resource configurations must be homogeneous.

#### What is the recommended minimal resource allocation of a Ray node?

Each Ray node should have at least 4 CPU cores and 10GB of available memory. This corresponds to
a Spark task configuration of:

- ``spark.task.cpus >= 4`` and
- ``(SPARK_WORKER_MEMORY / TASK_SLOTS_PER_SPARK_WORKER) >= 10GB``


#### On a shared Spark cluster, shall we let each user launch an individual Ray cluster, or shall one user create a shared Ray cluster that other users can also use?
We can provide two APIs to support both cases:
- Shared mode: ``ray.spark.get_or_create_shared_cluster``
- Private mode: ``ray.spark.init_private_cluster``

If a shared-mode Ray cluster already exists, launching a new Ray cluster is not allowed.


#### How to select the port numbers used by a Ray node?
A Ray node listens on several ports, and a Spark cluster might be shared by many users, each of
whom might set up their own Ray cluster concurrently. To avoid port conflicts, we can randomly
select a free port and start the Ray node service on it; if that fails, we retry on another free port.
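
A rough sketch of that retry loop, using ``--port`` as an illustrative flag; bind-then-release
port probing is inherently best-effort, since another process may claim the port before the Ray
node does, which is exactly what the retry guards against:

```
import socket
import subprocess

def find_free_port() -> int:
    # Bind to port 0 so the OS assigns an unused port, then release it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]

def start_ray_node_with_retry(base_args, max_retries=10):
    for _ in range(max_retries):
        port = find_free_port()
        try:
            subprocess.check_call(base_args + [f"--port={port}"])
            return port
        except subprocess.CalledProcessError:
            continue  # the port was claimed in the meantime; try another
    raise RuntimeError("could not find a free port for the Ray node")
```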


#### How much memory shall we allocate to the Ray node service (set via the ``ray start --memory`` option)?
Spark does not provide an explicit API for getting the memory allowed per task, so we estimate it as:
``SPARK_WORKER_MEMORY / SPARK_WORKER_CORES * RAY_NODE_NUM_CPUS``
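
As an illustration of this formula (the config values below are example assumptions):

```
spark_worker_memory = 64 * 1024**3  # SPARK_WORKER_MEMORY, e.g. 64GB
spark_worker_cores = 16             # SPARK_WORKER_CORES
ray_node_num_cpus = 4               # CPUs assigned to each Ray node

# Memory passed to `ray start --memory=...` for each Ray node: 16GB here.
ray_node_memory = spark_worker_memory // spark_worker_cores * ray_node_num_cpus
```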


#### How will we allocate memory for the Ray object store?
We will calculate the memory reserved for the Ray object store in each Ray node as follows:

1. Calculate the available space in the ``/dev/shm`` mount.
2. Divide the ``/dev/shm`` available space by the number of Spark tasks (Ray nodes) that can run
   concurrently on a single Spark worker node, obtaining an upper bound on the per-Ray-node object
   store memory allocation.
3. To provide a buffer and guard against accidental overutilization of mount space, multiply the
   upper bound from (2) by ``0.8``.

This allocation will ensure that we do not exhaust ``/dev/shm`` storage and accidentally spill to
disk, which would hurt performance substantially.
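
A sketch of this computation, assuming the object store lives on the ``/dev/shm`` mount and one
Ray node per Spark task slot:

```
import shutil

def object_store_bytes_per_ray_node(task_slots_per_spark_worker: int) -> int:
    shm_available = shutil.disk_usage("/dev/shm").free          # step 1
    upper_bound = shm_available // task_slots_per_spark_worker  # step 2
    return int(upper_bound * 0.8)                               # step 3: 20% buffer
```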


#### How to make Ray respect Spark GPU resource scheduling?
In a Spark task, we can get the GPU IDs allocated to that task. So, when launching a Ray node,
besides specifying the `--num-gpus` option, we set the `CUDA_VISIBLE_DEVICES` environment variable
so that the Ray node only uses the GPUs allocated to the corresponding Spark task.
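
For example, inside a Spark task the allocated GPU addresses can be read from the PySpark
``TaskContext`` resources API (Spark 3.x); the helper below is a sketch:

```
import os
import subprocess
from pyspark import TaskContext

def start_ray_node_with_task_gpus(head_address: str):
    # GPU addresses that Spark allocated to this task, e.g. ["0", "1"].
    gpu_ids = TaskContext.get().resources()["gpu"].addresses
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = ",".join(gpu_ids)
    subprocess.check_call(
        ["ray", "start", "--block",
         f"--num-gpus={len(gpu_ids)}", f"--address={head_address}"],
        env=env,
    )
```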


#### How to support all options of `ray start`?
Provide a `ray_node_options` argument (dict type) whose entries are forwarded as `ray start` options.
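
Hypothetical usage of this proposed argument (the specific option names below are just examples
of `ray start` flags being forwarded):

```
ray.spark.init_cluster(
    num_spark_tasks=4,
    ray_node_options={
        "object-store-memory": 4 * 1024**3,  # forwarded as --object-store-memory
        "dashboard-port": 8265,              # forwarded as --dashboard-port
    },
)
```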


#### Where should the code live: ray repo or spark repo?
Ray repo.
Putting it in the spark repo would be hard; it would be very unlikely to pass a vote.
Past examples include horovod, petastorm, xgboost, and tensorflow-spark, all of which tried to get into the spark repo and failed.


#### Does it support autoscaling?
We will investigate and test the feasibility of autoscaling Ray worker nodes as
additional Spark task slots become available upon the initiation of additional Spark worker nodes.

#### What is the fault tolerance of Barrier Execution mode and how are failures handled?
The current design uses standard job scheduling in Spark and will initiate task submission to new
Spark worker nodes in the event that a task group fails, similar to the functionality within Spark
during a task retry.

#### What’s the level of isolation Spark provides?
Spark task execution provides process-level isolation,
not VM/container-level isolation.


#### Custom resources support
TPUs, specific accelerators, etc.
This is an uncommon scenario, so we can address it later. Spark doesn't support scheduling such resources; it can only schedule CPU, GPU, and memory.


### API Proposal

#### Initialize Ray cluster on Spark API

```
ray_cluster_on_spark = ray.spark.init_cluster(num_cpus, num_gpus, memory)
```

Or

```
ray_cluster_on_spark = ray.spark.init_cluster(num_spark_tasks)
```

Initializes a Ray cluster on the Spark cluster; the arguments specify how many CPUs / GPUs and how
much memory the Ray cluster can use. It then connects to the cluster.

Returns an instance of type `RayClusterOnSpark`.

The Ray head node may have a different configuration than the worker nodes, but
the Ray workers will be homogeneous with respect to the CPUs / GPUs and memory available for heap
and object store utilization.
A best-effort mapping of available Spark cluster resources to requested Ray cluster resources for
worker nodes will be performed. A validation check during initialization will:
- With `safe_mode=True`, raise an exception if the configured resources are insufficient, providing detailed reconciliation steps for the user.
- With `safe_mode=False`, log a warning about potentially insufficient resources with instructions on how to configure the Spark cluster to avoid this situation.


For example, suppose the user requests a Ray cluster with 8 CPUs (or 4 GPUs) on a Spark cluster
with the following config:

```
spark.task.cpus 2                  # 2 CPU cores per Spark task
spark.task.resource.gpu.amount 1   # 1 GPU per Spark task
```

Then the Ray-on-Spark routine will create a Spark job with 4 Spark tasks running concurrently,
which books the requested 8 CPUs / 4 GPUs of resources for the Ray cluster.

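Hypothetical end-to-end usage on the cluster above; the `address` attribute and the explicit
`ray.init` call are assumptions about the proposed `RayClusterOnSpark` API:

```
import ray

ray_cluster_on_spark = ray.spark.init_cluster(num_spark_tasks=4)
ray.init(address=ray_cluster_on_spark.address)  # if init_cluster does not already connect

@ray.remote(num_gpus=1)
def gpu_task():
    return "ok"

print(ray.get(gpu_task.remote()))
ray_cluster_on_spark.shutdown()
```
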

### Shutdown Ray cluster on Spark

When the user wants to shut down the Ray cluster, they can call:

```
ray_cluster_on_spark.shutdown()
```

This terminates the Ray cluster.
In a Databricks notebook, we can make the Databricks runtime automatically call `ray_cluster_on_spark.shutdown()` when the notebook is detached from the Spark cluster. We need to install a hook to achieve this.


## Compatibility, Deprecation, and Migration Plan

Support apache/spark >= 3.3 and the latest Ray version.


## Test Plan and Acceptance Criteria

### Test Plan

- Set up Ray clusters on Spark clusters with various configs and then run Ray applications:
  - 1 CPU per Spark task
  - 1 CPU and 1 GPU per Spark task
  - multiple CPUs / GPUs per Spark task

- Concurrently set up multiple Ray clusters on a single Spark cluster

### Acceptance Criteria

- Ray applications comply with the Spark cluster's resource (CPU / GPU / memory) allocation.
- Less than 30% performance overhead compared with a native Ray cluster.