Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/.idea
/target
/benchmarks/data/
/benchmarks/data/
testdata/tpch/data/
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,23 @@ object_store = "0.12.3"

# integration_tests deps
insta = { version = "1.43.1", features = ["filters"], optional = true }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
parquet = { version = "55.2.0", optional = true }
arrow = { version = "55.2.0", optional = true }

[features]
integration = [
"insta"
"insta",
"tpchgen",
"tpchgen-arrow",
"parquet",
"arrow"
]

[dev-dependencies]
insta = { version = "1.43.1", features = ["filters"] }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
parquet = "55.2.0"
arrow = "55.2.0"
35 changes: 15 additions & 20 deletions benchmarks/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use datafusion::{
common::plan_err,
error::Result,
};
use std::fs;
mod run;
pub use run::RunOpt;

Expand Down Expand Up @@ -140,28 +139,24 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
}
}

fn get_benchmark_queries_dir() -> std::path::PathBuf {
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("../testdata/tpch/queries")
}

/// Get the SQL statements from the specified query file
pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
if query > 0 && query < 23 {
let possibilities = vec![
format!("queries/q{query}.sql"),
format!("benchmarks/queries/q{query}.sql"),
];
let mut errors = vec![];
for filename in possibilities {
match fs::read_to_string(&filename) {
Ok(contents) => {
return Ok(contents
.split(';')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.map(|s| s.to_string())
.collect());
}
Err(e) => errors.push(format!("{filename}: {e}")),
};
}
plan_err!("invalid query. Could not find query: {:?}", errors)
let queries_dir = get_benchmark_queries_dir();
let contents = datafusion_distributed::test_utils::tpch::tpch_query_from_dir(
&queries_dir,
query as u8,
);
Ok(contents
.split(';')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.map(|s| s.to_string())
.collect())
} else {
plan_err!("invalid query. Expected value between 1 and 22")
}
Expand Down
86 changes: 78 additions & 8 deletions src/test_utils/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,34 @@ use datafusion::{
catalog::{MemTable, TableProvider},
};

pub fn tpch_table(name: &str) -> Arc<dyn TableProvider> {
let schema = Arc::new(get_tpch_table_schema(name));
Arc::new(MemTable::try_new(schema, vec![]).unwrap())
}
use std::fs;

use arrow::record_batch::RecordBatch;
use parquet::{arrow::arrow_writer::ArrowWriter, file::properties::WriterProperties};
use tpchgen::generators::{
CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, PartGenerator,
PartSuppGenerator, RegionGenerator, SupplierGenerator,
};
use tpchgen_arrow::{
CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow, PartSuppArrow, RegionArrow,
SupplierArrow,
};

pub fn tpch_query(num: u8) -> String {
// read the query from the test/tpch/queries/ directory and return it
let query_path = format!("testing/tpch/queries/q{}.sql", num);
std::fs::read_to_string(query_path)
pub fn tpch_query_from_dir(queries_dir: &std::path::Path, num: u8) -> String {
let query_path = queries_dir.join(format!("q{num}.sql"));
fs::read_to_string(query_path)
.unwrap_or_else(|_| panic!("Failed to read TPCH query file: q{}.sql", num))
.trim()
.to_string()
}
pub const NUM_QUERIES: u8 = 22; // number of queries in the TPCH benchmark numbered from 1 to 22

const SCALE_FACTOR: f64 = 0.001;

pub fn tpch_table(name: &str) -> Arc<dyn TableProvider> {
let schema = Arc::new(get_tpch_table_schema(name));
Arc::new(MemTable::try_new(schema, vec![]).unwrap())
}

pub fn get_tpch_table_schema(table: &str) -> Schema {
// note that the schema intentionally uses signed integers so that any generated Parquet
Expand Down Expand Up @@ -113,3 +128,58 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
_ => unimplemented!(),
}
}

// generate_table creates a parquet file in the data directory from an arrow RecordBatch row
// source.
fn generate_table<A>(
mut data_source: A,
table_name: &str,
data_dir: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error>>
where
A: Iterator<Item = RecordBatch>,
{
let output_path = data_dir.join(format!("{}.parquet", table_name));

if let Some(first_batch) = data_source.next() {
let file = fs::File::create(&output_path)?;
let props = WriterProperties::builder().build();
let mut writer = ArrowWriter::try_new(file, first_batch.schema(), Some(props))?;

writer.write(&first_batch)?;

while let Some(batch) = data_source.next() {
writer.write(&batch)?;
}

writer.close()?;
}

Ok(())
}

macro_rules! must_generate_tpch_table {
($generator:ident, $arrow:ident, $name:literal, $data_dir:expr) => {
generate_table(
// TODO: Consider adjusting the partitions and batch sizes.
$arrow::new($generator::new(SCALE_FACTOR, 1, 1)).with_batch_size(1000),
$name,
$data_dir,
)
.expect(concat!("Failed to generate ", $name, " table"));
};
}

// generate_tpch_data generates all TPC-H tables in the specified data directory.
pub fn generate_tpch_data(data_dir: &std::path::Path) {
fs::create_dir_all(data_dir).expect("Failed to create data directory");

must_generate_tpch_table!(RegionGenerator, RegionArrow, "region", data_dir);
must_generate_tpch_table!(NationGenerator, NationArrow, "nation", data_dir);
must_generate_tpch_table!(CustomerGenerator, CustomerArrow, "customer", data_dir);
must_generate_tpch_table!(SupplierGenerator, SupplierArrow, "supplier", data_dir);
must_generate_tpch_table!(PartGenerator, PartArrow, "part", data_dir);
must_generate_tpch_table!(PartSuppGenerator, PartSuppArrow, "partsupp", data_dir);
must_generate_tpch_table!(OrderGenerator, OrderArrow, "orders", data_dir);
must_generate_tpch_table!(LineItemGenerator, LineItemArrow, "lineitem", data_dir);
}
File renamed without changes.
3 changes: 1 addition & 2 deletions benchmarks/queries/q10.sql → testdata/tpch/queries/q10.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ group by
c_address,
c_comment
order by
revenue desc
limit 20;
revenue desc;
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
3 changes: 1 addition & 2 deletions benchmarks/queries/q18.sql → testdata/tpch/queries/q18.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,4 @@ group by
o_totalprice
order by
o_totalprice desc,
o_orderdate
limit 100;
o_orderdate;
File renamed without changes.
3 changes: 1 addition & 2 deletions benchmarks/queries/q2.sql → testdata/tpch/queries/q2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,4 @@ order by
s_acctbal desc,
n_name,
s_name,
p_partkey
limit 100;
p_partkey;
File renamed without changes.
3 changes: 1 addition & 2 deletions benchmarks/queries/q21.sql → testdata/tpch/queries/q21.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,4 @@ group by
s_name
order by
numwait desc,
s_name
limit 100;
s_name;
File renamed without changes.
3 changes: 1 addition & 2 deletions benchmarks/queries/q3.sql → testdata/tpch/queries/q3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ group by
o_shippriority
order by
revenue desc,
o_orderdate
limit 10;
o_orderdate;
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
28 changes: 28 additions & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use datafusion_distributed::test_utils::tpch;

use tokio::sync::OnceCell;

pub fn get_test_data_dir() -> std::path::PathBuf {
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpch/data")
}

pub fn get_test_queries_dir() -> std::path::PathBuf {
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpch/queries")
}

pub fn get_test_tpch_query(num: u8) -> String {
let queries_dir = get_test_queries_dir();
tpch::tpch_query_from_dir(&queries_dir, num)
}

// OnceCell to ensure TPCH tables are generated only once for tests
static INIT_TEST_TPCH_TABLES: OnceCell<()> = OnceCell::const_new();

// ensure_tpch_data initializes the TPCH data on disk.
pub async fn ensure_tpch_data() {
INIT_TEST_TPCH_TABLES
.get_or_init(|| async {
tpch::generate_tpch_data(&get_test_data_dir());
})
.await;
}
Loading