diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bcbe6e2..2e9c5df 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -15,30 +15,39 @@ # specific language governing permissions and limitations # under the License. -name: main +name: Python Tests on: push: branches: [main] pull_request: branches: [main] + workflow_dispatch: -#concurrency: -# group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} -#cancel-in-progress: true +concurrency: + group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} + cancel-in-progress: true + +env: + MATURIN_PEP517_ARGS: --profile=dev jobs: - test-matrix: - runs-on: ubuntu-latest + validate-tpch: + runs-on: ${{ matrix.platform.image }} strategy: fail-fast: false matrix: + platform: + - image: ubuntu-latest + target: x86_64-unknown-linux-musl python-version: - #- "3.10" - #- "3.11" + - "3.10" + - "3.11" - "3.12" - toolchain: - - "stable" - + ray-version: + - "2.40" + - "2.41" + - "2.42.1" + - "2.43" steps: - uses: actions/checkout@v4 @@ -46,7 +55,13 @@ jobs: uses: dtolnay/rust-toolchain@stable id: rust-toolchain with: - components: clippy,rustfmt + target: ${{ matrix.platform.target }} + + - name: Check Rust output + id: rust-toolchain-out + run: | + rustc --version --verbose + cargo --version - name: Install Protoc uses: arduino/setup-protoc@v3 @@ -60,15 +75,35 @@ jobs: path: ~/.cargo key: cargo-cache-${{ steps.rust-toolchain.outputs.cachekey }}-${{ hashFiles('Cargo.lock') }} - - name: Install dependencies and build + - name: install uv uses: astral-sh/setup-uv@v5 with: - python-version: ${{ matrix.python-version }} enable-cache: true + python-version: ${{ matrix.python-version }} + + - name: initial project sync + run: | + cargo --version + uv sync --no-install-package datafusion-ray --no-install-package ray - - name: Create virtual env + # reset the version of ray in pyproject.toml + # to agree align with our 
matrix version + - name: add ray run: | - uv venv + cargo --version + uv add --no-sync 'ray[default]==${{ matrix.ray-version}}' + + # sync the environment, building everything other than datafusion-ray + - name: install dependencies + run: | + uv sync --no-install-package datafusion-ray + working-directory: ${{ github.workspace }} + + # build datafusion ray honoring the MATURIN_PEP517_ARGS env + # var to ensure that we build this in dev mode so its faster + - name: maturin develop + run: uv run maturin develop --uv + working-directory: ${{ github.workspace }} - name: Cache the generated dataset id: cache-tpch-dataset @@ -80,26 +115,21 @@ jobs: - name: create the dataset if: ${{ steps.cache-tpch-dataset.outputs.cache-hit != 'true' }} run: | - uv add duckdb - uv run python tpch/make_data.py 1 testdata/tpch/ - - - name: build and install datafusion-ray - env: - RUST_BACKTRACE: 1 - run: | - uv add 'ray[default]' - uv run --no-project maturin develop --uv + uv run tpch/make_data.py 1 testdata/tpch/ + # run the tpcbench.py file with --validate which will cause + # it to exit with an error code if any of the queries do not validate - name: validate tpch env: DATAFUSION_RAY_LOG_LEVEL: debug RAY_COLOR_PREFIX: 1 RAY_DEDUP_LOGS: 0 + RUST_BACKTRACE: 1 run: | uv run python tpch/tpcbench.py \ --data='file:///${{ github.workspace }}/testdata/tpch/' \ --concurrency 3 \ - --partitions-per-worker 2 \ + --partitions-per-processor 2 \ --batch-size=8192 \ --worker-pool-min=20 \ --validate diff --git a/README.md b/README.md index 11951f7..1fc484f 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ under the License. 
--> +[![Python Tests](https://github.com/robtandy/datafusion-ray/actions/workflows/main.yml/badge.svg)](https://github.com/robtandy/datafusion-ray/actions/workflows/main.yml) + # DataFusion on Ray ## Overview @@ -36,16 +38,16 @@ as soon as its inputs are available, leading to a more pipelined execution model ### Batch Execution -_Note: Batch Execution is not implemented yet. Tracking issue: https://github.com/apache/datafusion-ray/issues/69_ +_Note: Batch Execution is not implemented yet. Tracking issue: _ -In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing +In this mode, execution follows a staged model similar to Apache Spark. Each query stage runs to completion, producing intermediate shuffle files that are persisted and used as input for the next stage. ## Getting Started See the [contributor guide] for instructions on building DataFusion Ray. -Once installed, you can run queries using DataFusion's familiar API while leveraging the distributed execution +Once installed, you can run queries using DataFusion's familiar API while leveraging the distributed execution capabilities of Ray. ```python @@ -67,5 +69,4 @@ Contributions are welcome! Please open an issue or submit a pull request if you DataFusion Ray is licensed under Apache 2.0. - -[contributor guide]: docs/contributing.md \ No newline at end of file +[contributor guide]: docs/contributing.md diff --git a/docs/contributing.md b/docs/contributing.md index e93447c..6dffbd0 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -21,18 +21,52 @@ ## Building -To build DataFusion Ray, you will need rust installed, as well as [https://github.com/PyO3/maturin](maturin). +You'll need to have both rust and cargo installed. 
 
-Install maturin in your current python environment (a virtual environment is recommended), with
+We will follow the development workflow outlined by [datafusion-python](https://github.com/apache/datafusion-python), [pyo3](https://github.com/PyO3/pyo3) and [maturin](https://github.com/PyO3/maturin).
+
+The Maturin tools used in this workflow can be installed either via `uv` or `pip`. Both approaches should offer the same experience. It is recommended to use `uv` since it has significant performance improvements
+over `pip`.
+
+Bootstrap (`uv`):
+
+By default `uv` will attempt to build the datafusion-ray python package. For our development we prefer to build manually. This means
+that when creating your virtual environment using `uv sync` you need to pass in the additional `--no-install-package datafusion-ray`. This tells uv to install all of the dependencies found in `pyproject.toml`, but skip building `datafusion-ray` as we'll do that manually.
+
+```bash
+# fetch this repo
+git clone git@github.com:apache/datafusion-ray.git
+# go to repo root
+cd datafusion-ray
+# create the virtual environment
+uv sync --dev --no-install-package datafusion-ray
+# activate the environment
+source .venv/bin/activate
+```
+
+Bootstrap (`pip`):
 
 ```bash
-pip install maturin
+# fetch this repo
+git clone git@github.com:apache/datafusion-ray.git
+# go to repo root
+cd datafusion-ray
+# prepare development environment (used to build wheel / install in development)
+python3 -m venv .venv
+# activate the venv
+source .venv/bin/activate
+# update pip itself if necessary
+python -m pip install -U pip
+# install dependencies
+python -m pip install -r pyproject.toml
 ```
 
-Then build the project with the following command:
+Whenever rust code changes (your changes or via `git pull`):
 
 ```bash
-maturin develop # --release for a release build
+# make sure you activate the venv using "source venv/bin/activate" first
+maturin develop --uv
+python -m pytest
 ```
 
 ## Example
 
@@ -57,14 +91,14 @@ For example, to execute the following query:
 RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1'
 ```
 
-To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-worker`. If this number is less than `--concurrency` Then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-worker=4`, then `40` `RayStage` Actors will be created. If `partitions-per-worker=16` or is absent, then `10` `RayStage` Actors will be created.
+To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-processor`. If this number is less than `--concurrency` then multiple Actors will host portions of the stage. For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-processor=4`, then `40` `RayStage` Actors will be created. If `partitions-per-processor=16` or is absent, then `10` `RayStage` Actors will be created.
 
 To validate the output against non-ray single node datafusion, add `--validate` which will ensure that both systems produce the same output.
 
 To run the entire TPCH benchmark use
 
 ```bash
-RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 [--partitions-per-worker=] [--validate]
+RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 [--partitions-per-processor=] [--validate]
 ```
 
 This will output a json file in the current directory with query timings. 
diff --git a/pyproject.toml b/pyproject.toml index 32aa6e3..7261cc5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,19 +16,27 @@ # under the License. [build-system] -requires = ["maturin>=0.14"] +requires = ["maturin>=1.8.1,<2.0"] build-backend = "maturin" [project] name = "datafusion-ray" -requires-python = ">=3.10" +requires-python = ">=3.10,<3.13" classifiers = [ "Programming Language :: Rust", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] +keywords = [ + "datafusion", + "ray", + "dataframe", + "rust", + "query-engine", + "distributed", +] dependencies = [ - "datafusion>=45.0.0", + "ray[default]==2.43.0", "pyarrow>=18.0.0", "typing-extensions;python_version<'3.13'", ] @@ -36,3 +44,6 @@ dynamic = ["version"] [tool.maturin] module-name = "datafusion_ray._datafusion_ray_internal" + +[dependency-groups] +dev = ["maturin>=1.8.1", "duckdb", "datafusion>=43.0.0"] diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py index edb943e..13960bf 100644 --- a/tpch/tpcbench.py +++ b/tpch/tpcbench.py @@ -107,21 +107,9 @@ def main( start_time = time.time() df = ctx.sql(sql) - end_time = time.time() - print(f"Ray output schema {df.schema()}") - print("Logical plan \n", df.logical_plan().display_indent()) - print("Optimized Logical plan \n", df.optimized_logical_plan().display_indent()) - part1 = end_time - start_time - for stage in df.stages(): - print( - f"Stage {stage.stage_id} output partitions:{stage.num_output_partitions} partition_groups: {stage.partition_groups} full_partitions: {stage.full_partitions}" - ) - print(stage.display_execution_plan()) - - start_time = time.time() batches = df.collect() end_time = time.time() - results["queries"][qnum] = end_time - start_time + part1 + results["queries"][qnum] = end_time - start_time calculated = prettify(batches) print(calculated) @@ -152,6 +140,11 @@ def main( print("sleeping for 3 seconds for ray to clean up") time.sleep(3) + if validate and 
False in results["validated"].values(): + # return a non zero return code if we did not validate all queries + print("Possible incorrect query result") + exit(1) + if __name__ == "__main__": parser = argparse.ArgumentParser( @@ -174,7 +167,7 @@ def main( help="Desired batch size output per stage", ) parser.add_argument( - "--partitions-per-worker", + "--partitions-per-processor", type=int, help="Max partitions per Stage Service Worker", ) @@ -198,7 +191,7 @@ def main( args.data, int(args.concurrency), int(args.batch_size), - args.partitions_per_worker, + args.partitions_per_processor, args.worker_pool_min, args.listing_tables, args.validate,