Commit 0d76298

Add working CI tests to validate TPCH across a matrix of python / ray versions (#74)
1 parent 8498b19 commit 0d76298

File tree

5 files changed: +125 -56 lines changed

.github/workflows/main.yml

Lines changed: 56 additions & 26 deletions
```diff
@@ -15,38 +15,53 @@
 # specific language governing permissions and limitations
 # under the License.

-name: main
+name: Python Tests

 on:
   push:
     branches: [main]
   pull_request:
     branches: [main]
+  workflow_dispatch:

-#concurrency:
-#  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
-#  cancel-in-progress: true
+concurrency:
+  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
+  cancel-in-progress: true
+
+env:
+  MATURIN_PEP517_ARGS: --profile=dev

 jobs:
-  test-matrix:
-    runs-on: ubuntu-latest
+  validate-tpch:
+    runs-on: ${{ matrix.platform.image }}
     strategy:
       fail-fast: false
       matrix:
+        platform:
+          - image: ubuntu-latest
+            target: x86_64-unknown-linux-musl
         python-version:
-          #- "3.10"
-          #- "3.11"
+          - "3.10"
+          - "3.11"
           - "3.12"
-        toolchain:
-          - "stable"
+        ray-version:
+          - "2.40"
+          - "2.41"
+          - "2.42.1"
+          - "2.43"
     steps:
       - uses: actions/checkout@v4

       - name: Setup Rust Toolchain
         uses: dtolnay/rust-toolchain@stable
         id: rust-toolchain
         with:
-          components: clippy,rustfmt
+          target: ${{ matrix.platform.target }}
+
+      - name: Check Rust output
+        id: rust-toolchain-out
+        run: |
+          rustc --version --verbose
+          cargo --version

       - name: Install Protoc
         uses: arduino/setup-protoc@v3
@@ -60,15 +75,35 @@ jobs:
           path: ~/.cargo
           key: cargo-cache-${{ steps.rust-toolchain.outputs.cachekey }}-${{ hashFiles('Cargo.lock') }}

-      - name: Install dependencies and build
+      - name: install uv
         uses: astral-sh/setup-uv@v5
         with:
-          python-version: ${{ matrix.python-version }}
           enable-cache: true
+          python-version: ${{ matrix.python-version }}
+
+      - name: initial project sync
+        run: |
+          cargo --version
+          uv sync --no-install-package datafusion-ray --no-install-package ray

-      - name: Create virtual env
+      # reset the version of ray in pyproject.toml
+      # to align with our matrix version
+      - name: add ray
         run: |
-          uv venv
+          cargo --version
+          uv add --no-sync 'ray[default]==${{ matrix.ray-version }}'
+
+      # sync the environment, building everything other than datafusion-ray
+      - name: install dependencies
+        run: |
+          uv sync --no-install-package datafusion-ray
+        working-directory: ${{ github.workspace }}
+
+      # build datafusion-ray honoring the MATURIN_PEP517_ARGS env
+      # var to ensure that we build this in dev mode so it's faster
+      - name: maturin develop
+        run: uv run maturin develop --uv
+        working-directory: ${{ github.workspace }}

       - name: Cache the generated dataset
         id: cache-tpch-dataset
@@ -80,26 +115,21 @@ jobs:
       - name: create the dataset
         if: ${{ steps.cache-tpch-dataset.outputs.cache-hit != 'true' }}
         run: |
-          uv add duckdb
-          uv run python tpch/make_data.py 1 testdata/tpch/
-
-      - name: build and install datafusion-ray
-        env:
-          RUST_BACKTRACE: 1
-        run: |
-          uv add 'ray[default]'
-          uv run --no-project maturin develop --uv
+          uv run tpch/make_data.py 1 testdata/tpch/

+      # run the tpcbench.py file with --validate which will cause
+      # it to exit with an error code if any of the queries do not validate
       - name: validate tpch
         env:
           DATAFUSION_RAY_LOG_LEVEL: debug
           RAY_COLOR_PREFIX: 1
           RAY_DEDUP_LOGS: 0
+          RUST_BACKTRACE: 1
         run: |
           uv run python tpch/tpcbench.py \
             --data='file:///${{ github.workspace }}/testdata/tpch/' \
             --concurrency 3 \
-            --partitions-per-worker 2 \
+            --partitions-per-processor 2 \
             --batch-size=8192 \
             --worker-pool-min=20 \
             --validate
```
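The matrix above fans out into one job per (python-version, ray-version) pair on the single Linux platform. As a quick sanity check of the fan-out (a local sketch, not part of the repository), the combinations can be expanded the same way Actions does:

```python
from itertools import product

# values copied from the workflow matrix above
platforms = [{"image": "ubuntu-latest", "target": "x86_64-unknown-linux-musl"}]
python_versions = ["3.10", "3.11", "3.12"]
ray_versions = ["2.40", "2.41", "2.42.1", "2.43"]

# GitHub Actions builds the cartesian product of all matrix axes
jobs = [
    {"image": p["image"], "python": py, "ray": ray}
    for p, py, ray in product(platforms, python_versions, ray_versions)
]

print(len(jobs))  # 1 platform x 3 pythons x 4 ray versions = 12 jobs
```

With `fail-fast: false`, all 12 jobs run to completion even if one combination fails, so a single report shows exactly which python/ray pairs break.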

README.md

Lines changed: 6 additions & 5 deletions
```diff
@@ -17,6 +17,8 @@
    under the License.
 -->

+[![Python Tests](https://github.com/robtandy/datafusion-ray/actions/workflows/main.yml/badge.svg)](https://github.com/robtandy/datafusion-ray/actions/workflows/main.yml)
+
 # DataFusion on Ray

 ## Overview
@@ -36,16 +38,16 @@ as soon as its inputs are available, leading to a more pipelined execution model
 ### Batch Execution

-_Note: Batch Execution is not implemented yet. Tracking issue: https://github.com/apache/datafusion-ray/issues/69_
+_Note: Batch Execution is not implemented yet. Tracking issue: <https://github.com/apache/datafusion-ray/issues/69>_

-In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing
+In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing
 intermediate shuffle files that are persisted and used as input for the next stage.

 ## Getting Started

 See the [contributor guide] for instructions on building DataFusion Ray.

-Once installed, you can run queries using DataFusion's familiar API while leveraging the distributed execution
+Once installed, you can run queries using DataFusion's familiar API while leveraging the distributed execution
 capabilities of Ray.

 ```python
@@ -67,5 +69,4 @@ Contributions are welcome! Please open an issue or submit a pull request if you

 DataFusion Ray is licensed under Apache 2.0.

-
-[contributor guide]: docs/contributing.md
+[contributor guide]: docs/contributing.md
```

docs/contributing.md

Lines changed: 41 additions & 7 deletions
```diff
@@ -21,18 +21,52 @@

 ## Building

-To build DataFusion Ray, you will need rust installed, as well as [https://github.com/PyO3/maturin](maturin).
+You'll need to have both rust and cargo installed.

-Install maturin in your current python environment (a virtual environment is recommended), with
+We will follow the development workflow outlined by [datafusion-python](https://github.com/apache/datafusion-python), [pyo3](https://github.com/PyO3/pyo3) and [maturin](https://github.com/PyO3/maturin).
+
+The maturin tools used in this workflow can be installed either via `uv` or `pip`. Both approaches should offer the same experience. It is recommended to use `uv` since it has significant performance improvements over `pip`.
+
+Bootstrap (`uv`):
+
+By default `uv` will attempt to build the datafusion-ray python package. For our development we prefer to build manually. This means that when creating your virtual environment using `uv sync` you need to pass in the additional `--no-install-package datafusion-ray`. This tells uv to install all of the dependencies found in `pyproject.toml`, but to skip building `datafusion-ray`, as we'll do that manually.
+
+```bash
+# fetch this repo
+git clone [email protected]:apache/datafusion-ray.git
+# go to repo root
+cd datafusion-ray
+# create the virtual environment
+uv sync --dev --no-install-package datafusion-ray
+# activate the environment
+source .venv/bin/activate
+```
+
+Bootstrap (`pip`):

 ```bash
-pip install maturin
+# fetch this repo
+git clone [email protected]:apache/datafusion-ray.git
+# go to repo root
+cd datafusion-ray
+# prepare development environment (used to build wheel / install in development)
+python3 -m venv .venv
+# activate the venv
+source .venv/bin/activate
+# update pip itself if necessary
+python -m pip install -U pip
+# install dependencies
+python -m pip install -r pyproject.toml
 ```

-Then build the project with the following command:
+Whenever rust code changes (your changes or via `git pull`):

 ```bash
-maturin develop # --release for a release build
+# make sure you activate the venv using "source .venv/bin/activate" first
+maturin develop --uv
+python -m pytest
 ```

 ## Example
@@ -57,14 +91,14 @@ For example, to execute the following query:
 RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1'
 ```

-To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-worker`. If this number is less than `--concurrency` Then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-worker=4`, then `40` `RayStage` Actors will be created. If `partitions-per-worker=16` or is absent, then `10` `RayStage` Actors will be created.
+To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-processor`. If this number is less than `--concurrency`, then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-processor=4`, then `40` `RayStage` Actors will be created. If `partitions-per-processor=16` or is absent, then `10` `RayStage` Actors will be created.

 To validate the output against non-ray single node datafusion, add `--validate` which will ensure that both systems produce the same output.

 To run the entire TPCH benchmark use

 ```bash
-RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 [--partitions-per-worker=] [--validate]
+RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 [--partitions-per-processor=] [--validate]
 ```

 This will output a json file in the current directory with query timings.
```

pyproject.toml

Lines changed: 14 additions & 3 deletions
```diff
@@ -16,23 +16,34 @@
 # under the License.

 [build-system]
-requires = ["maturin>=0.14"]
+requires = ["maturin>=1.8.1,<2.0"]
 build-backend = "maturin"

 [project]
 name = "datafusion-ray"
-requires-python = ">=3.10"
+requires-python = ">=3.10,<3.13"
 classifiers = [
     "Programming Language :: Rust",
     "Programming Language :: Python :: Implementation :: CPython",
     "Programming Language :: Python :: Implementation :: PyPy",
 ]
+keywords = [
+    "datafusion",
+    "ray",
+    "dataframe",
+    "rust",
+    "query-engine",
+    "distributed",
+]
 dependencies = [
-    "datafusion>=45.0.0",
+    "ray[default]==2.43.0",
     "pyarrow>=18.0.0",
     "typing-extensions;python_version<'3.13'",
 ]
 dynamic = ["version"]

 [tool.maturin]
 module-name = "datafusion_ray._datafusion_ray_internal"
+
+[dependency-groups]
+dev = ["maturin>=1.8.1", "duckdb", "datafusion>=43.0.0"]
```

tpch/tpcbench.py

Lines changed: 8 additions & 15 deletions
```diff
@@ -107,21 +107,9 @@ def main(

         start_time = time.time()
         df = ctx.sql(sql)
-        end_time = time.time()
-        print(f"Ray output schema {df.schema()}")
-        print("Logical plan \n", df.logical_plan().display_indent())
-        print("Optimized Logical plan \n", df.optimized_logical_plan().display_indent())
-        part1 = end_time - start_time
-        for stage in df.stages():
-            print(
-                f"Stage {stage.stage_id} output partitions:{stage.num_output_partitions} partition_groups: {stage.partition_groups} full_partitions: {stage.full_partitions}"
-            )
-            print(stage.display_execution_plan())
-
-        start_time = time.time()
         batches = df.collect()
         end_time = time.time()
-        results["queries"][qnum] = end_time - start_time + part1
+        results["queries"][qnum] = end_time - start_time

         calculated = prettify(batches)
         print(calculated)
@@ -152,6 +140,11 @@ def main(
         print("sleeping for 3 seconds for ray to clean up")
         time.sleep(3)

+    if validate and False in results["validated"].values():
+        # return a non zero return code if we did not validate all queries
+        print("Possible incorrect query result")
+        exit(1)
+

 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
@@ -174,7 +167,7 @@ def main(
         help="Desired batch size output per stage",
     )
     parser.add_argument(
-        "--partitions-per-worker",
+        "--partitions-per-processor",
         type=int,
         help="Max partitions per Stage Service Worker",
     )
@@ -198,7 +191,7 @@ def main(
         args.data,
         int(args.concurrency),
         int(args.batch_size),
-        args.partitions_per_worker,
+        args.partitions_per_processor,
         args.worker_pool_min,
         args.listing_tables,
         args.validate,
```
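The added `exit(1)` block is what makes the CI gate work: GitHub Actions fails a step on any non-zero exit code, so a single mis-validated query fails the whole job. A standalone sketch of the same pattern (a simplified stand-in for the logic in `tpcbench.py`, not the script itself):

```python
def exit_code_for(results: dict, validate: bool) -> int:
    """Mirror the new tpcbench.py behavior: return non-zero when
    --validate was requested and any query failed to validate."""
    if validate and False in results["validated"].values():
        print("Possible incorrect query result")
        return 1
    return 0

# every query matched single-node datafusion -> the CI step passes
print(exit_code_for({"validated": {1: True, 2: True}}, validate=True))   # 0
# any mismatch -> non-zero exit, which fails the GitHub Actions step
print(exit_code_for({"validated": {1: True, 2: False}}, validate=True))  # 1
# without --validate, mismatches are not checked
print(exit_code_for({"validated": {1: True, 2: False}}, validate=False)) # 0
```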
