Skip to content

Commit 831d617

Browse files
authored
Add Clickbench tests and benchmarks (#270)
* Split channel resolver in two * Simplify WorkerResolverExtension and ChannelResolverExtension * Add default builder to ArrowFlightEndpoint * Add some docs * Listen to clippy * Split get_flight_client_for_url in two * Fix conflicts * Remove unnecessary channel resolver * Improve WorkerResolver docs * Use one ChannelResolver per runtime * Improve error reporting on client connection failure * Add a from_session_builder method for constructing an InMemoryChannelResolver * Add ChannelResolver and WorkerResolver default implementations for Arcs * Make TPC-DS tests use DataFusion test dataset * Remove non-working in-memory option from benchmarks * Remove unnecessary utils folder * Refactor benchmark folder * Rename to prepare_tpch.rs * Adapt benchmarks for TPC-DS * Update benchmarks README.md * Fix conflicts * Use default session state builder * Add clickbench correctness tests * Add clickbench plans tests * Ignore also clickbench 3 * Rename file * Add clickbench to benchmarks
1 parent 69da7db commit 831d617

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1630
-15
lines changed

.github/workflows/ci.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ jobs:
5959
key: "main.zip"
6060
- run: cargo test --features tpcds --test 'tpcds_*'
6161

62+
clickbench-test:
63+
runs-on: ubuntu-latest
64+
steps:
65+
- uses: actions/checkout@v4
66+
- uses: ./.github/actions/setup
67+
- uses: actions/cache@v4
68+
with:
69+
path: testdata/clickbench/
70+
key: "data"
71+
- run: cargo test --features clickbench --test 'clickbench_*'
72+
6273
format-check:
6374
runs-on: ubuntu-latest
6475
steps:

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,5 @@ testdata/tpch/*
66
testdata/tpcds/*
77
!testdata/tpcds/queries
88
!testdata/tpcds/README.md
9+
testdata/clickbench/*
10+
!testdata/clickbench/queries

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ integration = [
6767

6868
tpch = ["integration"]
6969
tpcds = ["integration"]
70+
clickbench = ["integration"]
7071

7172
[dev-dependencies]
7273
structopt = "0.3"

benchmarks/gen-clickbench.sh

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#!/usr/bin/env bash
#
# Downloads a slice of the ClickBench dataset through the `prepare-clickbench`
# subcommand of the benchmark runner.
#
# Environment variables (all optional):
#   PARTITION_START  first file partition passed to the runner (default: 0)
#   PARTITION_END    end file partition passed to the runner (default: 100)
#   DATA_DIR         base directory for generated data (default: <script dir>/data)
#   CARGO_COMMAND    command used to launch the runner (default: cargo run --release)

set -e

: "${PARTITION_START:=0}"
: "${PARTITION_END:=100}"

echo "Generating ClickBench dataset"

# Resolve the directory this script lives in, regardless of the caller's cwd.
# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

: "${DATA_DIR:=$SCRIPT_DIR/data}"
: "${CARGO_COMMAND:=cargo run --release}"

# Each partition range gets its own directory.
CLICKBENCH_DIR="${DATA_DIR}/clickbench_${PARTITION_START}-${PARTITION_END}"

echo "Creating clickbench dataset from partition ${PARTITION_START} to ${PARTITION_END}"

# Make sure the output directory is there before the runner writes into it.
mkdir -p "${CLICKBENCH_DIR}"

# CARGO_COMMAND is intentionally unquoted so that it word-splits into command + arguments.
$CARGO_COMMAND -- prepare-clickbench \
  --output "${CLICKBENCH_DIR}" \
  --partition-start "$PARTITION_START" \
  --partition-end "$PARTITION_END"

benchmarks/src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
//! DataFusion Distributed benchmark runner
2+
mod prepare_clickbench;
23
mod prepare_tpcds;
34
mod prepare_tpch;
45
mod run;
@@ -12,6 +13,7 @@ enum Options {
1213
Run(run::RunOpt),
1314
PrepareTpch(prepare_tpch::PrepareTpchOpt),
1415
PrepareTpcds(prepare_tpcds::PrepareTpcdsOpt),
16+
PrepareClickbench(prepare_clickbench::PrepareClickBenchOpt),
1517
}
1618

1719
// Main benchmark runner entrypoint
@@ -28,5 +30,9 @@ pub fn main() -> Result<()> {
2830
let rt = tokio::runtime::Runtime::new()?;
2931
rt.block_on(async { opt.run().await })
3032
}
33+
Options::PrepareClickbench(opt) => {
34+
let rt = tokio::runtime::Runtime::new()?;
35+
rt.block_on(async { opt.run().await })
36+
}
3137
}
3238
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use datafusion::error::DataFusionError;
2+
use datafusion_distributed::test_utils::clickbench;
3+
use std::path::{Path, PathBuf};
4+
use structopt::StructOpt;
5+
6+
/// Prepare ClickBench parquet files for benchmarks
7+
#[derive(Debug, StructOpt)]
8+
pub struct PrepareClickBenchOpt {
9+
/// Output path
10+
#[structopt(parse(from_os_str), required = true, short = "o", long = "output")]
11+
output_path: PathBuf,
12+
13+
/// Clickbench dataset is partitioned in 100 files. You may not want to use all the files for
14+
/// the benchmark, so this allows setting from which file partition to start.
15+
#[structopt(long, default_value = "0")]
16+
partition_start: usize,
17+
18+
/// Clickbench dataset is partitioned in 100 files. You may not want to use all the files for
19+
/// the benchmark, so this allows setting a maximum in the file partition index.
20+
#[structopt(long, default_value = "100")]
21+
partition_end: usize,
22+
}
23+
24+
impl PrepareClickBenchOpt {
25+
pub async fn run(self) -> datafusion::common::Result<()> {
26+
clickbench::generate_clickbench_data(
27+
Path::new(&self.output_path),
28+
self.partition_start..self.partition_end,
29+
)
30+
.await
31+
.map_err(|e| DataFusionError::Internal(format!("{e:?}")))
32+
}
33+
}

benchmarks/src/run.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
2929
use datafusion::physical_plan::{collect, displayable};
3030
use datafusion::prelude::*;
3131
use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver;
32-
use datafusion_distributed::test_utils::{tpcds, tpch};
32+
use datafusion_distributed::test_utils::{clickbench, tpcds, tpch};
3333
use datafusion_distributed::{
3434
ArrowFlightEndpoint, DistributedExt, DistributedPhysicalOptimizerRule, NetworkBoundaryExt,
3535
};
@@ -115,26 +115,27 @@ pub struct RunOpt {
115115
enum Dataset {
116116
Tpch,
117117
Tpcds,
118+
Clickbench,
118119
}
119120

120121
impl Dataset {
121122
fn infer_from_data_path(path: PathBuf) -> Result<Self, DataFusionError> {
122-
if path
123-
.iter()
124-
.any(|v| v.to_str().is_some_and(|v| v.contains("tpch")))
125-
{
126-
return Ok(Self::Tpch);
123+
fn path_contains(path: &Path, substr: &str) -> bool {
124+
path.iter()
125+
.any(|v| v.to_str().is_some_and(|v| v.contains(substr)))
127126
}
128-
if path
129-
.iter()
130-
.any(|v| v.to_str().is_some_and(|v| v.contains("tpcds")))
131-
{
132-
return Ok(Self::Tpcds);
127+
if path_contains(&path, "tpch") {
128+
Ok(Self::Tpch)
129+
} else if path_contains(&path, "tpcds") {
130+
Ok(Self::Tpcds)
131+
} else if path_contains(&path, "clickbench") {
132+
Ok(Self::Clickbench)
133+
} else {
134+
not_impl_err!(
135+
"Cannot infer benchmark dataset from path {}",
136+
path.display()
137+
)
133138
}
134-
not_impl_err!(
135-
"Cannot infer benchmark dataset from path {}",
136-
path.display()
137-
)
138139
}
139140

140141
fn queries(&self) -> Result<Vec<(usize, String)>, DataFusionError> {
@@ -145,6 +146,9 @@ impl Dataset {
145146
Dataset::Tpcds => (1..99 + 1)
146147
.map(|i| Ok((i, tpcds::get_test_tpcds_query(i)?)))
147148
.collect(),
149+
Dataset::Clickbench => (0..42 + 1)
150+
.map(|i| Ok((i, clickbench::get_test_clickbench_query(i)?)))
151+
.collect(),
148152
}
149153
}
150154
}

src/test_utils/clickbench.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use datafusion::common::{DataFusionError, internal_datafusion_err, internal_err};
2+
use datafusion::prelude::{ParquetReadOptions, SessionContext};
3+
use std::fs;
4+
use std::io::Write;
5+
use std::ops::Range;
6+
use std::path::{Path, PathBuf};
7+
use tokio::task::JoinSet;
8+
9+
const URL: &str =
10+
"https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{}.parquet";
11+
12+
/// Load a single ClickBench query by ID (0-42).
13+
pub fn get_test_clickbench_query(id: usize) -> Result<String, DataFusionError> {
14+
let queries_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/clickbench/queries");
15+
16+
if !queries_dir.exists() {
17+
return internal_err!(
18+
"TPC-DS queries directory not found: {}",
19+
queries_dir.display()
20+
);
21+
}
22+
23+
let query_file = queries_dir.join(format!("q{id}.sql"));
24+
25+
if !query_file.exists() {
26+
return internal_err!("Query file not found: {}", query_file.display());
27+
}
28+
29+
let query_sql = fs::read_to_string(&query_file)
30+
.map_err(|e| {
31+
internal_datafusion_err!("Failed to read query file {}: {e}", query_file.display())
32+
})?
33+
.trim()
34+
.to_string();
35+
36+
Ok(query_sql)
37+
}
38+
39+
/// Downloads the datafusion-benchmarks repository as a zip file
40+
async fn download_benchmark(
41+
dest_path: PathBuf,
42+
i: usize,
43+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
44+
if dest_path.exists() {
45+
return Ok(());
46+
}
47+
48+
// Create directory if it doesn't exist
49+
if let Some(parent) = dest_path.parent() {
50+
fs::create_dir_all(parent)?;
51+
}
52+
53+
// Download the file
54+
let response = reqwest::get(URL.replace("{}", &i.to_string())).await?;
55+
let bytes = response.bytes().await?;
56+
57+
// Write to file
58+
let mut file = fs::File::create(&dest_path)?;
59+
file.write_all(&bytes)?;
60+
61+
println!("Downloaded to {}", dest_path.display());
62+
63+
Ok(())
64+
}
65+
66+
async fn download_partitioned(
67+
dest_path: PathBuf,
68+
range: Range<usize>,
69+
) -> Result<(), Box<dyn std::error::Error>> {
70+
let mut join_set = JoinSet::new();
71+
for i in range {
72+
let dest_path = dest_path.clone();
73+
join_set.spawn(async move {
74+
download_benchmark(dest_path.join("hits").join(format!("{i}.parquet")), i).await
75+
});
76+
}
77+
join_set.join_all().await;
78+
Ok(())
79+
}
80+
81+
pub async fn generate_clickbench_data(
82+
dest_path: &Path,
83+
range: Range<usize>,
84+
) -> Result<(), Box<dyn std::error::Error>> {
85+
download_partitioned(dest_path.to_path_buf(), range).await?;
86+
Ok(())
87+
}
88+
89+
pub async fn register_tables(
90+
ctx: &SessionContext,
91+
data_path: &Path,
92+
) -> Result<(), DataFusionError> {
93+
for entry in fs::read_dir(data_path)? {
94+
let path = entry?.path();
95+
if path.is_dir() {
96+
let table_name = path.file_name().unwrap().to_str().unwrap();
97+
ctx.register_parquet(
98+
table_name,
99+
path.to_str().unwrap(),
100+
ParquetReadOptions::default(),
101+
)
102+
.await?;
103+
}
104+
}
105+
Ok(())
106+
}

src/test_utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod clickbench;
12
pub mod in_memory_channel_resolver;
23
pub mod insta;
34
pub mod localhost;

testdata/clickbench/queries/q0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
2+
3+
-- set datafusion.execution.parquet.binary_as_string = true
4+
SELECT COUNT(*) FROM hits;

0 commit comments

Comments
 (0)