Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ members = ["benchmarks"]

[workspace.dependencies]
datafusion = { version = "49.0.0" }
datafusion-proto = { version = "49.0.0" }

[package]
name = "datafusion-distributed"
Expand All @@ -11,7 +12,7 @@ edition = "2021"

[dependencies]
datafusion = { workspace = true }
datafusion-proto = { version = "49.0.0" }
datafusion-proto = { workspace = true }
arrow-flight = "55.2.0"
async-trait = "0.1.88"
tokio = { version = "1.46.1", features = ["full"] }
Expand Down
10 changes: 4 additions & 6 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ default-run = "dfbench"

[dependencies]
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-distributed = { path = "..", features = ["integration"] }
tokio = { version = "1.46.1", features = ["full"] }
parquet = { version = "55.2.0" }
Expand All @@ -15,14 +16,11 @@ serde = "1.0.219"
serde_json = "1.0.141"
env_logger = "0.11.8"
async-trait = "0.1.88"
datafusion-proto = { version = "49.0.0", optional = true }
chrono = "0.4.41"
futures = "0.3.31"
dashmap = "6.1.0"
prost = "0.13.5"

[[bin]]
name = "dfbench"
path = "src/bin/dfbench.rs"

[features]
ci = [
"datafusion-proto"
]
72 changes: 68 additions & 4 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,81 @@
# Distributed DataFusion Benchmarks

### Generating tpch data
### Generating TPCH data

Generate TPCH data into the `data/` dir

```shell
./gen-tpch.sh
```

### Running tpch benchmarks
### Running TPCH benchmarks in single-node mode

After generating the data with the command above:
After generating the data with the command above, the benchmarks can be run with

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
cargo run -p datafusion-distributed-benchmarks --release -- tpch
```

For preloading the TPCH data in-memory, the `-m` flag can be passed

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m
```

For running the benchmarks with using just a specific amount of physical threads:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 3
```

### Running TPCH benchmarks in distributed mode

Running the benchmarks in distributed mode implies:

- running 1 or more workers in separate terminals
- running the benchmarks in an additional terminal

The workers can be spawned by passing the `--spawn <port>` flag, for example, for spawning 3 workers:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8000
```

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8001
```

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8002
```

With the three workers running in separate terminals, the TPCH benchmarks can be run in distributed mode with:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --workers 8000,8001,8002
```

A good way of measuring the impact of distribution is to limit the physical threads each worker can use. For example,
it's expected that running 8 workers with 2 physical threads each one (8 * 2 = 16 total) is faster than running in
single-node with just 2 threads (1 * 3 = 2 total).

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8000 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8001 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8002 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8003 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8004 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8005 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8006 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8007 &
```

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --workers 8000,8001,8002,8003,8004,8005,8006,8007
```

The `run.sh` script already does this for you in a more ergonomic way:

```shell
WORKERS=8 run.sh --threads 2 -m
```
49 changes: 49 additions & 0 deletions benchmarks/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env bash

set -e

WORKERS=${WORKERS:-8}

# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

if [ "$WORKERS" == "0" ]; then
cargo run -p datafusion-distributed-benchmarks --release -- tpch "$@"
exit
fi

cleanup() {
echo "Cleaning up processes..."
for i in $(seq 1 $((WORKERS))); do
kill "%$i"
done
}

wait_for_port() {
local port=$1
local timeout=30
local elapsed=0
while ! nc -z localhost "$port" 2>/dev/null; do
if [ "$elapsed" -ge "$timeout" ]; then
echo "Timeout waiting for port $port"
return 1
fi
sleep 0.1
elapsed=$((elapsed + 1))
done
echo "Port $port is ready"
}

cargo build -p datafusion-distributed-benchmarks --release

trap cleanup EXIT INT TERM
for i in $(seq 0 $((WORKERS-1))); do
"$SCRIPT_DIR"/../target/release/dfbench tpch --spawn $((8000+i)) "$@" &
done

echo "Waiting for worker ports to be ready..."
for i in $(seq 0 $((WORKERS-1))); do
wait_for_port $((8000+i))
done

"$SCRIPT_DIR"/../target/release/dfbench tpch --workers $(seq -s, 8000 $((8000+WORKERS-1))) "$@"
10 changes: 6 additions & 4 deletions benchmarks/src/bin/dfbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ enum Options {
}

// Main benchmark runner entrypoint
#[tokio::main]
pub async fn main() -> Result<()> {
pub fn main() -> Result<()> {
env_logger::init();

match Options::from_args() {
Options::Tpch(opt) => Box::pin(opt.run()).await,
Options::TpchConvert(opt) => opt.run().await,
Options::Tpch(opt) => opt.run(),
Options::TpchConvert(opt) => {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { opt.run().await })
}
}
}
Loading
Loading