
Commit 3881319

Add test comparing distributed + single node execution on TPCH data
* add TPCH queries from DataFusion repo (see tests/fixtures/tpch/queries)
* add lazy TPCH parquet data generator using github.com/clflushopt/tpchgen-rs
* write consistency test that runs queries twice (distributed and single-node) and asserts match
1 parent 0dbf427 commit 3881319
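
The consistency test itself is one of the 30 changed files but is not rendered in this excerpt. Below is a minimal sketch of the pattern the commit message describes, assuming the caller has already built one SessionContext wired for distributed execution and one plain single-node context (how the distributed context is constructed is not shown here); this is illustrative, not the test from the commit.

use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Run the same SQL against both contexts and assert the formatted results match.
async fn assert_same_results(
    distributed: &SessionContext,
    single_node: &SessionContext,
    sql: &str,
) -> Result<()> {
    let a = distributed.sql(sql).await?.collect().await?;
    let b = single_node.sql(sql).await?.collect().await?;
    assert_eq!(
        pretty_format_batches(&a)?.to_string(),
        pretty_format_batches(&b)?.to_string()
    );
    Ok(())
}

Comparing pretty-printed batches assumes a deterministic row order, which the ORDER BY clauses in the TPCH queries largely provide.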

File tree

30 files changed, +368 -42 lines


.gitignore

Lines changed: 2 additions & 1 deletion
@@ -1,3 +1,4 @@
 /.idea
 /target
-/benchmarks/data/
+/benchmarks/data/
+testdata/tpch/data/

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 13 additions & 1 deletion
@@ -33,11 +33,23 @@ object_store = "0.12.3"
 
 # integration_tests deps
 insta = { version = "1.43.1", features = ["filters"], optional = true }
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
+parquet = { version = "55.2.0", optional = true }
+arrow = { version = "55.2.0", optional = true }
 
 [features]
 integration = [
-    "insta"
+    "insta",
+    "tpchgen",
+    "tpchgen-arrow",
+    "parquet",
+    "arrow"
 ]
 
 [dev-dependencies]
 insta = { version = "1.43.1", features = ["filters"] }
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
+parquet = "55.2.0"
+arrow = "55.2.0"
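
The generator crates appear twice on purpose: as optional dependencies gated behind the integration feature (so regular builds do not pull them in) and as dev-dependencies (so test builds always compile them). A small sketch of how such feature-gated test utilities are typically exposed; the module layout below is an assumption, not taken from the commit.

// src/lib.rs (hypothetical layout): compile the TPCH test utilities only when
// the `integration` feature is enabled, e.g. `cargo test --features integration`.
#[cfg(feature = "integration")]
pub mod test_utils;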

benchmarks/src/tpch/mod.rs

Lines changed: 15 additions & 20 deletions
@@ -23,7 +23,6 @@ use datafusion::{
     common::plan_err,
     error::Result,
 };
-use std::fs;
 mod run;
 pub use run::RunOpt;
 
@@ -140,28 +139,24 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
     }
 }
 
+fn get_benchmark_queries_dir() -> std::path::PathBuf {
+    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("../testdata/tpch/queries")
+}
+
 /// Get the SQL statements from the specified query file
 pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
     if query > 0 && query < 23 {
-        let possibilities = vec![
-            format!("queries/q{query}.sql"),
-            format!("benchmarks/queries/q{query}.sql"),
-        ];
-        let mut errors = vec![];
-        for filename in possibilities {
-            match fs::read_to_string(&filename) {
-                Ok(contents) => {
-                    return Ok(contents
-                        .split(';')
-                        .map(|s| s.trim())
-                        .filter(|s| !s.is_empty())
-                        .map(|s| s.to_string())
-                        .collect());
-                }
-                Err(e) => errors.push(format!("{filename}: {e}")),
-            };
-        }
-        plan_err!("invalid query. Could not find query: {:?}", errors)
+        let queries_dir = get_benchmark_queries_dir();
+        let contents = datafusion_distributed::test_utils::tpch::tpch_query_from_dir(
+            &queries_dir,
+            query as u8,
+        );
+        Ok(contents
+            .split(';')
+            .map(|s| s.trim())
+            .filter(|s| !s.is_empty())
+            .map(|s| s.to_string())
+            .collect())
     } else {
         plan_err!("invalid query. Expected value between 1 and 22")
     }
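
For context, a minimal sketch of how the reworked get_query_sql is typically consumed; the SessionContext with the TPCH tables already registered is assumed, not shown here.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Execute every statement of a TPCH query in order; queries such as q15 split
// into several statements, which is why get_query_sql returns a Vec<String>.
async fn run_tpch_query(ctx: &SessionContext, query: usize) -> Result<()> {
    for sql in get_query_sql(query)? {
        ctx.sql(&sql).await?.collect().await?;
    }
    Ok(())
}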

src/test_utils/tpch.rs

Lines changed: 78 additions & 8 deletions
@@ -5,19 +5,34 @@ use datafusion::{
     catalog::{MemTable, TableProvider},
 };
 
-pub fn tpch_table(name: &str) -> Arc<dyn TableProvider> {
-    let schema = Arc::new(get_tpch_table_schema(name));
-    Arc::new(MemTable::try_new(schema, vec![]).unwrap())
-}
+use std::fs;
+
+use arrow::record_batch::RecordBatch;
+use parquet::{arrow::arrow_writer::ArrowWriter, file::properties::WriterProperties};
+use tpchgen::generators::{
+    CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, PartGenerator,
+    PartSuppGenerator, RegionGenerator, SupplierGenerator,
+};
+use tpchgen_arrow::{
+    CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow, PartSuppArrow, RegionArrow,
+    SupplierArrow,
+};
 
-pub fn tpch_query(num: u8) -> String {
-    // read the query from the test/tpch/queries/ directory and return it
-    let query_path = format!("testing/tpch/queries/q{}.sql", num);
-    std::fs::read_to_string(query_path)
+pub fn tpch_query_from_dir(queries_dir: &std::path::Path, num: u8) -> String {
+    let query_path = queries_dir.join(format!("q{num}.sql"));
+    fs::read_to_string(query_path)
         .unwrap_or_else(|_| panic!("Failed to read TPCH query file: q{}.sql", num))
         .trim()
         .to_string()
 }
+pub const NUM_QUERIES: u8 = 22; // number of queries in the TPCH benchmark numbered from 1 to 22
+
+const SCALE_FACTOR: f64 = 0.001;
+
+pub fn tpch_table(name: &str) -> Arc<dyn TableProvider> {
+    let schema = Arc::new(get_tpch_table_schema(name));
+    Arc::new(MemTable::try_new(schema, vec![]).unwrap())
+}
 
 pub fn get_tpch_table_schema(table: &str) -> Schema {
     // note that the schema intentionally uses signed integers so that any generated Parquet
@@ -113,3 +128,58 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
         _ => unimplemented!(),
     }
 }
+
+// generate_table creates a parquet file in the data directory from an arrow RecordBatch row
+// source.
+fn generate_table<A>(
+    mut data_source: A,
+    table_name: &str,
+    data_dir: &std::path::Path,
+) -> Result<(), Box<dyn std::error::Error>>
+where
+    A: Iterator<Item = RecordBatch>,
+{
+    let output_path = data_dir.join(format!("{}.parquet", table_name));
+
+    if let Some(first_batch) = data_source.next() {
+        let file = fs::File::create(&output_path)?;
+        let props = WriterProperties::builder().build();
+        let mut writer = ArrowWriter::try_new(file, first_batch.schema(), Some(props))?;
+
+        writer.write(&first_batch)?;
+
+        while let Some(batch) = data_source.next() {
+            writer.write(&batch)?;
+        }
+
+        writer.close()?;
+    }
+
+    Ok(())
+}
+
+macro_rules! must_generate_tpch_table {
+    ($generator:ident, $arrow:ident, $name:literal, $data_dir:expr) => {
+        generate_table(
+            // TODO: Consider adjusting the partitions and batch sizes.
+            $arrow::new($generator::new(SCALE_FACTOR, 1, 1)).with_batch_size(1000),
+            $name,
+            $data_dir,
+        )
+        .expect(concat!("Failed to generate ", $name, " table"));
+    };
+}
+
+// generate_tpch_data generates all TPC-H tables in the specified data directory.
+pub fn generate_tpch_data(data_dir: &std::path::Path) {
+    fs::create_dir_all(data_dir).expect("Failed to create data directory");
+
+    must_generate_tpch_table!(RegionGenerator, RegionArrow, "region", data_dir);
+    must_generate_tpch_table!(NationGenerator, NationArrow, "nation", data_dir);
+    must_generate_tpch_table!(CustomerGenerator, CustomerArrow, "customer", data_dir);
+    must_generate_tpch_table!(SupplierGenerator, SupplierArrow, "supplier", data_dir);
+    must_generate_tpch_table!(PartGenerator, PartArrow, "part", data_dir);
+    must_generate_tpch_table!(PartSuppGenerator, PartSuppArrow, "partsupp", data_dir);
+    must_generate_tpch_table!(OrderGenerator, OrderArrow, "orders", data_dir);
+    must_generate_tpch_table!(LineItemGenerator, LineItemArrow, "lineitem", data_dir);
+}
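
Taken together, a minimal usage sketch of the new helpers; the paths below are illustrative, though the test data is expected under testdata/tpch/ per the .gitignore change above.

use std::path::Path;

use datafusion_distributed::test_utils::tpch::{generate_tpch_data, tpch_query_from_dir};

fn prepare_fixture() -> String {
    // Writes region.parquet, nation.parquet, ..., lineitem.parquet at SF 0.001.
    let data_dir = Path::new("testdata/tpch/data");
    generate_tpch_data(data_dir);

    // Loads the raw SQL text of q1.sql from the checked-in queries directory.
    tpch_query_from_dir(Path::new("testdata/tpch/queries"), 1)
}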
File renamed without changes.

benchmarks/queries/q10.sql renamed to testdata/tpch/queries/q10.sql

Lines changed: 1 addition & 2 deletions
@@ -28,5 +28,4 @@ group by
     c_address,
     c_comment
 order by
-    revenue desc
-limit 20;
+    revenue desc;
File renamed without changes.
File renamed without changes.
File renamed without changes.
