Skip to content

Commit 7f7e445

Browse files
Add test comparing distributed + single node execution on TPCH data
* add TPCH queries from DataFusion repo (see tests/fixtures/tpch/queries) * add lazy TPCH parquet data generator using github.com/clflushopt/tpchgen-rs * write consistency test that runs queries twice (distributed and single-node) and asserts match
1 parent 0dbf427 commit 7f7e445

File tree

29 files changed

+357
-38
lines changed

29 files changed

+357
-38
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/.idea
22
/target
3-
/benchmarks/data/
3+
/benchmarks/data/
4+
testdata/tpch/data/

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,23 @@ object_store = "0.12.3"
3333

3434
# integration_tests deps
3535
insta = { version = "1.43.1", features = ["filters"], optional = true }
36+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
37+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
38+
parquet = { version = "55.2.0", optional = true }
39+
arrow = { version = "55.2.0", optional = true }
3640

3741
[features]
3842
integration = [
39-
"insta"
43+
"insta",
44+
"tpchgen",
45+
"tpchgen-arrow",
46+
"parquet",
47+
"arrow"
4048
]
4149

4250
[dev-dependencies]
4351
insta = { version = "1.43.1", features = ["filters"] }
52+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
53+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
54+
parquet = "55.2.0"
55+
arrow = "55.2.0"

benchmarks/src/tpch/mod.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use datafusion::{
2323
common::plan_err,
2424
error::Result,
2525
};
26-
use std::fs;
2726
mod run;
2827
pub use run::RunOpt;
2928

@@ -140,28 +139,24 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
140139
}
141140
}
142141

142+
fn get_benchmark_queries_dir() -> std::path::PathBuf {
143+
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("../testdata/tpch/queries")
144+
}
145+
143146
/// Get the SQL statements from the specified query file
144147
pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
145148
if query > 0 && query < 23 {
146-
let possibilities = vec![
147-
format!("queries/q{query}.sql"),
148-
format!("benchmarks/queries/q{query}.sql"),
149-
];
150-
let mut errors = vec![];
151-
for filename in possibilities {
152-
match fs::read_to_string(&filename) {
153-
Ok(contents) => {
154-
return Ok(contents
155-
.split(';')
156-
.map(|s| s.trim())
157-
.filter(|s| !s.is_empty())
158-
.map(|s| s.to_string())
159-
.collect());
160-
}
161-
Err(e) => errors.push(format!("{filename}: {e}")),
162-
};
163-
}
164-
plan_err!("invalid query. Could not find query: {:?}", errors)
149+
let queries_dir = get_benchmark_queries_dir();
150+
let contents = datafusion_distributed::test_utils::tpch::tpch_query_from_dir(
151+
&queries_dir,
152+
query as u8,
153+
);
154+
Ok(contents
155+
.split(';')
156+
.map(|s| s.trim())
157+
.filter(|s| !s.is_empty())
158+
.map(|s| s.to_string())
159+
.collect())
165160
} else {
166161
plan_err!("invalid query. Expected value between 1 and 22")
167162
}

src/test_utils/tpch.rs

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,39 @@ use datafusion::{
55
catalog::{MemTable, TableProvider},
66
};
77

8+
use std::fs;
9+
10+
use arrow::record_batch::RecordBatch;
11+
use parquet::{arrow::arrow_writer::ArrowWriter, file::properties::WriterProperties};
12+
use tpchgen::generators::{
13+
CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, PartGenerator,
14+
PartSuppGenerator, RegionGenerator, SupplierGenerator,
15+
};
16+
use tpchgen_arrow::{
17+
CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow, PartSuppArrow, RegionArrow,
18+
SupplierArrow,
19+
};
20+
21+
pub fn tpch_query_from_dir(queries_dir: &std::path::Path, num: u8) -> String {
22+
let query_path = queries_dir.join(format!("q{num}.sql"));
23+
fs::read_to_string(query_path)
24+
.unwrap_or_else(|_| panic!("Failed to read TPCH query file: q{}.sql", num))
25+
.trim()
26+
.to_string()
27+
}
28+
pub const NUM_QUERIES: u8 = 22; // number of queries in the TPCH benchmark numbered from 1 to 22
29+
30+
const SCALE_FACTOR: f64 = 0.001;
31+
832
pub fn tpch_table(name: &str) -> Arc<dyn TableProvider> {
933
let schema = Arc::new(get_tpch_table_schema(name));
1034
Arc::new(MemTable::try_new(schema, vec![]).unwrap())
1135
}
1236

1337
pub fn tpch_query(num: u8) -> String {
14-
// read the query from the test/tpch/queries/ directory and return it
15-
let query_path = format!("testing/tpch/queries/q{}.sql", num);
16-
std::fs::read_to_string(query_path)
17-
.unwrap_or_else(|_| panic!("Failed to read TPCH query file: q{}.sql", num))
18-
.trim()
19-
.to_string()
38+
let queries_dir =
39+
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpch/queries");
40+
tpch_query_from_dir(&queries_dir, num)
2041
}
2142

2243
pub fn get_tpch_table_schema(table: &str) -> Schema {
@@ -113,3 +134,58 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
113134
_ => unimplemented!(),
114135
}
115136
}
137+
138+
// generate_table creates a parquet file in the data directory from an arrow RecordBatch row
139+
// source.
140+
fn generate_table<A>(
141+
mut data_source: A,
142+
table_name: &str,
143+
data_dir: &std::path::Path,
144+
) -> Result<(), Box<dyn std::error::Error>>
145+
where
146+
A: Iterator<Item = RecordBatch>,
147+
{
148+
let output_path = data_dir.join(format!("{}.parquet", table_name));
149+
150+
if let Some(first_batch) = data_source.next() {
151+
let file = fs::File::create(&output_path)?;
152+
let props = WriterProperties::builder().build();
153+
let mut writer = ArrowWriter::try_new(file, first_batch.schema(), Some(props))?;
154+
155+
writer.write(&first_batch)?;
156+
157+
while let Some(batch) = data_source.next() {
158+
writer.write(&batch)?;
159+
}
160+
161+
writer.close()?;
162+
}
163+
164+
Ok(())
165+
}
166+
167+
macro_rules! must_generate_tpch_table {
168+
($generator:ident, $arrow:ident, $name:literal, $data_dir:expr) => {
169+
generate_table(
170+
// TODO: Consider adjusting the partitions and batch sizes.
171+
$arrow::new($generator::new(SCALE_FACTOR, 1, 1)).with_batch_size(1000),
172+
$name,
173+
$data_dir,
174+
)
175+
.expect(concat!("Failed to generate ", $name, " table"));
176+
};
177+
}
178+
179+
// generate_tpch_data generates all TPC-H tables in the specified data directory.
180+
pub fn generate_tpch_data(data_dir: &std::path::Path) {
181+
fs::create_dir_all(data_dir).expect("Failed to create data directory");
182+
183+
must_generate_tpch_table!(RegionGenerator, RegionArrow, "region", data_dir);
184+
must_generate_tpch_table!(NationGenerator, NationArrow, "nation", data_dir);
185+
must_generate_tpch_table!(CustomerGenerator, CustomerArrow, "customer", data_dir);
186+
must_generate_tpch_table!(SupplierGenerator, SupplierArrow, "supplier", data_dir);
187+
must_generate_tpch_table!(PartGenerator, PartArrow, "part", data_dir);
188+
must_generate_tpch_table!(PartSuppGenerator, PartSuppArrow, "partsupp", data_dir);
189+
must_generate_tpch_table!(OrderGenerator, OrderArrow, "orders", data_dir);
190+
must_generate_tpch_table!(LineItemGenerator, LineItemArrow, "lineitem", data_dir);
191+
}
File renamed without changes.

benchmarks/queries/q10.sql renamed to testdata/tpch/queries/q10.sql

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,4 @@ group by
2828
c_address,
2929
c_comment
3030
order by
31-
revenue desc
32-
limit 20;
31+
revenue desc;
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)