Merged
72 changes: 68 additions & 4 deletions benchmarks/README.md
@@ -1,17 +1,81 @@
# Distributed DataFusion Benchmarks

### Generating tpch data
### Generating TPCH data

Generate TPCH data into the `data/` directory:

```shell
./gen-tpch.sh
```

### Running tpch benchmarks
### Running TPCH benchmarks in single-node mode

After generating the data with the command above:
After generating the data with the command above, the benchmarks can be run with:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
cargo run -p datafusion-distributed-benchmarks --release -- tpch
```

To preload the TPCH data in memory, pass the `-m` flag:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m
```

To run the benchmarks using only a specific number of physical threads:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 3
```
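
To compare several thread counts in single-node mode, the command above could be wrapped in a loop. The following is a minimal sketch that only prints the commands (a dry run); `bench_cmd` is a hypothetical helper, not part of the repository, and the thread counts are arbitrary.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the single-node benchmark command for a given
# number of physical threads. It uses only the flags shown above (-m, --threads).
bench_cmd() {
  printf 'cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads %s\n' "$1"
}

# The thread counts below are arbitrary examples.
for t in 1 2 4 8; do
  bench_cmd "$t"
done
```

Printing the commands first makes it easy to review the sweep before running it.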

### Running TPCH benchmarks in distributed mode

Running the benchmarks in distributed mode implies:

- running 1 or more workers in separate terminals
- running the benchmarks in an additional terminal

The workers can be spawned by passing the `--spawn <port>` flag. For example, to spawn 3 workers:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8000
```

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8001
```

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8002
```

With the three workers running in separate terminals, the TPCH benchmarks can be run in distributed mode with:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --workers 8000,8001,8002
```

A good way to measure the impact of distribution is to limit the physical threads each worker can use. For example,
running 8 workers with 2 physical threads each (8 * 2 = 16 total) is expected to be faster than running in
single-node mode with just 2 threads (1 * 2 = 2 total).

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8000 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8001 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8002 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8003 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8004 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8005 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8006 &
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8007 &
```

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --workers 8000,8001,8002,8003,8004,8005,8006,8007
```

The `run.sh` script already does this for you in a more ergonomic way:

```shell
WORKERS=8 run.sh --threads 2 -m
```
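
The thread arithmetic and the port list passed to `--workers` can be sketched with two small helpers. `worker_ports` and `total_threads` are hypothetical names, not part of the repository, and the default base port of 8000 is simply the one used in the examples above.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of the repository).

# Print a comma-separated list of COUNT consecutive ports starting at BASE.
worker_ports() {
  count=$1
  base=${2:-8000}   # 8000 matches the ports used in the examples above
  ports=""
  i=0
  while [ "$i" -lt "$count" ]; do
    # Prepend a comma only when the list is already non-empty.
    ports="${ports:+$ports,}$((base + i))"
    i=$((i + 1))
  done
  printf '%s\n' "$ports"
}

# Total physical threads in use: workers * threads per worker.
total_threads() {
  echo $(( $1 * $2 ))
}

echo "workers:     $(worker_ports 8)"
echo "distributed: $(total_threads 8 2) threads"
echo "single-node: $(total_threads 1 2) threads"
```

The output of `worker_ports 8` has the same shape as the value passed to `--workers` in the 8-worker example.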
49 changes: 49 additions & 0 deletions benchmarks/run.sh
@@ -0,0 +1,49 @@
#!/usr/bin/env bash

set -e

WORKERS=${WORKERS:-8}

# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

if [ "$WORKERS" == "0" ]; then
  cargo run -p datafusion-distributed-benchmarks --release -- tpch "$@"
  exit
fi

cleanup() {
  echo "Cleaning up processes..."
  for i in $(seq 1 $((WORKERS))); do
    # A worker may already have exited; don't let `set -e` abort the cleanup.
    kill "%$i" 2>/dev/null || true
  done
}

wait_for_port() {
  local port=$1
  local timeout=30  # seconds
  local elapsed=0   # tenths of a second, since each poll sleeps 0.1s
  while ! nc -z localhost "$port" 2>/dev/null; do
    if [ "$elapsed" -ge $((timeout * 10)) ]; then
      echo "Timeout waiting for port $port"
      return 1
    fi
    sleep 0.1
    elapsed=$((elapsed + 1))
  done
  echo "Port $port is ready"
}

cargo build -p datafusion-distributed-benchmarks --release

trap cleanup EXIT INT TERM
for i in $(seq 0 $((WORKERS-1))); do
  "$SCRIPT_DIR"/../target/release/dfbench tpch --spawn $((8000+i)) "$@" &
done

echo "Waiting for worker ports to be ready..."
for i in $(seq 0 $((WORKERS-1))); do
  wait_for_port $((8000+i))
done

"$SCRIPT_DIR"/../target/release/dfbench tpch --workers $(seq -s, 8000 $((8000+WORKERS-1))) "$@"
10 changes: 6 additions & 4 deletions benchmarks/src/bin/dfbench.rs
@@ -30,12 +30,14 @@ enum Options {
 }

 // Main benchmark runner entrypoint
-#[tokio::main]
-pub async fn main() -> Result<()> {
+pub fn main() -> Result<()> {
     env_logger::init();

     match Options::from_args() {
-        Options::Tpch(opt) => Box::pin(opt.run()).await,
-        Options::TpchConvert(opt) => opt.run().await,
+        Options::Tpch(opt) => opt.run(),
+        Options::TpchConvert(opt) => {
+            let rt = tokio::runtime::Runtime::new()?;
+            rt.block_on(async { opt.run().await })
+        }
     }
 }