Merged
82 changes: 56 additions & 26 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -15,38 +15,53 @@
# specific language governing permissions and limitations
# under the License.

name: main
name: Python Tests
on:
push:
branches: [main]
pull_request:
branches: [main]
workflow_dispatch:

#concurrency:
# group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
#cancel-in-progress: true
concurrency:
group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
cancel-in-progress: true

env:
MATURIN_PEP517_ARGS: --profile=dev

jobs:
test-matrix:
runs-on: ubuntu-latest
validate-tpch:
runs-on: ${{ matrix.platform.image }}
strategy:
fail-fast: false
matrix:
platform:
- image: ubuntu-latest
target: x86_64-unknown-linux-musl
python-version:
#- "3.10"
#- "3.11"
- "3.10"
- "3.11"
- "3.12"
toolchain:
- "stable"

ray-version:
- "2.40"
- "2.41"
- "2.42.1"
- "2.43"
steps:
- uses: actions/checkout@v4

- name: Setup Rust Toolchain
uses: dtolnay/rust-toolchain@stable
id: rust-toolchain
with:
components: clippy,rustfmt
target: ${{ matrix.platform.target }}

- name: Check Rust output
id: rust-toolchain-out
run: |
rustc --version --verbose
cargo --version

- name: Install Protoc
uses: arduino/setup-protoc@v3
@@ -60,15 +75,35 @@ jobs:
path: ~/.cargo
key: cargo-cache-${{ steps.rust-toolchain.outputs.cachekey }}-${{ hashFiles('Cargo.lock') }}

- name: Install dependencies and build
- name: install uv
uses: astral-sh/setup-uv@v5
with:
python-version: ${{ matrix.python-version }}
enable-cache: true
python-version: ${{ matrix.python-version }}

- name: initial project sync
run: |
cargo --version
uv sync --no-install-package datafusion-ray --no-install-package ray

- name: Create virtual env
# reset the version of ray in pyproject.toml
# to align with our matrix version
- name: add ray
run: |
uv venv
cargo --version
uv add --no-sync 'ray[default]==${{ matrix.ray-version }}'

# sync the environment, building everything other than datafusion-ray
- name: install dependencies
run: |
uv sync --no-install-package datafusion-ray
working-directory: ${{ github.workspace }}

# build datafusion ray honoring the MATURIN_PEP517_ARGS env
# var to ensure that we build this in dev mode so it's faster
- name: maturin develop
run: uv run maturin develop --uv
working-directory: ${{ github.workspace }}

- name: Cache the generated dataset
id: cache-tpch-dataset
@@ -80,26 +115,21 @@
- name: create the dataset
if: ${{ steps.cache-tpch-dataset.outputs.cache-hit != 'true' }}
run: |
uv add duckdb
uv run python tpch/make_data.py 1 testdata/tpch/

- name: build and install datafusion-ray
env:
RUST_BACKTRACE: 1
run: |
uv add 'ray[default]'
uv run --no-project maturin develop --uv
uv run tpch/make_data.py 1 testdata/tpch/

# run the tpcbench.py file with --validate which will cause
# it to exit with an error code if any of the queries do not validate
- name: validate tpch
env:
DATAFUSION_RAY_LOG_LEVEL: debug
RAY_COLOR_PREFIX: 1
RAY_DEDUP_LOGS: 0
RUST_BACKTRACE: 1
run: |
uv run python tpch/tpcbench.py \
--data='file:///${{ github.workspace }}/testdata/tpch/' \
--concurrency 3 \
--partitions-per-worker 2 \
--partitions-per-processor 2 \
--batch-size=8192 \
--worker-pool-min=20 \
--validate
11 changes: 6 additions & 5 deletions README.md
@@ -17,6 +17,8 @@
under the License.
-->

[![Python Tests](https://github.com/robtandy/datafusion-ray/actions/workflows/main.yml/badge.svg)](https://github.com/robtandy/datafusion-ray/actions/workflows/main.yml)

# DataFusion on Ray

## Overview
@@ -36,16 +38,16 @@ as soon as its inputs are available, leading to a more pipelined execution model

### Batch Execution

_Note: Batch Execution is not implemented yet. Tracking issue: https://github.com/apache/datafusion-ray/issues/69_
_Note: Batch Execution is not implemented yet. Tracking issue: <https://github.com/apache/datafusion-ray/issues/69>_

In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing
In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing
intermediate shuffle files that are persisted and used as input for the next stage.

## Getting Started

See the [contributor guide] for instructions on building DataFusion Ray.

Once installed, you can run queries using DataFusion's familiar API while leveraging the distributed execution
Once installed, you can run queries using DataFusion's familiar API while leveraging the distributed execution
capabilities of Ray.

```python
@@ -67,5 +69,4 @@ Contributions are welcome! Please open an issue or submit a pull request if you

DataFusion Ray is licensed under Apache 2.0.


[contributor guide]: docs/contributing.md
[contributor guide]: docs/contributing.md
48 changes: 41 additions & 7 deletions docs/contributing.md
@@ -21,18 +21,52 @@

## Building

To build DataFusion Ray, you will need rust installed, as well as [https://github.com/PyO3/maturin](maturin).
You'll need to have both Rust and Cargo installed.

Install maturin in your current python environment (a virtual environment is recommended), with
We will follow the development workflow outlined by [datafusion-python](https://github.com/apache/datafusion-python), [pyo3](https://github.com/PyO3/pyo3) and [maturin](https://github.com/PyO3/maturin).

The tools used in this workflow can be installed via either `uv` or `pip`; both approaches offer the same experience. `uv` is recommended since it is significantly faster than `pip`.

Bootstrap (`uv`):

By default `uv` will attempt to build the datafusion-ray python package. For development we prefer to build it manually, so when creating your virtual environment with `uv sync` you need to pass the additional flag `--no-install-package datafusion-ray`. This tells uv to install all of the dependencies found in `pyproject.toml` but skip building `datafusion-ray`, as we'll do that ourselves.

```bash
# fetch this repo
git clone git@github.com:apache/datafusion-ray.git
# go to repo root
cd datafusion-ray
# create the virtual environment
uv sync --dev --no-install-package datafusion-ray
# activate the environment
source .venv/bin/activate
```

Bootstrap (`pip`):

```bash
pip install maturin
# fetch this repo
git clone git@github.com:apache/datafusion-ray.git
# go to repo root
cd datafusion-ray
# prepare development environment (used to build wheel / install in development)
python3 -m venv .venv
# activate the venv
source .venv/bin/activate
# update pip itself if necessary
python -m pip install -U pip
# install the project and its dependencies (pip builds the extension via maturin)
python -m pip install -e .
```

Then build the project with the following command:
Whenever rust code changes (your changes or via `git pull`):

```bash
maturin develop # --release for a release build
# make sure you activate the venv using "source .venv/bin/activate" first
maturin develop --uv
python -m pytest
```

## Example
@@ -57,14 +91,14 @@ For example, to execute the following query:
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8192 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1'
```

To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-worker`. If this number is less than `--concurrency` Then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-worker=4`, then `40` `RayStage` Actors will be created. If `partitions-per-worker=16` or is absent, then `10` `RayStage` Actors will be created.
To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-processor`. If this number is less than `--concurrency`, then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16`, and `partitions-per-processor=4`, then `40` `RayStage` Actors will be created. If `partitions-per-processor=16` or is absent, then `10` `RayStage` Actors will be created.

To validate the output against non-ray single node datafusion, add `--validate` which will ensure that both systems produce the same output.

To run the entire TPCH benchmark use

```bash
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8192 --worker-pool-min=10 [--partitions-per-processor=] [--validate]
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 [--partitions-per-processor=] [--validate]
```

This will output a json file in the current directory with query timings.
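A quick sketch of consuming that file; the exact schema is an assumption based on how `tpcbench.py` populates its `results` dict (a `queries` map of query number to elapsed seconds):

```python
import json


def summarize(results: dict) -> float:
    """Print per-query timings and return the total elapsed seconds."""
    total = 0.0
    for qnum, seconds in sorted(results["queries"].items(),
                                key=lambda kv: int(kv[0])):
        print(f"q{qnum}: {seconds:.2f}s")
        total += seconds
    print(f"total: {total:.2f}s")
    return total


# with a real run you would load the file tpcbench.py wrote, e.g.:
#   with open(path_to_timings_json) as f:   # path is hypothetical
#       summarize(json.load(f))
summarize({"queries": {"1": 1.5, "2": 0.25}})
```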
17 changes: 14 additions & 3 deletions pyproject.toml
@@ -16,23 +16,34 @@
# under the License.

[build-system]
requires = ["maturin>=0.14"]
requires = ["maturin>=1.8.1,<2.0"]
build-backend = "maturin"

[project]
name = "datafusion-ray"
requires-python = ">=3.10"
requires-python = ">=3.10,<3.13"
classifiers = [
"Programming Language :: Rust",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
keywords = [
"datafusion",
"ray",
"dataframe",
"rust",
"query-engine",
"distributed",
]
dependencies = [
"datafusion>=45.0.0",
"ray[default]==2.43.0",
"pyarrow>=18.0.0",
"typing-extensions;python_version<'3.13'",
]
dynamic = ["version"]

[tool.maturin]
module-name = "datafusion_ray._datafusion_ray_internal"

[dependency-groups]
dev = ["maturin>=1.8.1", "duckdb", "datafusion>=43.0.0"]
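The `typing-extensions;python_version<'3.13'` dependency above uses a PEP 508 environment marker, so it is only installed on interpreters older than 3.13. One way to see how such a marker evaluates is via the `packaging` library (illustrative only, not part of this project):

```python
from packaging.markers import Marker

marker = Marker("python_version < '3.13'")

# evaluate against explicit environments rather than the running interpreter
print(marker.evaluate({"python_version": "3.12"}))  # True  -> installed
print(marker.evaluate({"python_version": "3.13"}))  # False -> skipped
```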
23 changes: 8 additions & 15 deletions tpch/tpcbench.py
@@ -107,21 +107,9 @@ def main(

start_time = time.time()
df = ctx.sql(sql)
end_time = time.time()
print(f"Ray output schema {df.schema()}")
print("Logical plan \n", df.logical_plan().display_indent())
print("Optimized Logical plan \n", df.optimized_logical_plan().display_indent())
part1 = end_time - start_time
for stage in df.stages():
print(
f"Stage {stage.stage_id} output partitions:{stage.num_output_partitions} partition_groups: {stage.partition_groups} full_partitions: {stage.full_partitions}"
)
print(stage.display_execution_plan())

start_time = time.time()
batches = df.collect()
end_time = time.time()
results["queries"][qnum] = end_time - start_time + part1
results["queries"][qnum] = end_time - start_time

calculated = prettify(batches)
print(calculated)
@@ -152,6 +140,11 @@ def main(
print("sleeping for 3 seconds for ray to clean up")
time.sleep(3)

if validate and False in results["validated"].values():
# return a non zero return code if we did not validate all queries
print("Possible incorrect query result")
exit(1)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
@@ -174,7 +167,7 @@ def main(
help="Desired batch size output per stage",
)
parser.add_argument(
"--partitions-per-worker",
"--partitions-per-processor",
type=int,
help="Max partitions per Stage Service Worker",
)
@@ -198,7 +191,7 @@ def main(
args.data,
int(args.concurrency),
int(args.batch_size),
args.partitions_per_worker,
args.partitions_per_processor,
args.worker_pool_min,
args.listing_tables,
args.validate,