Skip to content

Commit 1154320

Browse files
authored
Merge pull request #7 from robtandy/rob.tandy/pull_arrow_flight
Rob.tandy/pull arrow flight
2 parents 36aef70 + 7d18881 commit 1154320

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1284
-4482
lines changed

.cargo/config.toml

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
[target.x86_64-apple-darwin]
2-
rustflags = [
3-
"-C", "link-arg=-undefined",
4-
"-C", "link-arg=dynamic_lookup",
5-
]
2+
rustflags = ["-C", "link-arg=-undefined", "-C", "link-arg=dynamic_lookup"]
63

74
[target.aarch64-apple-darwin]
8-
rustflags = [
9-
"-C", "link-arg=-undefined",
10-
"-C", "link-arg=dynamic_lookup",
11-
]
5+
rustflags = ["-C", "link-arg=-undefined", "-C", "link-arg=dynamic_lookup"]
126

7+
[build]
8+
rustflags = ["-C", "target-cpu=native"]

Cargo.lock

Lines changed: 7 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ version = "0.1.0"
2525
edition = "2021"
2626
readme = "README.md"
2727
license = "Apache-2.0"
28-
rust-version = "1.70"
28+
rust-version = "1.76"
2929
build = "build.rs"
3030

3131
[dependencies]
@@ -34,6 +34,7 @@ arrow = { version = "53.3", features = ["pyarrow", "ipc"] }
3434
arrow-flight = "53.3"
3535
async-stream = "0.3"
3636
async-channel = "2.3"
37+
bytesize = "1.3"
3738
datafusion = { version = "43.0", features = ["pyarrow", "avro"] }
3839
datafusion-python = { version = "43.1" }
3940
datafusion-proto = "43.0"
@@ -83,9 +84,9 @@ tonic-build = { version = "0.8", default-features = false, features = [
8384
url = "2"
8485

8586
[dev-dependencies]
86-
anyhow = "1.0.89"
87-
pretty_assertions = "1.4.0"
88-
regex = "1.11.0"
87+
#anyhow = "1.0.89"
88+
#pretty_assertions = "1.4.0"
89+
#regex = "1.11.0"
8990

9091
[lib]
9192
name = "datafusion_ray"
@@ -95,5 +96,13 @@ crate-type = ["cdylib", "rlib"]
9596
name = "datafusion_ray._datafusion_ray_internal"
9697

9798
[profile.release]
99+
lto = "thin"
98100
codegen-units = 1
99-
lto = true
101+
opt-level = 3
102+
debug = 0
103+
104+
[profile.dev]
105+
opt-level = 1
106+
107+
[profile.dev.package."*"]
108+
opt-level = 3

README.md

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,62 @@ of [Apache Arrow], [Apache DataFusion], and [Ray].
4848

4949
[DataFusion Python]: https://github.com/apache/datafusion-python
5050

51+
## Building
52+
53+
To build DataFusion Ray, you will need Rust installed, as well as [maturin](https://github.com/PyO3/maturin).
54+
55+
Install maturin in your current python environment (a virtual environment is recommended), with
56+
57+
```bash
58+
pip install maturin
59+
```
60+
61+
Then build the project with the following command:
62+
63+
```bash
64+
maturin develop # --release for a release build
65+
```
66+
67+
- In the `examples` directory, run
68+
5169
## Example
5270

53-
- In the `tpch` directory, use `make_data.py` to create a TPCH dataset at the specifed scale factor, then
71+
- In the `examples` directory, run
5472

5573
```bash
56-
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=4 --batch-size=4096 --validate
74+
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tips.py --data-dir=$(pwd)/../testdata/tips/
5775
```
5876

77+
- In the `tpch` directory, use `make_data.py` to create a TPCH dataset at a provided scale factor, then
78+
79+
```bash
80+
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --qnum 2
81+
```
82+
83+
The command above executes TPCH query #2. To execute an arbitrary query against the TPCH dataset, provide it with `--query` instead of `--qnum`. This is useful for validating plans that DataFusion Ray will create.
84+
85+
For example, to execute the following query:
86+
87+
```bash
88+
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --query "select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1"
89+
```
90+
91+
To further parallelize execution and host each partition of each stage as a Ray Actor, add `--isolate`. Note that this can drastically increase the number of Actors. A future version of DataFusion Ray will provide a `--split-factor` option, which will let you configure how the Stages are split.
92+
93+
To validate the output against non-Ray, single-node DataFusion, add `--validate`, which will ensure that both systems produce the same output.
94+
95+
To run the entire TPCH benchmark use
96+
97+
```bash
98+
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 [--isolate] [--validate]
99+
```
100+
101+
This will output a JSON file in the current directory with query timings.
102+
59103
## Status
60104

61-
- DataFusion Ray can execute all TPCH queries.
105+
- DataFusion Ray can execute all TPCH queries. Tested up to SF100.
62106

63107
## Known Issues
64108

65-
- Using `--isolate` (in `tpcbench.py`) to execute individual partitions in their own Ray Actors currently can produce incorrect results.
66-
- The DataFusion config setting, `datafusion.execution.parquet.pushdown_filters`, can produce incorrect results. We think this could be related to an issue with round trip physical path serialization.
109+
- The DataFusion config setting, `datafusion.execution.parquet.pushdown_filters`, can produce incorrect results. We think this could be related to an issue with round-trip physical plan serialization. At the moment, do not enable this setting, as it prevents physical plans from serializing correctly.

build.rs

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,39 @@
1818
use std::path::Path;
1919

2020
fn main() -> Result<(), String> {
21-
/* use std::io::Write;
22-
23-
let out = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
24-
25-
// for use in docker build where file changes can be wonky
26-
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");
27-
28-
let version = rustc_version::version().unwrap();
29-
println!("cargo:rustc-env=RUSTC_VERSION={version}");
30-
31-
let path = "src/proto/generated/protobuf.rs";
32-
33-
// We don't include the proto files in releases so that downstreams
34-
// do not need to have PROTOC included
35-
if Path::new("src/proto/datafusion_ray.proto").exists() {
36-
println!("cargo:rerun-if-changed=src/proto/datafusion_common.proto");
37-
println!("cargo:rerun-if-changed=src/proto/datafusion.proto");
38-
println!("cargo:rerun-if-changed=src/proto/datafusion_ray.proto");
39-
tonic_build::configure()
40-
.extern_path(".datafusion", "::datafusion_proto::protobuf")
41-
.extern_path(".datafusion_common", "::datafusion_proto::protobuf")
42-
.compile(&["src/proto/datafusion_ray.proto"], &["src/proto"])
43-
.map_err(|e| format!("protobuf compilation failed: {e}"))?;
44-
let generated_source_path = out.join("datafusion_ray.protobuf.rs");
45-
let code = std::fs::read_to_string(generated_source_path).unwrap();
46-
let mut file = std::fs::OpenOptions::new()
47-
.write(true)
48-
.truncate(true)
49-
.create(true)
50-
.open(path)
51-
.unwrap();
52-
file.write_all(code.as_str().as_ref()).unwrap();
53-
}
54-
*/
21+
use std::io::Write;
22+
23+
let out = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
24+
25+
// for use in docker build where file changes can be wonky
26+
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");
27+
28+
let version = rustc_version::version().unwrap();
29+
println!("cargo:rustc-env=RUSTC_VERSION={version}");
30+
31+
let path = "src/proto/generated/protobuf.rs";
32+
33+
// We don't include the proto files in releases so that downstreams
34+
// do not need to have PROTOC included
35+
if Path::new("src/proto/datafusion_ray.proto").exists() {
36+
println!("cargo:rerun-if-changed=src/proto/datafusion_common.proto");
37+
println!("cargo:rerun-if-changed=src/proto/datafusion.proto");
38+
println!("cargo:rerun-if-changed=src/proto/datafusion_ray.proto");
39+
tonic_build::configure()
40+
.extern_path(".datafusion", "::datafusion_proto::protobuf")
41+
.extern_path(".datafusion_common", "::datafusion_proto::protobuf")
42+
.compile(&["src/proto/datafusion_ray.proto"], &["src/proto"])
43+
.map_err(|e| format!("protobuf compilation failed: {e}"))?;
44+
let generated_source_path = out.join("datafusion_ray.protobuf.rs");
45+
let code = std::fs::read_to_string(generated_source_path).unwrap();
46+
let mut file = std::fs::OpenOptions::new()
47+
.write(true)
48+
.truncate(true)
49+
.create(true)
50+
.open(path)
51+
.unwrap();
52+
file.write_all(code.as_str().as_ref()).unwrap();
53+
}
54+
5555
Ok(())
5656
}

0 commit comments

Comments
 (0)