Skip to content

Commit 76120df

Browse files
committed
Add run script
1 parent 0f83b86 commit 76120df

File tree

3 files changed

+120
-7
lines changed

3 files changed

+120
-7
lines changed

benchmarks/README.md

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,81 @@
11
# Distributed DataFusion Benchmarks
22

3-
### Generating tpch data
3+
### Generating TPCH data
44

55
Generate TPCH data into the `data/` dir
66

77
```shell
88
./gen-tpch.sh
99
```
1010

11-
### Running tpch benchmarks
11+
### Running TPCH benchmarks in single-node mode
1212

13-
After generating the data with the command above:
13+
After generating the data with the command above, the benchmarks can be run with
1414

1515
```shell
16-
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
16+
cargo run -p datafusion-distributed-benchmarks --release -- tpch
17+
```
18+
19+
For preloading the TPCH data in-memory, the `-m` flag can be passed
20+
21+
```shell
22+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m
23+
```
24+
25+
For running the benchmarks with using just a specific amount of physical threads:
26+
27+
```shell
28+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 3
29+
```
30+
31+
### Running TPCH benchmarks in distributed mode
32+
33+
Running the benchmarks in distributed mode implies:
34+
35+
- running 1 or more workers in separate terminals
36+
- running the benchmarks in an additional terminal
37+
38+
The workers can be spawned by passing the `--spawn <port>` flag, for example, for spawning 3 workers:
39+
40+
```shell
41+
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8000
42+
```
43+
44+
```shell
45+
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8001
46+
```
47+
48+
```shell
49+
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8002
50+
```
51+
52+
With the three workers running in separate terminals, the TPCH benchmarks can be run in distributed mode with:
53+
54+
```shell
55+
cargo run -p datafusion-distributed-benchmarks --release -- tpch --workers 8000,8001,8002
56+
```
57+
58+
A good way of measuring the impact of distribution is to limit the physical threads each worker can use. For example,
59+
it's expected that running 8 workers with 2 physical threads each one (8 * 2 = 16 total) is faster than running in
60+
single-node with just 2 threads (1 * 3 = 2 total).
61+
62+
```shell
63+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8000 &
64+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8001 &
65+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8002 &
66+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8003 &
67+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8004 &
68+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8005 &
69+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8006 &
70+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8007 &
71+
```
72+
73+
```shell
74+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --workers 8000,8001,8002,8003,8004,8005,8006,8007
75+
```
76+
77+
The `run.sh` script already does this for you in a more ergonomic way:
78+
79+
```shell
80+
WORKERS=8 THREADS=2 ./run.sh
1781
```

benchmarks/run.sh

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#!/usr/bin/env bash
2+
3+
set -e
4+
5+
THREADS=${THREADS:-2}
6+
WORKERS=${WORKERS:-8}
7+
8+
# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
9+
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
10+
11+
if [ "$WORKERS" == "0" ]; then
12+
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads "$THREADS"
13+
exit
14+
fi
15+
16+
cleanup() {
17+
echo "Cleaning up processes..."
18+
for i in $(seq 1 $((WORKERS))); do
19+
kill "%$i"
20+
done
21+
}
22+
23+
wait_for_port() {
24+
local port=$1
25+
local timeout=30
26+
local elapsed=0
27+
while ! nc -z localhost "$port" 2>/dev/null; do
28+
if [ "$elapsed" -ge "$timeout" ]; then
29+
echo "Timeout waiting for port $port"
30+
return 1
31+
fi
32+
sleep 0.1
33+
elapsed=$((elapsed + 1))
34+
done
35+
echo "Port $port is ready"
36+
}
37+
38+
cargo build -p datafusion-distributed-benchmarks --release
39+
40+
trap cleanup EXIT INT TERM
41+
for i in $(seq 0 $((WORKERS-1))); do
42+
"$SCRIPT_DIR"/../target/release/dfbench tpch -m --threads "$THREADS" --spawn $((8000+i)) &
43+
done
44+
45+
echo "Waiting for worker ports to be ready..."
46+
for i in $(seq 0 $((WORKERS-1))); do
47+
wait_for_port $((8000+i))
48+
done
49+
50+
"$SCRIPT_DIR"/../target/release/dfbench tpch -m --threads "$THREADS" --workers $(seq -s, 8000 $((8000+WORKERS-1)))

benchmarks/src/tpch/run.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,12 @@ impl DistributedSessionBuilder for RunOpt {
125125
) -> Result<SessionState, DataFusionError> {
126126
let mut builder = SessionStateBuilder::new().with_default_features();
127127

128-
let mut config = self
128+
let config = self
129129
.common
130130
.config()?
131131
.with_collect_statistics(!self.disable_statistics)
132132
.with_distributed_user_codec(InMemoryCacheExecCodec)
133+
.with_distributed_channel_resolver(LocalHostChannelResolver::new(self.workers.clone()))
133134
.with_distributed_option_extension_from_headers::<WarmingUpMarker>(&ctx.headers)?
134135
.with_target_partitions(self.partitions());
135136

@@ -143,8 +144,6 @@ impl DistributedSessionBuilder for RunOpt {
143144
if let Some(partitions_per_task) = self.partitions_per_task {
144145
rule = rule.with_maximum_partitions_per_task(partitions_per_task)
145146
}
146-
let ports = self.workers.clone();
147-
config = config.with_distributed_channel_resolver(LocalHostChannelResolver::new(ports));
148147
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
149148
}
150149

0 commit comments

Comments
 (0)