Commit 1c7f499 (2 parents: d68e05f + c1ce07c)

Merge pull request #5 from robtandy/rob.tandy/arrow-flight

Rob.tandy/arrow flight

30 files changed: +2683 additions, -1764 deletions

Cargo.lock

Lines changed: 540 additions & 25 deletions
(Generated file; diff not shown.)

Cargo.toml

Lines changed: 15 additions & 1 deletion
@@ -25,36 +25,50 @@ version = "0.1.0"
 edition = "2021"
 readme = "README.md"
 license = "Apache-2.0"
-rust-version = "1.62"
+rust-version = "1.70"
 build = "build.rs"
 
 [dependencies]
+anyhow = "1"
 arrow = { version = "53.3", features = ["pyarrow", "ipc"] }
+arrow-flight = "53.3"
 async-stream = "0.3"
+async-channel = "2.3"
 datafusion = { version = "43.0", features = ["pyarrow", "avro"] }
 datafusion-python = { version = "43.1" }
 datafusion-proto = "43.0"
 futures = "0.3"
 glob = "0.3.1"
+local-ip-address = "0.6"
 log = "0.4"
 object_store = { version = "0.11.0", features = [
     "aws",
     "gcp",
     "azure",
     "http",
 ] }
+parking_lot = { version = "0.12", features = ["deadlock_detection"] }
 prost = "0.13"
 pyo3 = { version = "0.22.6", features = [
     "extension-module",
     "abi3",
     "abi3-py38",
 ] }
 pyo3-async-runtimes = { version = "0.22", features = ["tokio-runtime"] }
+rust_decimal = "1.36"
 tokio = { version = "1.40", features = [
     "macros",
     "rt",
     "rt-multi-thread",
     "sync",
+    "time",
+] }
+tokio-stream = "0.1"
+
+tonic = { version = "0.12.3", default-features = false, features = [
+    "transport",
+    "codegen",
+    "prost",
 ] }
 uuid = "1.11.0"
 url = "2"

README.md

Lines changed: 11 additions & 97 deletions
@@ -19,12 +19,12 @@
 
 # DataFusion on Ray
 
-> This was originally a research project donated from [ray-sql] to evaluate performing distributed SQL queries from
+> This was originally a research project donated from [ray-sql] to evaluate performing distributed SQL queries from
 > Python, using [Ray] and [Apache DataFusion]
 
 [ray-sql]: https://github.com/datafusion-contrib/ray-sql
 
-DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation
+DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation
 of [Apache Arrow], [Apache DataFusion], and [Ray].
 
 [Ray]: https://www.ray.io/
@@ -35,118 +35,32 @@ of [Apache Arrow], [Apache DataFusion], and [Ray].
 
 ### Comparison to DataFusion Ballista
 
-- Unlike [DataFusion Ballista], DataFusion Ray does not provide its own distributed scheduler and instead relies on
+- Unlike [DataFusion Ballista], DataFusion Ray does not provide its own distributed scheduler and instead relies on
   Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project.
 - DataFusion Ray is Python-first, and DataFusion Ballista is Rust-first
 
 [DataFusion Ballista]: https://github.com/apache/datafusion-ballista
 
 ### Comparison to DataFusion Python
 
-- [DataFusion Python] provides a Python DataFrame and SQL API for in-process execution. DataFusion Ray extends
+- [DataFusion Python] provides a Python DataFrame and SQL API for in-process execution. DataFusion Ray extends
   DataFusion Python to provide scalability across multiple nodes.
 
 [DataFusion Python]: https://github.com/apache/datafusion-python
 
 ## Example
 
-Run the following example live in your browser using a Google Colab [notebook](https://colab.research.google.com/drive/1tmSX0Lu6UFh58_-DBUVoyYx6BoXHOszP?usp=sharing).
-
-```python
-import os
-import ray
-
-from datafusion_ray import DatafusionRayContext
-
-SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-
-# Start a local cluster
-ray.init(resources={"worker": 1})
-
-# Create a context and register a table
-ctx = DatafusionRayContext(2)
-# Register either a CSV or Parquet file
-# ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
-ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")
-
-result_set = ctx.sql(
-  "select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker"
-)
-for record_batch in result_set:
-  print(record_batch.to_pandas())
-```
-
-## Status
-
-- DataFusion Ray can run all queries in the TPC-H benchmark
-
-## Features
-
-- Mature SQL support (CTEs, joins, subqueries, etc) thanks to DataFusion
-- Support for CSV and Parquet files
-
-## Building
-
-```bash
-# prepare development environment (used to build wheel / install in development)
-python3 -m venv venv
-# activate the venv
-source venv/bin/activate
-# update pip itself if necessary
-python -m pip install -U pip
-# install dependencies (for Python 3.8+)
-python -m pip install -r requirements-in.txt
-```
-
-Whenever rust code changes (your changes or via `git pull`):
-
-```bash
-# make sure you activate the venv using "source venv/bin/activate" first
-maturin develop; python -m pytest
-```
-
-## Testing
-
-Running local Rust tests require generating the tpch-data. This can be done
-by running the following commands:
+- In the `tpch` directory, use `make_data.py` to create a TPCH dataset at the specified scale factor, then
 
 ```bash
-export TPCH_TEST_PARTITIONS=1
-export TPCH_SCALING_FACTOR=1
-./scripts/gen-test-data.sh
-```
-
-This will generate data into a top-level `data` directory.
-
-Tests can be run with:
-
-```shell
-export TPCH_DATA_PATH=`pwd`/data
-cargo test
+RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=4 --batch-size=4096 --validate
 ```
 
-## Benchmarking
-
-Create a release build when running benchmarks, then use pip to install the wheel.
-
-```bash
-maturin develop --release
-```
-
-## How to update dependencies
-
-To change test dependencies, change the `requirements.in` and run
-
-```bash
-# install pip-tools (this can be done only once), also consider running in venv
-python -m pip install pip-tools
-python -m piptools compile --generate-hashes -o requirements-310.txt
-```
+## Status
 
-To update dependencies, run with `-U`
+- DataFusion Ray can execute all TPCH queries.
 
-```bash
-python -m piptools compile -U --generate-hashes -o requirements-310.txt
-```
+## Known Issues
 
-More details [here](https://github.com/jazzband/pip-tools)
+- Using `--isolate` (in `tpcbench.py`) to execute individual partitions in their own Ray Actors currently can produce incorrect results.
+- The DataFusion config setting, `datafusion.execution.parquet.pushdown_filters`, can produce incorrect results. We think this could be related to an issue with round trip physical plan serialization.

build.rs

Lines changed: 2 additions & 2 deletions
@@ -18,8 +18,8 @@
 use std::path::Path;
 
 fn main() -> Result<(), String> {
-    use std::io::Write;
-    /*
+    /* use std::io::Write;
+
     let out = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
 
     // for use in docker build where file changes can be wonky

datafusion_ray/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -20,6 +20,6 @@
 except ImportError:
     import importlib_metadata
 
-from .context import RayContext, RayDataFrame, StageReader
+from .core import RayContext, prettify
 
 __version__ = importlib_metadata.version(__name__)
