
Commit 69da7db

Refactor benchmarks crate and add TPC-DS benchmarks (#269)
* Split channel resolver in two
* Simplify WorkerResolverExtension and ChannelResolverExtension
* Add default builder to ArrowFlightEndpoint
* Add some docs
* Listen to clippy
* Split get_flight_client_for_url in two
* Fix conflicts
* Remove unnecessary channel resolver
* Improve WorkerResolver docs
* Use one ChannelResolver per runtime
* Improve error reporting on client connection failure
* Add a from_session_builder method for constructing an InMemoryChannelResolver
* Add ChannelResolver and WorkerResolver default implementations for Arcs
* Make TPC-DS tests use DataFusion test dataset
* Remove non-working in-memory option from benchmarks
* Remove unnecessary utils folder
* Refactor benchmark folder
* Rename to prepare_tpch.rs
* Adapt benchmarks for TPC-DS
* Update benchmarks README.md
* Fix conflicts
* Use default session state builder
* Update benchmarks README.md
* Make gen-tpcds.sh executable
1 parent a02666a · commit 69da7db

22 files changed: +871 −1601 lines

benchmarks/Cargo.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -30,7 +30,7 @@ aws-sdk-ec2 = "1"
 
 [[bin]]
 name = "dfbench"
-path = "src/bin/dfbench.rs"
+path = "src/main.rs"
 
 [[bin]]
 name = "worker"
```

benchmarks/README.md

Lines changed: 16 additions & 61 deletions
````diff
@@ -1,81 +1,36 @@
 # Distributed DataFusion Benchmarks
 
-### Generating TPCH data
+### Generating benchmarking data
 
 Generate TPCH data into the `data/` dir
 
 ```shell
 ./gen-tpch.sh
+./gen-tpcds.sh
 ```
 
-### Running TPCH benchmarks in single-node mode
+### Running benchmarks in single-node mode
 
-After generating the data with the command above, the benchmarks can be run with
+After generating the data with the command above, the benchmarks can be run with:
 
 ```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch
+WORKERS=0 ./benchmarks/run.sh --threads 2 --path benchmarks/data/tpch_sf1
 ```
 
-For preloading the TPCH data in-memory, the `-m` flag can be passed
+- `--threads`: The number of physical threads that the Tokio runtime will use for executing the binary.
+  It's recommended to set `--threads` to something small, like `2`, to throttle each individual
+  process running queries and simulate how adding throttled workers can speed up the queries.
+- `--path`: Can point to any folder containing benchmark datasets.
 
-```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m
-```
-
-For running the benchmarks with just a specific number of physical threads:
-
-```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 3
-```
-
-### Running TPCH benchmarks in distributed mode
-
-Running the benchmarks in distributed mode implies:
-
-- running 1 or more workers in separate terminals
-- running the benchmarks in an additional terminal
-
-The workers can be spawned by passing the `--spawn <port>` flag, for example, for spawning 3 workers:
-
-```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8000
-```
-
-```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8001
-```
-
-```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8002
-```
-
-With the three workers running in separate terminals, the TPCH benchmarks can be run in distributed mode with:
-
-```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch --workers 8000,8001,8002
-```
+### Running benchmarks in distributed mode
 
-A good way of measuring the impact of distribution is to limit the physical threads each worker can use. For example,
-it's expected that running 8 workers with 2 physical threads each one (8 * 2 = 16 total) is faster than running in
-single-node with just 2 threads (1 * 2 = 2 total).
+The same script is used for running distributed benchmarks:
 
 ```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8000 &
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8001 &
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8002 &
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8003 &
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8004 &
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8005 &
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8006 &
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8007 &
+WORKERS=8 ./benchmarks/run.sh --threads 2 --path ./benchmarks/data/tpch_sf1 --files-per-task 2
 ```
 
-```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --workers 8000,8001,8002,8003,8004,8005,8006,8007
-```
-
-The `run.sh` script already does this for you in a more ergonomic way:
-
-```shell
-WORKERS=8 run.sh --threads 2 -m
-```
+- `WORKERS`: Env variable that sets the number of localhost workers used in the query.
+- `--threads`: Sets the Tokio runtime threads for each individual worker and for the benchmarking binary.
+- `--path`: Can point to any folder containing benchmark datasets.
+- `--files-per-task`: How many files each distributed task will handle.
````
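The README example above points at the TPC-H dataset, but since this commit also adds a TPC-DS generator, the same script should accept the directory produced by `gen-tpcds.sh`. A hedged sketch; the `tpcds_sf1` directory name assumes the default `SCALE_FACTOR=1`:

```shell
# Distributed run against the TPC-DS dataset generated by gen-tpcds.sh
WORKERS=8 ./benchmarks/run.sh --threads 2 --path ./benchmarks/data/tpcds_sf1 --files-per-task 2
```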

benchmarks/gen-tpcds.sh

Lines changed: 21 additions & 0 deletions
```diff
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+
+set -e
+
+SCALE_FACTOR=${SCALE_FACTOR:-1}
+PARTITIONS=${PARTITIONS:-16}
+
+echo "Generating TPC-DS dataset with SCALE_FACTOR=${SCALE_FACTOR} and PARTITIONS=${PARTITIONS}"
+
+# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
+CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
+TPCDS_DIR="${DATA_DIR}/tpcds_sf${SCALE_FACTOR}"
+
+echo "Creating tpcds dataset at Scale Factor ${SCALE_FACTOR} in ${TPCDS_DIR}..."
+
+# Ensure the target data directory exists
+mkdir -p "${TPCDS_DIR}"
+
+$CARGO_COMMAND -- prepare-tpcds --output "${TPCDS_DIR}" --partitions "$PARTITIONS"
```
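The new script is driven entirely by environment variables with defaults (`SCALE_FACTOR=1`, `PARTITIONS=16`, `DATA_DIR=<script dir>/data`, `CARGO_COMMAND="cargo run --release"`). A minimal usage sketch, assuming it is invoked from the `benchmarks/` directory as in the README; the override values are illustrative:

```shell
# Defaults: scale factor 1, 16 partitions, output under benchmarks/data/tpcds_sf1
./gen-tpcds.sh

# Override the scale factor and partition count
SCALE_FACTOR=10 PARTITIONS=32 ./gen-tpcds.sh
```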

benchmarks/gen-tpch.sh

Lines changed: 1 addition & 1 deletion
```diff
@@ -33,6 +33,6 @@ if test -d "${FILE}"; then
 else
 echo " creating parquet files using benchmark binary ..."
 pushd "${SCRIPT_DIR}" > /dev/null
-$CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet --partitions "$PARTITIONS"
+$CARGO_COMMAND -- prepare-tpch --input "${TPCH_DIR}" --output "${TPCH_DIR}" --partitions "$PARTITIONS"
 popd > /dev/null
 fi
```

benchmarks/run.sh

Lines changed: 3 additions & 3 deletions
```diff
@@ -8,7 +8,7 @@ WORKERS=${WORKERS:-8}
 SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
 
 if [ "$WORKERS" == "0" ]; then
-cargo run -p datafusion-distributed-benchmarks --release -- tpch "$@"
+cargo run -p datafusion-distributed-benchmarks --release -- run "$@"
 exit
 fi
 
@@ -38,12 +38,12 @@ cargo build -p datafusion-distributed-benchmarks --release
 
 trap cleanup EXIT INT TERM
 for i in $(seq 0 $((WORKERS-1))); do
-"$SCRIPT_DIR"/../target/release/dfbench tpch --spawn $((8000+i)) "$@" &
+"$SCRIPT_DIR"/../target/release/dfbench run --spawn $((8000+i)) "$@" &
 done
 
 echo "Waiting for worker ports to be ready..."
 for i in $(seq 0 $((WORKERS-1))); do
 wait_for_port $((8000+i))
 done
 
-"$SCRIPT_DIR"/../target/release/dfbench tpch --workers $(seq -s, 8000 $((8000+WORKERS-1))) "$@"
+"$SCRIPT_DIR"/../target/release/dfbench run --workers $(seq -s, 8000 $((8000+WORKERS-1))) "$@"
```

benchmarks/src/bin/dfbench.rs

Lines changed: 0 additions & 43 deletions
This file was deleted.

benchmarks/src/lib.rs

Lines changed: 0 additions & 2 deletions
This file was deleted.

benchmarks/src/main.rs

Lines changed: 32 additions & 0 deletions
```diff
@@ -0,0 +1,32 @@
+//! DataFusion Distributed benchmark runner
+mod prepare_tpcds;
+mod prepare_tpch;
+mod run;
+
+use datafusion::error::Result;
+use structopt::StructOpt;
+
+#[derive(Debug, StructOpt)]
+#[structopt(about = "benchmark command")]
+enum Options {
+    Run(run::RunOpt),
+    PrepareTpch(prepare_tpch::PrepareTpchOpt),
+    PrepareTpcds(prepare_tpcds::PrepareTpcdsOpt),
+}
+
+// Main benchmark runner entrypoint
+pub fn main() -> Result<()> {
+    env_logger::init();
+
+    match Options::from_args() {
+        Options::Run(opt) => opt.run(),
+        Options::PrepareTpch(opt) => {
+            let rt = tokio::runtime::Runtime::new()?;
+            rt.block_on(async { opt.run().await })
+        }
+        Options::PrepareTpcds(opt) => {
+            let rt = tokio::runtime::Runtime::new()?;
+            rt.block_on(async { opt.run().await })
+        }
+    }
+}
```
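StructOpt derives kebab-case subcommand names from the enum variants above, so the `dfbench` binary exposes `run`, `prepare-tpch`, and `prepare-tpcds`, matching how the shell scripts in this commit invoke it. A rough sketch of the three entry points; the data paths are illustrative:

```shell
# Convert previously generated TPC-H data to parquet (as in gen-tpch.sh)
cargo run -p datafusion-distributed-benchmarks --release -- prepare-tpch --input data/tpch_sf1 --output data/tpch_sf1 --partitions 16

# Generate TPC-DS parquet data (as in gen-tpcds.sh)
cargo run -p datafusion-distributed-benchmarks --release -- prepare-tpcds --output data/tpcds_sf1 --partitions 16

# Run the benchmark queries (as in run.sh and the README)
cargo run -p datafusion-distributed-benchmarks --release -- run --threads 2 --path data/tpch_sf1
```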

benchmarks/src/prepare_tpcds.rs

Lines changed: 28 additions & 0 deletions
```diff
@@ -0,0 +1,28 @@
+use datafusion::error::DataFusionError;
+use datafusion_distributed::test_utils::tpcds;
+use std::path::{Path, PathBuf};
+use structopt::StructOpt;
+
+/// Prepare TPC-DS parquet files for benchmarks
+#[derive(Debug, StructOpt)]
+pub struct PrepareTpcdsOpt {
+    /// Output path
+    #[structopt(parse(from_os_str), required = true, short = "o", long = "output")]
+    output_path: PathBuf,
+
+    /// Number of partitions to produce. By default, uses only 1 partition.
+    #[structopt(short = "n", long = "partitions", default_value = "1")]
+    partitions: usize,
+
+    /// Scale factor of the TPC-DS data
+    #[structopt(long, default_value = "1")]
+    sf: f64,
+}
+
+impl PrepareTpcdsOpt {
+    pub async fn run(self) -> datafusion::common::Result<()> {
+        tpcds::generate_tpcds_data(Path::new(&self.output_path), self.sf, self.partitions)
+            .await
+            .map_err(|e| DataFusionError::Internal(format!("{e:?}")))
+    }
+}
```
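Per the StructOpt attributes above, the subcommand accepts `-o/--output` (required), `-n/--partitions` (default 1), and `--sf` (default 1). Invoked directly, a sketch would look roughly like this; the output directory is illustrative:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- prepare-tpcds \
  --output benchmarks/data/tpcds_sf1 \
  --partitions 16 \
  --sf 1
```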
