Skip to content

Commit c073028

Browse files
Add test comparing distributed + single node execution on TPCH data
* add TPCH queries from DataFusion repo (see tests/fixtures/tpch/queries) * add lazy TPCH parquet data generator using github.com/clflushopt/tpchgen-rs * write consistency test that runs queries twice (distributed and single-node) and asserts match
1 parent 84026d6 commit c073028

File tree

27 files changed

+871
-4
lines changed

27 files changed

+871
-4
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
/.idea
2-
/target
2+
/target
3+
tests/fixtures/tpch/data/

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ object_store = "0.12.3"
2525

2626
[dev-dependencies]
2727
insta = { version = "1.43.1", features = ["filters"] }
28+
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs" }
29+
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs" }
30+
parquet = "55.2.0"
31+
arrow = "55.2.0"

tests/common/tpch.rs

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,36 @@ use datafusion::{
55
catalog::{MemTable, TableProvider},
66
};
77

8+
use std::fs;
9+
10+
use tokio::sync::OnceCell;
11+
12+
use arrow::record_batch::RecordBatch;
13+
use parquet::{arrow::arrow_writer::ArrowWriter, file::properties::WriterProperties};
14+
use tpchgen::generators::{
15+
CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, PartGenerator,
16+
PartSuppGenerator, RegionGenerator, SupplierGenerator,
17+
};
18+
use tpchgen_arrow::{
19+
CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow, PartSuppArrow, RegionArrow,
20+
SupplierArrow,
21+
};
22+
23+
const QUERIES_DIR: &str = "tests/fixtures/tpch/queries";
24+
pub const DATA_DIR: &str = "tests/fixtures/tpch/data"; // mirrored in .gitignore
25+
pub const NUM_QUERIES: u8 = 22; // number of queries in the TPCH benchmark numbered from 1 to 22
26+
27+
const SCALE_FACTOR: f64 = 0.001;
28+
829
pub fn tpch_table(name: &str) -> Arc<dyn TableProvider> {
930
let schema = Arc::new(get_tpch_table_schema(name));
1031
Arc::new(MemTable::try_new(schema, vec![]).unwrap())
1132
}
1233

1334
pub fn tpch_query(num: u8) -> String {
14-
// read the query from the test/tpch/queries/ directory and return it
15-
let query_path = format!("testing/tpch/queries/q{}.sql", num);
16-
std::fs::read_to_string(query_path)
35+
// read the query from the queries directory in the fixtures dir and return it
36+
let query_path = format!("{}/q{}.sql", QUERIES_DIR, num);
37+
fs::read_to_string(query_path)
1738
.unwrap_or_else(|_| panic!("Failed to read TPCH query file: q{}.sql", num))
1839
.trim()
1940
.to_string()
@@ -113,3 +134,61 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
113134
_ => unimplemented!(),
114135
}
115136
}
137+
138+
// generate_table creates a parquet file in the DATA_DIR directory from an arrow RecordBatch row
139+
// source.
140+
fn generate_table<A>(mut data_source: A, table_name: &str) -> Result<(), Box<dyn std::error::Error>>
141+
where
142+
A: Iterator<Item = RecordBatch>,
143+
{
144+
let output_path = format!("{}/{}.parquet", DATA_DIR, table_name);
145+
146+
if let Some(first_batch) = data_source.next() {
147+
let file = fs::File::create(&output_path)?;
148+
let props = WriterProperties::builder().build();
149+
let mut writer = ArrowWriter::try_new(file, first_batch.schema(), Some(props))?;
150+
151+
writer.write(&first_batch)?;
152+
153+
while let Some(batch) = data_source.next() {
154+
writer.write(&batch)?;
155+
}
156+
157+
writer.close()?;
158+
}
159+
160+
println!("Generated {} table: {}", table_name, output_path);
161+
Ok(())
162+
}
163+
164+
// must_generate_tpch_table builds the `$generator` row source, adapts it to
// arrow record batches through `$arrow`, and hands it to `generate_table`
// under the table name `$name`. Any failure aborts with a panic naming the
// table.
macro_rules! must_generate_tpch_table {
    ($generator:ident, $arrow:ident, $name:literal) => {{
        // TODO: Consider adjusting the partitions and batch sizes.
        let row_source = $generator::new(SCALE_FACTOR, 1, 1);
        let batch_source = $arrow::new(row_source).with_batch_size(1000);
        generate_table(batch_source, $name)
            .expect(concat!("Failed to generate ", $name, " table"));
    }};
}
174+
175+
// INIT_TPCH_TABLES ensures that TPC-H tables are generated only once.
176+
static INIT_TPCH_TABLES: OnceCell<()> = OnceCell::const_new();
177+
178+
// generate_tpch_data generates all TPC-H tables in the DATA_DIR directory.
179+
pub async fn generate_tpch_data() {
180+
INIT_TPCH_TABLES
181+
.get_or_init(|| async {
182+
fs::create_dir_all(DATA_DIR).expect("Failed to create data directory");
183+
184+
must_generate_tpch_table!(RegionGenerator, RegionArrow, "region");
185+
must_generate_tpch_table!(NationGenerator, NationArrow, "nation");
186+
must_generate_tpch_table!(CustomerGenerator, CustomerArrow, "customer");
187+
must_generate_tpch_table!(SupplierGenerator, SupplierArrow, "supplier");
188+
must_generate_tpch_table!(PartGenerator, PartArrow, "part");
189+
must_generate_tpch_table!(PartSuppGenerator, PartSuppArrow, "partsupp");
190+
must_generate_tpch_table!(OrderGenerator, OrderArrow, "orders");
191+
must_generate_tpch_table!(LineItemGenerator, LineItemArrow, "lineitem");
192+
})
193+
.await;
194+
}

tests/fixtures/tpch/queries/q1.sql

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
select
2+
l_returnflag,
3+
l_linestatus,
4+
sum(l_quantity) as sum_qty,
5+
sum(l_extendedprice) as sum_base_price,
6+
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
7+
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
8+
avg(l_quantity) as avg_qty,
9+
avg(l_extendedprice) as avg_price,
10+
avg(l_discount) as avg_disc,
11+
count(*) as count_order
12+
from
13+
lineitem
14+
where
15+
l_shipdate <= date '1998-09-02'
16+
group by
17+
l_returnflag,
18+
l_linestatus
19+
order by
20+
l_returnflag,
21+
l_linestatus;
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
select
2+
c_custkey,
3+
c_name,
4+
sum(l_extendedprice * (1 - l_discount)) as revenue,
5+
c_acctbal,
6+
n_name,
7+
c_address,
8+
c_phone,
9+
c_comment
10+
from
11+
customer,
12+
orders,
13+
lineitem,
14+
nation
15+
where
16+
c_custkey = o_custkey
17+
and l_orderkey = o_orderkey
18+
and o_orderdate >= date '1993-10-01'
19+
and o_orderdate < date '1994-01-01'
20+
and l_returnflag = 'R'
21+
and c_nationkey = n_nationkey
22+
group by
23+
c_custkey,
24+
c_name,
25+
c_acctbal,
26+
c_phone,
27+
n_name,
28+
c_address,
29+
c_comment
30+
order by
31+
revenue desc;
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
select
2+
ps_partkey,
3+
sum(ps_supplycost * ps_availqty) as value
4+
from
5+
partsupp,
6+
supplier,
7+
nation
8+
where
9+
ps_suppkey = s_suppkey
10+
and s_nationkey = n_nationkey
11+
and n_name = 'GERMANY'
12+
group by
13+
ps_partkey having
14+
sum(ps_supplycost * ps_availqty) > (
15+
select
16+
sum(ps_supplycost * ps_availqty) * 0.0001
17+
from
18+
partsupp,
19+
supplier,
20+
nation
21+
where
22+
ps_suppkey = s_suppkey
23+
and s_nationkey = n_nationkey
24+
and n_name = 'GERMANY'
25+
)
26+
order by
27+
value desc;
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
select
2+
l_shipmode,
3+
sum(case
4+
when o_orderpriority = '1-URGENT'
5+
or o_orderpriority = '2-HIGH'
6+
then 1
7+
else 0
8+
end) as high_line_count,
9+
sum(case
10+
when o_orderpriority <> '1-URGENT'
11+
and o_orderpriority <> '2-HIGH'
12+
then 1
13+
else 0
14+
end) as low_line_count
15+
from
16+
lineitem
17+
join
18+
orders
19+
on
20+
l_orderkey = o_orderkey
21+
where
22+
l_shipmode in ('MAIL', 'SHIP')
23+
and l_commitdate < l_receiptdate
24+
and l_shipdate < l_commitdate
25+
and l_receiptdate >= date '1994-01-01'
26+
and l_receiptdate < date '1995-01-01'
27+
group by
28+
l_shipmode
29+
order by
30+
l_shipmode;
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
select
2+
c_count,
3+
count(*) as custdist
4+
from
5+
(
6+
select
7+
c_custkey,
8+
count(o_orderkey)
9+
from
10+
customer left outer join orders on
11+
c_custkey = o_custkey
12+
and o_comment not like '%special%requests%'
13+
group by
14+
c_custkey
15+
) as c_orders (c_custkey, c_count)
16+
group by
17+
c_count
18+
order by
19+
custdist desc,
20+
c_count desc;
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
select
2+
100.00 * sum(case
3+
when p_type like 'PROMO%'
4+
then l_extendedprice * (1 - l_discount)
5+
else 0
6+
end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
7+
from
8+
lineitem,
9+
part
10+
where
11+
l_partkey = p_partkey
12+
and l_shipdate >= date '1995-09-01'
13+
and l_shipdate < date '1995-10-01';

0 commit comments

Comments
 (0)