Commit 0906128

Add doc and tests for mpi (#162)
* add doc and UTs for mpi
* fixes
* update
* fixes lint
* fixes
* fixes
* fixes import
1 parent cd48d0e commit 0906128

File tree

11 files changed: +344 additions, −81 deletions


.github/workflows/raydp.yml

Lines changed: 6 additions & 1 deletion
````diff
@@ -46,7 +46,12 @@ jobs:
     - name: Install extra dependencies for macOS
       if: matrix.os == 'macos-latest'
       run: |
-        brew install libuv libomp
+        brew install libuv libomp mpich
+    - name: Install extra dependencies for Ubuntu
+      if: matrix.os == 'ubuntu-latest'
+      run: |
+        sudo apt-get install -y mpich
+
     - name: Cache pip - Ubuntu
       if: matrix.os == 'ubuntu-latest'
       uses: actions/cache@v2
````
README.md

Lines changed: 48 additions & 32 deletions
````diff
@@ -1,28 +1,18 @@
 # RayDP
 
-RayDP is a distributed data processing library that provides simple APIs for running Spark on [Ray](https://github.com/ray-project/ray) and integrating Spark with distributed deep learning and machine learning frameworks. RayDP makes it simple to build distributed end-to-end data analytics and AI pipeline. Instead of using lots of glue code or an orchestration framework to stitch multiple distributed programs, RayDP allows you to write Spark, PyTorch, Tensorflow, XGBoost code in a single python program with increased productivity and performance. You can build an end-to-end pipeline on a single Ray cluster by using Spark for data preprocessing, RaySGD or Horovod for distributed deep learning, RayTune for hyperparameter tuning and RayServe for model serving.
-
-### Spark on Ray
-
-RayDP provides an API for starting a Spark job on Ray in your python program without a need to setup a Spark cluster manually. RayDP supports Ray as a Spark resource manger and runs Spark executors in Ray actors. RayDP utilizes Ray's in-memory object store to efficiently exchange data between Spark and other Ray libraries. You can use Spark to read the input data, process the data using SQL, Spark DataFrame, or Pandas (via [Koalas](https://github.com/databricks/koalas)) API, extract and transform features using Spark MLLib, and feed the output to deep learning and machine learning frameworks.
-
-### Integrating Spark with Deep Learning and Machine Learning Frameworks
-
-#### MLDataset API
-RayDP provides an API for creating a Ray MLDataset from a Spark dataframe. MLDataset represents a distributed dataset stored in Ray's in-memory object store. It supports transformation on each shard and can be converted to a PyTorch or Tensorflow dataset for distributed training. If you prefer to using Horovod on Ray or RaySGD for distributed training, you can use MLDataset to seamlessly integrate Spark with them.
-
-#### Estimator API
-RayDP also provides high level scikit-learn style Estimator APIs for distributed training. The Estimator APIs allow you to train a deep neural network directly on a Spark DataFrame, leveraging Ray's ability to scale out across the cluster. The Estimator APIs are wrappers of RaySGD and hide the complexity of converting a Spark DataFrame to a PyTorch/Tensorflow dataset and distributing the training.
+RayDP is a distributed data processing library that provides simple APIs for running Spark/MPI on [Ray](https://github.com/ray-project/ray) and integrating Spark with distributed deep learning and machine learning frameworks. RayDP makes it simple to build distributed end-to-end data analytics and AI pipelines. Instead of using lots of glue code or an orchestration framework to stitch multiple distributed programs, RayDP allows you to write Spark, PyTorch, Tensorflow, and XGBoost code in a single Python program with increased productivity and performance. You can build an end-to-end pipeline on a single Ray cluster by using Spark for data preprocessing, RaySGD or Horovod for distributed deep learning, RayTune for hyperparameter tuning, and RayServe for model serving.
 
 ## Installation
 
 
 You can install the latest RayDP using pip. RayDP requires Ray (>=1.3.0) and PySpark (>=3.0.0). Please also make sure Java is installed and JAVA_HOME is set properly.
+
 ```shell
 pip install raydp
 ```
 
 Or you can install our nightly build:
+
 ```shell
 pip install raydp-nightly
 ```
@@ -34,38 +24,75 @@ If you'd like to build and install the latest master, use the following command:
 pip install dist/raydp*.whl
 ```
 
-## Getting Started
-To start a Spark job on Ray, you can use the `raydp.init_spark` API. You can write Spark, PyTorch/Tensorflow, Ray code in the same python program to easily implement an end-to-end pipeline.
+## Spark on Ray
+
+RayDP provides an API for starting a Spark job on Ray in your Python program without the need to set up a Spark cluster manually. RayDP supports Ray as a Spark resource manager and runs Spark executors in Ray actors. RayDP utilizes Ray's in-memory object store to efficiently exchange data between Spark and other Ray libraries. You can use Spark to read the input data, process the data using SQL, Spark DataFrame, or Pandas (via [Koalas](https://github.com/databricks/koalas)) API, extract and transform features using Spark MLLib, and feed the output to deep learning and machine learning frameworks.
 
 ### Classic Spark Word Count Example
-After we use RayDP to initialize a Spark cluster, we can use Spark as usual.
+
+To start a Spark job on Ray, you can use the `raydp.init_spark` API. After we use RayDP to initialize a Spark cluster, we can use Spark as usual.
+
 ```python
 import ray
 import raydp
 
+# connect to the ray cluster
 ray.init(address='auto')
 
+# create a Spark cluster with the specified resource requirements
 spark = raydp.init_spark('word_count',
                          num_executors=2,
                          executor_cores=2,
                          executor_memory='1G')
 
+# normal data processing with Spark
 df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
 df.show()
 word_count = df.groupBy('word').count()
 word_count.show()
 
+# stop the spark cluster
 raydp.stop_spark()
 ```
 
-### Integration with PyTorch
-However, combined with other ray components, such as RaySGD and RayServe, we can easily build an end-to-end deep learning pipeline. In this example. we show how to use our estimator API, which is a wrapper around RaySGD, to perform data preprocessing using Spark, and train a model using PyTorch.
+### Dynamic Resource Allocation
+
+RayDP now supports External Shuffle Service. To enable it, you can either set `spark.shuffle.service.enabled` to `true` in `spark-defaults.conf`, or you can provide a config to `raydp.init_spark`, as shown below:
+
+```python
+raydp.init_spark(..., configs={"spark.shuffle.service.enabled": "true"})
+```
+
+The user-provided config will overwrite those specified in `spark-defaults.conf`. By default Spark will load `spark-defaults.conf` from `$SPARK_HOME/conf`; you can also modify this location by setting `SPARK_CONF_DIR`.
+
+Similarly, you can also enable Dynamic Executor Allocation this way. However, because Ray does not yet support object ownership transfer (as of 1.3.0), you must use Dynamic Executor Allocation with data persistence. You can write the data frame in Spark to HDFS as a parquet as shown below:
+
+```python
+ds = RayMLDataset.from_spark(..., fs_directory="hdfs://host:port/your/directory")
+```
+
+### Spark Submit
+
+RayDP provides a substitute for spark-submit in Apache Spark. You can run your Java or Scala application on a RayDP cluster by using `bin/raydp-submit`. You can add it to `PATH` for convenience. When using `raydp-submit`, you should specify the number of executors, the number of cores, and the memory of each executor via Spark properties, such as `--conf spark.executor.cores=1`, `--conf spark.executor.instances=1` and `--conf spark.executor.memory=500m`. `raydp-submit` only supports Ray clusters; Spark standalone, Apache Mesos, and Apache YARN are not supported, so please use the traditional `spark-submit` in those cases. For the same reason, you do not need to specify `--master` in the command. Besides, RayDP does not support `cluster` as the deploy-mode.
+
+### Integrating Spark with Deep Learning and Machine Learning Frameworks
+
+Combined with other Ray components, such as RaySGD and RayServe, we can easily build an end-to-end deep learning pipeline.
+
+***MLDataset API***
+
+RayDP provides an API for creating a Ray MLDataset from a Spark dataframe. MLDataset represents a distributed dataset stored in Ray's in-memory object store. It supports transformation on each shard and can be converted to a PyTorch or Tensorflow dataset for distributed training. If you prefer using Horovod on Ray or RaySGD for distributed training, you can use MLDataset to seamlessly integrate Spark with them.
+
+***Estimator API***
+
+RayDP also provides high-level scikit-learn style Estimator APIs for distributed training. The Estimator APIs allow you to train a deep neural network directly on a Spark DataFrame, leveraging Ray's ability to scale out across the cluster. The Estimator APIs are wrappers of RaySGD and hide the complexity of converting a Spark DataFrame to a PyTorch/Tensorflow dataset and distributing the training.
+
 ```python
 import ray
 import raydp
 from raydp.torch import TorchEstimator
 
-ray.init()
+ray.init(address="auto")
 spark = raydp.init_spark(app_name="RayDP example",
                          num_executors=2,
                          executor_cores=2,
@@ -86,20 +113,9 @@ estimator.fit_on_spark(train_df)
 raydp.stop_spark()
 ```
 
-## Dynamic Executor Allocation
-RayDP now supports External Shuffle Serivce. To enable it, you can either set `spark.shuffle.service.enabled` to `true` in `spark-defaults.conf`, or you can provide a config to `raydp.init_spark`, as shown below:
-```python
-raydp.init_spark(..., configs={"spark.shuffle.service.enabled": "true"})
-```
-The user-provided config will overwrite those specified in `spark-defaults.conf`. By default Spark will load `spark-defaults.conf` from `$SPARK_HOME/conf`, you can also modify this location by setting `SPARK_CONF_DIR`.
+## MPI on Ray
 
-Similarly, you can also enable Dynamic Executor Allocation this way. However, because Ray does not support object ownership tranferring now(1.3.0), you must use Dynamic Executor Allocation with data persistence. You can write the data frame in spark to HDFS as a parquet as shown below:
-```python
-ds = RayMLDataset.from_spark(..., fs_directory="hdfs://host:port/your/directory")
-```
+RayDP also provides a simple API for running MPI jobs on top of Ray. Currently, we support three types of MPI: `intel_mpi`, `openmpi` and `MPICH`. Refer to [doc/mpi.md](./doc/mpi.md) for more details.
 
 ## More Examples
 Not sure how to use RayDP? Check the `examples` folder. We have added many examples showing how RayDP works together with PyTorch, TensorFlow, XGBoost, Horovod, and so on. If you still cannot find what you want, feel free to post an issue to ask us!
-
-## Spark Submit
-RayDP provides a substitute for spark-submit in Apache Spark. You can run your java or scala application on RayDP cluster by using `bin/raydp-submit`. You can add it to `PATH` for convenience. When using `raydp-submit`, you should specify number of executors, number of cores and memory each executor by Spark properties, such as `--conf spark.executor.cores=1`, `--conf spark.executor.instances=1` and `--conf spark.executor.memory=500m`. `raydp-submit` only supports Ray cluster. Spark standalone, Apache Mesos, Apache Yarn are not supported, please use traditional `spark-submit` in that case. For the same reason, you do not need to specify `--master` in the command. Besides, RayDP does not support cluster as deploy-mode.
````
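The Spark Submit section of the README lists the required Spark properties one by one. A hedged sketch of a full invocation (the class name and jar path below are hypothetical placeholders, and running it requires a Ray cluster with RayDP installed):

```shell
# Hypothetical example: the class and jar names are placeholders.
# Note: no --master flag, since raydp-submit targets the Ray cluster only.
bin/raydp-submit \
  --conf spark.executor.instances=2 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.memory=500m \
  --class com.example.WordCount \
  /path/to/your-app.jar
```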

doc/mpi.md

Lines changed: 98 additions & 0 deletions
New file:

````markdown
# MPI on Ray

RayDP also provides a simple API for running MPI jobs on top of Ray. Currently, we support three types of MPI: `intel_mpi`, `openmpi` and `MPICH`. To use the following API, make sure you have installed the given type of MPI on each Ray worker node.

### API

```python
def create_mpi_job(job_name: str,
                   world_size: int,
                   num_cpus_per_process: int,
                   num_processes_per_node: int,
                   mpi_script_prepare_fn: Callable = None,
                   timeout: int = 1,
                   mpi_type: str = "intel_mpi",
                   placement_group=None,
                   placement_group_bundle_indexes: List[int] = None) -> MPIJob:
    """Create an MPI Job

    :param job_name: the job name
    :param world_size: the world size
    :param num_cpus_per_process: num cpus per process, used to request resources from Ray
    :param num_processes_per_node: num processes per node
    :param mpi_script_prepare_fn: a function used to create the mpi script; it will be
        passed an MPIJobContext instance. The default script is used if not provided.
    :param timeout: the timeout used to wait for job creation
    :param mpi_type: the mpi type, now only supports openmpi, intel_mpi and mpich
    :param placement_group: the placement group used to request mpi resources
    :param placement_group_bundle_indexes: its length should equal
        world_size / num_processes_per_node if provided.
    """
```

### Create a simple MPI Job

```python
from raydp.mpi import create_mpi_job, MPIJobContext, WorkerContext

# Define the MPI job. We want to create an MPIJob with a world size of 4, and each
# process requires 2 cpus. We have set num_processes_per_node to 2, so the processes
# will be strictly spread across two nodes.

# You could also specify the placement group to reserve the resources for the MPI job.
# num_cpus_per_process will be ignored if the placement group is provided, and the size
# of placement_group_bundle_indexes should equal world_size // num_processes_per_node.
job = create_mpi_job(job_name="example",
                     world_size=4,
                     num_cpus_per_process=2,
                     num_processes_per_node=2,
                     timeout=5,
                     mpi_type="intel_mpi",
                     placement_group=None,
                     placement_group_bundle_indexes=None)

# Start the MPI job; this starts the MPI processes and connects them to the ray cluster
job.start()

# define the MPI task function
def func(context: WorkerContext):
    return context.job_id

# Run the MPI task. This is a blocking operation, and the result is an
# array of length world_size.
results = job.run(func)

# stop the MPI job
job.stop()
```

### Use `with` to auto start/stop the MPIJob

```python
with create_mpi_job(job_name="example",
                    world_size=4,
                    num_cpus_per_process=2,
                    num_processes_per_node=2,
                    timeout=5,
                    mpi_type="intel_mpi") as job:
    def f(context: WorkerContext):
        return context.job_id
    results = job.run(f)
```

### Specify the MPI script and environment

You can customize the MPI job environment and the MPI script with the `mpi_script_prepare_fn` argument.

```python
def script_prepare_fn(context: MPIJobContext):
    context.add_env("OMP_NUM_THREADS", "2")
    default_script = ["mpirun", "--allow-run-as-root", "--tag-output", "-H",
                      ",".join(context.hosts), "-N", f"{context.num_procs_per_node}"]
    return default_script

job = create_mpi_job(job_name="example",
                     world_size=4,
                     num_cpus_per_process=2,
                     num_processes_per_node=2,
                     timeout=5,
                     mpi_type="intel_mpi",
                     mpi_script_prepare_fn=script_prepare_fn)
```
````
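The `script_prepare_fn` in doc/mpi.md builds a plain argv list for `mpirun` from the job context. A minimal, Ray-free sketch of that computation (the function name here is illustrative, not part of the RayDP API):

```python
# Ray-free sketch: reproduce the argv list that a script-prepare function
# returns, given the hosts and per-node process count from the job context.
from typing import List

def build_mpirun_script(hosts: List[str], num_procs_per_node: int) -> List[str]:
    # Same shape as the default script in doc/mpi.md: launcher flags, then
    # the comma-joined host list and the per-node process count.
    return ["mpirun", "--allow-run-as-root", "--tag-output",
            "-H", ",".join(hosts), "-N", str(num_procs_per_node)]

script = build_mpirun_script(["node1", "node2"], 2)
print(" ".join(script))
# → mpirun --allow-run-as-root --tag-output -H node1,node2 -N 2
```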

python/raydp/mpi/__init__.py

Lines changed: 19 additions & 6 deletions
````diff
@@ -18,14 +18,17 @@
 
 from typing import Callable, List
 
-from raydp.mpi.mpi_job import MPIJob, MPIType, IntelMPIJob, OpenMPIJob, MPIJobContext
+from .mpi_job import MPIJob, MPIType, IntelMPIJob, OpenMPIJob, MPICHJob, MPIJobContext
+from .mpi_worker import WorkerContext
 
 
 def _get_mpi_type(mpi_type: str) -> MPIType:
     if mpi_type.strip().lower() == "openmpi":
         return MPIType.OPEN_MPI
     elif mpi_type.strip().lower() == "intel_mpi":
         return MPIType.INTEL_MPI
+    elif mpi_type.strip().lower() == "mpich":
+        return MPIType.MPICH
     else:
         return None
 
@@ -39,16 +42,16 @@ def create_mpi_job(job_name: str,
                    mpi_type: str = "intel_mpi",
                    placement_group=None,
                    placement_group_bundle_indexes: List[int] = None) -> MPIJob:
-    """
-    Create a MPI Job
+    """Create a MPI Job
+
     :param job_name: the job name
     :param world_size: the world size
     :param num_cpus_per_process: num cpus per process, this used to request resource from Ray
     :param num_processes_per_node: num processes per node
     :param mpi_script_prepare_fn: a function used to create mpi script, it will pass in a
-           MPIJobcontext instance. It will use the default script if not provides.
+           MPIJobContext instance. It will use the default script if not provides.
     :param timeout: the timeout used to wait for job creation
-    :param mpi_type: the mpi type, now only support openmpi and intel_mpi
+    :param mpi_type: the mpi type, now only support openmpi, intel_mpi and MPICH
     :param placement_group: the placement_group for request mpi resources
     :param placement_group_bundle_indexes: this should be equal with
            world_size / num_processes_per_node if provides.
@@ -74,8 +77,18 @@ def create_mpi_job(job_name: str,
                           timeout=timeout,
                           placement_group=placement_group,
                           placement_group_bundle_indexes=placement_group_bundle_indexes)
+    elif mpi_type == MPIType.MPICH:
+        return MPICHJob(mpi_type=MPIType.MPICH,
+                        job_name=job_name,
+                        world_size=world_size,
+                        num_cpus_per_process=num_cpus_per_process,
+                        num_processes_per_node=num_processes_per_node,
+                        mpi_script_prepare_fn=mpi_script_prepare_fn,
+                        timeout=timeout,
+                        placement_group=placement_group,
+                        placement_group_bundle_indexes=placement_group_bundle_indexes)
     else:
         raise Exception(f"MPI type: {mpi_type} not supported now")
 
 
-__all__ = ["create_mpi_job", "MPIJobContext"]
+__all__ = ["create_mpi_job", "MPIJobContext", "WorkerContext"]
````
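The `_get_mpi_type` helper in the diff above normalizes the user-supplied string and maps it to an enum member. A standalone sketch of the same dispatch pattern, using a lookup table instead of an elif chain (the enum values here are illustrative; RayDP's actual `MPIType` lives in `mpi_job.py`):

```python
from enum import Enum
from typing import Optional

class MPIType(Enum):
    OPEN_MPI = 0
    INTEL_MPI = 1
    MPICH = 2

def get_mpi_type(mpi_type: str) -> Optional[MPIType]:
    # Normalize once, then dispatch through a table; unknown names map to None,
    # mirroring the `else: return None` branch in the diff.
    mapping = {
        "openmpi": MPIType.OPEN_MPI,
        "intel_mpi": MPIType.INTEL_MPI,
        "mpich": MPIType.MPICH,
    }
    return mapping.get(mpi_type.strip().lower())

print(get_mpi_type(" MPICH "))  # → MPIType.MPICH
```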
