|
19 | 19 |
|
20 | 20 | # DataFusion on Ray |
21 | 21 |
|
22 | | -> This was originally a research project donated from [ray-sql] to evaluate performing distributed SQL queries from |
23 | | -> Python, using [Ray] and [Apache DataFusion] |
| 22 | +## Overview |
24 | 23 |
|
25 | | -[ray-sql]: https://github.com/datafusion-contrib/ray-sql |
| 24 | +DataFusion Ray is a distributed execution framework that enables DataFusion DataFrame and SQL queries to run on a |
| 25 | +Ray cluster. This integration allows users to leverage Ray's dynamic scheduling capabilities while executing |
| 26 | +queries in a distributed fashion. |
26 | 27 |
|
27 | | -DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation |
28 | | -of [Apache Arrow], [Apache DataFusion], and [Ray]. |
| 28 | +## Execution Modes |
29 | 29 |
|
30 | | -[Ray]: https://www.ray.io/ |
31 | | -[Apache Arrow]: https://arrow.apache.org/ |
32 | | -[Apache DataFusion]: https://datafusion.apache.org/ |
| 30 | +DataFusion Ray supports two execution modes: |
33 | 31 |
|
34 | | -## Comparison to other DataFusion projects |
| 32 | +### Streaming Execution |
35 | 33 |
|
36 | | -### Comparison to DataFusion Ballista |
| 34 | +This mode mimics the default execution strategy of DataFusion. Each operator in the query plan starts executing |
| 35 | +as soon as its inputs are available, leading to a more pipelined execution model. |
37 | 36 |
|
38 | | -- Unlike [DataFusion Ballista], DataFusion Ray does not provide its own distributed scheduler and instead relies on |
39 | | - Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project. |
40 | | -- DataFusion Ray is Python-first, and DataFusion Ballista is Rust-first |
| 37 | +### Batch Execution |
41 | 38 |
|
42 | | -[DataFusion Ballista]: https://github.com/apache/datafusion-ballista |
| 39 | +_Note: Batch Execution is not implemented yet. Tracking issue: https://github.com/apache/datafusion-ray/issues/69_ |
43 | 40 |
|
44 | | -### Comparison to DataFusion Python |
| 41 | +In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing |
| 42 | +intermediate shuffle files that are persisted and used as input for the next stage. |
45 | 43 |
|
46 | | -- [DataFusion Python] provides a Python DataFrame and SQL API for in-process execution. DataFusion Ray extends |
47 | | - DataFusion Python to provide scalability across multiple nodes. |
| 44 | +## Getting Started |
48 | 45 |
|
49 | | -[DataFusion Python]: https://github.com/apache/datafusion-python |
| 46 | +See the [contributor guide] for instructions on building DataFusion Ray. |
50 | 47 |
|
51 | | -## Building |
| 48 | +Once installed, you can run queries using DataFusion's familiar API while leveraging the distributed execution |
| 49 | +capabilities of Ray. |
52 | 50 |
|
53 | | -To build DataFusion Ray, you will need rust installed, as well as [https://github.com/PyO3/maturin](maturin). |
| 51 | +```python |
| 52 | +import ray |
| 53 | +from datafusion_ray import DFRayContext, df_ray_runtime_env |
54 | 54 |
|
55 | | -Install maturin in your current python environment (a virtual environment is recommended), with |
56 | | - |
57 | | -```bash |
58 | | -pip install maturin |
59 | | -``` |
60 | | - |
61 | | -Then build the project with the following command: |
62 | | - |
63 | | -```bash |
64 | | -maturin develop # --release for a release build |
65 | | -``` |
66 | | - |
67 | | -## Example |
68 | | - |
69 | | -- In the `examples` directory, run |
70 | | - |
71 | | -```bash |
72 | | -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tips.py --data-dir=$(pwd)/../testdata/tips/ |
73 | | -``` |
74 | | - |
75 | | -- In the `tpch` directory, use `make_data.py` to create a TPCH dataset at a provided scale factor, then |
76 | | - |
77 | | -```bash |
78 | | -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2 |
| 55 | +ray.init(runtime_env=df_ray_runtime_env) |
| 56 | +session = DFRayContext() |
| 57 | +df = session.sql("SELECT * FROM my_table WHERE value > 100") |
| 58 | +df.show() |
79 | 59 | ``` |
80 | 60 |
|
81 | | -To execute the TPCH query #2. To execute an arbitrary query against the TPCH dataset, provide it with `--query` instead of `--qnum`. This is useful for validating plans that DataFusion Ray will create. |
82 | | - |
83 | | -For example, to execute the following query: |
84 | | - |
85 | | -```bash |
86 | | -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1' |
87 | | -``` |
88 | | - |
89 | | -To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-worker`. If this number is less than `--concurrency` Then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-worker=4`, then `40` `RayStage` Actors will be created. If `partitions-per-worker=16` or is absent, then `10` `RayStage` Actors will be created. |
90 | | - |
91 | | -To validate the output against non-ray single node datafusion, add `--validate` which will ensure that both systems produce the same output. |
92 | | - |
93 | | -To run the entire TPCH benchmark use |
94 | | - |
95 | | -```bash |
96 | | -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 [--partitions-per-worker=] [--validate] |
97 | | -``` |
98 | | - |
99 | | -This will output a json file in the current directory with query timings. |
100 | | - |
101 | | -## Logging |
102 | | - |
103 | | -DataFusion Ray's logging output is determined by the `DATAFUSION_RAY_LOG_LEVEL` environment variable. The default log level is `WARN`. To change the log level, set the environment variable to one of the following values: `ERROR`, `WARN`, `INFO`, `DEBUG`, or `TRACE`. |
104 | | - |
105 | | -DataFusion Ray outputs logs from both python and rust, and in order to handle this consistently, the python logger for `datafusion_ray` is routed to rust for logging. The `RUST_LOG` environment variable can be used to control other rust log output other than `datafusion_ray`. |
| 61 | +## Contributing |
106 | 62 |
|
107 | | -## Status |
| 63 | +Contributions are welcome! Please open an issue or submit a pull request if you would like to contribute. See the |
| 64 | +[contributor guide] for more information. |
108 | 65 |
|
109 | | -- DataFusion Ray can execute all TPCH queries. Tested up to SF100. |
| 66 | +## License |
110 | 67 |
|
111 | | -## Known Issues |
| 68 | +DataFusion Ray is licensed under Apache 2.0. |
112 | 69 |
|
113 | | -- We are waiting to upgrade to a DataFusion version where the parquet options are serialized into substrait in order to send them correctly in a plan. Currently, we |
114 | | - manually add back `table_parquet_options.pushdown_filters=true` after deserialization to compensate. This will be refactored in the future. |
115 | 70 |
|
116 | | -see <https://github.com/apache/datafusion/pull/14465> |
| 71 | +[contributor guide]: docs/contributing.md |
0 commit comments