Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ members = ["benchmarks"]

[workspace.dependencies]
datafusion = { version = "49.0.0" }
datafusion-proto = { version = "49.0.0" }

[package]
name = "datafusion-distributed"
Expand All @@ -11,7 +12,7 @@ edition = "2021"

[dependencies]
datafusion = { workspace = true }
datafusion-proto = { version = "49.0.0" }
datafusion-proto = { workspace = true }
arrow-flight = "55.2.0"
async-trait = "0.1.88"
tokio = { version = "1.46.1", features = ["full"] }
Expand Down
10 changes: 4 additions & 6 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ default-run = "dfbench"

[dependencies]
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-distributed = { path = "..", features = ["integration"] }
tokio = { version = "1.46.1", features = ["full"] }
parquet = { version = "55.2.0" }
Expand All @@ -15,14 +16,11 @@ serde = "1.0.219"
serde_json = "1.0.141"
env_logger = "0.11.8"
async-trait = "0.1.88"
datafusion-proto = { version = "49.0.0", optional = true }
chrono = "0.4.41"
futures = "0.3.31"
dashmap = "6.1.0"
prost = "0.13.5"

[[bin]]
name = "dfbench"
path = "src/bin/dfbench.rs"

[features]
ci = [
"datafusion-proto"
]
72 changes: 68 additions & 4 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,81 @@
# Distributed DataFusion Benchmarks

### Generating tpch data
### Generating TPCH data

Generate TPCH data into the `data/` dir

```shell
./gen-tpch.sh
```

### Running tpch benchmarks
### Running TPCH benchmarks in single-node mode

After generating the data with the command above:
After generating the data with the command above, the benchmarks can be run with

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
cargo run -p datafusion-distributed-benchmarks --release -- tpch
```

For preloading the TPCH data in-memory, the `-m` flag can be passed

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m
```

To run the benchmarks using only a specific number of physical threads:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 3
```

### Running TPCH benchmarks in distributed mode

Running the benchmarks in distributed mode implies:

- running 1 or more workers in separate terminals
- running the benchmarks in an additional terminal

The workers can be spawned by passing the `--spawn <port>` flag. For example, to spawn 3 workers:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8000
```

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8001
```

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8002
```

With the three workers running in separate terminals, the TPCH benchmarks can be run in distributed mode with:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --workers 8000,8001,8002
```

A good way of measuring the impact of distribution is to limit the physical threads each worker can use. For example,
it's expected that running 8 workers with 2 physical threads each (8 * 2 = 16 total) is faster than running in
single-node mode with just 2 threads (1 * 2 = 2 total).

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8000 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8001 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8002 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8003 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8004 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8005 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8006 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8007 &
```

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --workers 8000,8001,8002,8003,8004,8005,8006,8007
```

The `run.sh` script already does this for you in a more ergonomic way:

```shell
WORKERS=8 THREADS=2 ./run.sh
```
50 changes: 50 additions & 0 deletions benchmarks/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Runs the TPCH benchmarks either in single-node mode (WORKERS=0) or against
# locally spawned workers (WORKERS>0). Both knobs are read from the environment.

# Abort the script as soon as any command fails.
set -e

# THREADS is forwarded to dfbench's --threads flag; defaults to 2.
THREADS=${THREADS:-2}
# WORKERS is the number of worker processes to spawn; defaults to 8.
WORKERS=${WORKERS:-8}

# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
# Directory containing this script, used later to locate the release binary.
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

# With zero workers there is nothing to spawn: run the benchmarks in-process
# through cargo and stop here.
if [ "$WORKERS" == "0" ]; then
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads "$THREADS"
exit
fi

# Terminate every background worker started by this script.
#
# Workers are addressed through their bash job specs (%1 .. %WORKERS), in the
# order they were started with `&`.
#
# This function is installed as a trap for EXIT, INT and TERM, so it can run
# more than once and may find workers that have already exited. A failing
# `kill` must therefore not abort the loop (the script runs under `set -e`),
# otherwise the remaining workers would be left running.
cleanup() {
  echo "Cleaning up processes..."
  local job
  for job in $(seq 1 $((WORKERS))); do
    # Best effort: ignore "no such job" for workers that are already gone.
    kill "%$job" 2>/dev/null || true
  done
}

# Block until something accepts TCP connections on localhost:<port>.
#
# Arguments:
#   $1 - port number to probe with `nc -z`.
# Returns:
#   0 once the port is reachable (prints "Port <port> is ready"),
#   1 if the port is still unreachable after the timeout
#     (prints "Timeout waiting for port <port>").
#
# The port is polled every 0.1 seconds. The timeout is expressed in seconds
# and converted to a number of polls, so the wait is roughly 30 seconds
# (plus however long each `nc` probe takes) rather than 30 polls.
wait_for_port() {
  local port=$1
  local timeout=30
  # 10 polls per second because of the 0.1s sleep below.
  local max_polls=$((timeout * 10))
  local polls=0
  while ! nc -z localhost "$port" 2>/dev/null; do
    if [ "$polls" -ge "$max_polls" ]; then
      echo "Timeout waiting for port $port"
      return 1
    fi
    sleep 0.1
    polls=$((polls + 1))
  done
  echo "Port $port is ready"
}

# Build once up front; the workers and the benchmark run below all execute the
# resulting release binary directly instead of going through `cargo run`.
cargo build -p datafusion-distributed-benchmarks --release

# Make sure the background workers are killed however the script ends.
trap cleanup EXIT INT TERM

# Spawn one worker per port, starting at 8000, and collect the ports into a
# comma-separated list. The list is built by hand rather than with `seq -s,`
# because some BSD `seq` implementations append a trailing separator.
worker_ports=""
for i in $(seq 0 $((WORKERS-1))); do
  port=$((8000+i))
  "$SCRIPT_DIR"/../target/release/dfbench tpch -m --threads "$THREADS" --spawn "$port" &
  worker_ports="${worker_ports:+$worker_ports,}$port"
done

echo "Waiting for worker ports to be ready..."
for i in $(seq 0 $((WORKERS-1))); do
  wait_for_port $((8000+i))
done

# Run the benchmarks against all spawned workers.
"$SCRIPT_DIR"/../target/release/dfbench tpch -m --threads "$THREADS" --workers "$worker_ports"
10 changes: 6 additions & 4 deletions benchmarks/src/bin/dfbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ enum Options {
}

// Main benchmark runner entrypoint
#[tokio::main]
pub async fn main() -> Result<()> {
pub fn main() -> Result<()> {
env_logger::init();

match Options::from_args() {
Options::Tpch(opt) => Box::pin(opt.run()).await,
Options::TpchConvert(opt) => opt.run().await,
Options::Tpch(opt) => opt.run(),
Options::TpchConvert(opt) => {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { opt.run().await })
}
}
}
Loading