
Commit c12c271

Support --threads and --workers on TPCH benchmarks (#130)
* Add support for in-memory TPCH tests
* Add --threads and --workers options in tpch benchmarks
* Remove useless stuff
* Automatically resolve tpch paths
* Draft: spawn workers correctly in different tokio runtimes
* Register tables only in one place
* Spawn benchmark workers as a separate command
* Rollback unnecessary changes in localhost.rs
* Add run script
* Default partitions to threads
* Pass through arguments
1 parent cc5d13a commit c12c271

5 files changed: +216 -237 lines changed
benchmarks/README.md

Lines changed: 68 additions & 4 deletions
@@ -1,17 +1,81 @@
 # Distributed DataFusion Benchmarks
 
-### Generating tpch data
+### Generating TPCH data
 
 Generate TPCH data into the `data/` dir
 
 ```shell
 ./gen-tpch.sh
 ```
 
-### Running tpch benchmarks
+### Running TPCH benchmarks in single-node mode
 
-After generating the data with the command above:
+After generating the data with the command above, the benchmarks can be run with
 
 ```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
+cargo run -p datafusion-distributed-benchmarks --release -- tpch
+```
+
+For preloading the TPCH data in-memory, the `-m` flag can be passed
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m
+```
+
+For running the benchmarks using only a specific number of physical threads:
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 3
+```
+
+### Running TPCH benchmarks in distributed mode
+
+Running the benchmarks in distributed mode implies:
+
+- running 1 or more workers in separate terminals
+- running the benchmarks in an additional terminal
+
+The workers can be spawned by passing the `--spawn <port>` flag, for example, for spawning 3 workers:
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8000
+```
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8001
+```
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8002
+```
+
+With the three workers running in separate terminals, the TPCH benchmarks can be run in distributed mode with:
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch --workers 8000,8001,8002
+```
+
+A good way of measuring the impact of distribution is to limit the physical threads each worker can use. For example,
+it's expected that running 8 workers with 2 physical threads each (8 * 2 = 16 total) is faster than running in
+single-node with just 2 threads (1 * 2 = 2 total).
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8000 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8001 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8002 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8003 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8004 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8005 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8006 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8007 &
+```
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --workers 8000,8001,8002,8003,8004,8005,8006,8007
+```
+
+The `run.sh` script already does this for you in a more ergonomic way:
+
+```shell
+WORKERS=8 run.sh --threads 2 -m
 ```

benchmarks/run.sh

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+#!/usr/bin/env bash
+
+set -e
+
+WORKERS=${WORKERS:-8}
+
+# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+
+if [ "$WORKERS" == "0" ]; then
+  cargo run -p datafusion-distributed-benchmarks --release -- tpch "$@"
+  exit
+fi
+
+cleanup() {
+  echo "Cleaning up processes..."
+  for i in $(seq 1 $((WORKERS))); do
+    kill "%$i"
+  done
+}
+
+wait_for_port() {
+  local port=$1
+  local timeout=30
+  local elapsed=0
+  while ! nc -z localhost "$port" 2>/dev/null; do
+    if [ "$elapsed" -ge "$timeout" ]; then
+      echo "Timeout waiting for port $port"
+      return 1
+    fi
+    sleep 0.1
+    elapsed=$((elapsed + 1))
+  done
+  echo "Port $port is ready"
+}
+
+cargo build -p datafusion-distributed-benchmarks --release
+
+trap cleanup EXIT INT TERM
+for i in $(seq 0 $((WORKERS-1))); do
+  "$SCRIPT_DIR"/../target/release/dfbench tpch --spawn $((8000+i)) "$@" &
+done
+
+echo "Waiting for worker ports to be ready..."
+for i in $(seq 0 $((WORKERS-1))); do
+  wait_for_port $((8000+i))
+done
+
+"$SCRIPT_DIR"/../target/release/dfbench tpch --workers $(seq -s, 8000 $((8000+WORKERS-1))) "$@"
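
The last line of `run.sh` builds the `--workers` value with `seq -s,`. Some `seq` implementations may append the separator after the final number, so the result is not guaranteed to be identical everywhere. The following is a minimal sketch, not part of this commit, of building the same comma-separated port list with shell arithmetic only. The helper name `worker_ports` and its two positional arguments (base port, worker count) are assumptions made for illustration.

```shell
#!/usr/bin/env bash

# Hypothetical helper (not in run.sh): print COUNT consecutive ports
# starting at BASE as "p0,p1,...", with no trailing comma.
# The body runs in a subshell so its variables do not leak to the caller.
worker_ports() (
  base=$1
  count=$2
  list=""
  i=0
  while [ "$i" -lt "$count" ]; do
    # Prefix a comma only when the list already has an entry.
    list="${list:+$list,}$((base + i))"
    i=$((i + 1))
  done
  printf '%s\n' "$list"
)

worker_ports 8000 3   # prints 8000,8001,8002
```

Under these assumptions, the final command in the script could pass `--workers "$(worker_ports 8000 "$WORKERS")"` in place of the `seq` call.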

benchmarks/src/bin/dfbench.rs

Lines changed: 6 additions & 4 deletions
@@ -30,12 +30,14 @@ enum Options {
 }
 
 // Main benchmark runner entrypoint
-#[tokio::main]
-pub async fn main() -> Result<()> {
+pub fn main() -> Result<()> {
     env_logger::init();
 
     match Options::from_args() {
-        Options::Tpch(opt) => Box::pin(opt.run()).await,
-        Options::TpchConvert(opt) => opt.run().await,
+        Options::Tpch(opt) => opt.run(),
+        Options::TpchConvert(opt) => {
+            let rt = tokio::runtime::Runtime::new()?;
+            rt.block_on(async { opt.run().await })
+        }
     }
 }
