diff --git a/.gitignore b/.gitignore
index ce69e802..be0175de 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 /.idea
 /target
-/benchmarks/data/
\ No newline at end of file
+/benchmarks/data/
+testdata/tpch/data/
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 84e8d020..9ab5890a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1113,6 +1113,7 @@ dependencies = [
 name = "datafusion-distributed"
 version = "0.1.0"
 dependencies = [
+ "arrow",
  "arrow-flight",
  "async-trait",
  "dashmap",
@@ -1124,11 +1125,14 @@ dependencies = [
  "insta",
  "itertools",
  "object_store",
+ "parquet",
  "prost",
  "rand 0.8.5",
  "tokio",
  "tonic",
  "tower 0.5.2",
+ "tpchgen",
+ "tpchgen-arrow",
  "url",
  "uuid",
 ]
@@ -3469,6 +3473,20 @@
 version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
+
+[[package]]
+name = "tpchgen"
+version = "1.1.1"
+source = "git+https://github.com/clflushopt/tpchgen-rs?rev=c8d823432528eed4f70fca5a1296a66c68a389a8#c8d823432528eed4f70fca5a1296a66c68a389a8"
+
+[[package]]
+name = "tpchgen-arrow"
+version = "1.1.1"
+source = "git+https://github.com/clflushopt/tpchgen-rs?rev=c8d823432528eed4f70fca5a1296a66c68a389a8#c8d823432528eed4f70fca5a1296a66c68a389a8"
+dependencies = [
+ "arrow",
+ "tpchgen",
+]
+
 [[package]]
 name = "tracing"
 version = "0.1.41"
diff --git a/Cargo.toml b/Cargo.toml
index 0235c81f..c7ac91ea 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,11 +33,23 @@ object_store = "0.12.3"
 
 # integration_tests deps
 insta = { version = "1.43.1", features = ["filters"], optional = true }
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8", optional = true }
+parquet = { version = "55.2.0", optional = true }
+arrow = { version = "55.2.0", optional = true }
 
 [features]
 integration = [
-    "insta"
+    "insta",
+    "tpchgen",
+    "tpchgen-arrow",
+    "parquet",
+    "arrow"
 ]
 
 [dev-dependencies]
 insta = { version = "1.43.1", features = ["filters"] }
+tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
+tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "c8d823432528eed4f70fca5a1296a66c68a389a8" }
+parquet = "55.2.0"
+arrow = "55.2.0"
diff --git a/benchmarks/src/tpch/mod.rs b/benchmarks/src/tpch/mod.rs
index d4aea119..e8f558c9 100644
--- a/benchmarks/src/tpch/mod.rs
+++ b/benchmarks/src/tpch/mod.rs
@@ -23,7 +23,6 @@ use datafusion::{
     common::plan_err,
     error::Result,
 };
-use std::fs;
 
 mod run;
 pub use run::RunOpt;
@@ -140,28 +139,24 @@
     }
 }
 
+fn get_benchmark_queries_dir() -> std::path::PathBuf {
+    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("../testdata/tpch/queries")
+}
+
 /// Get the SQL statements from the specified query file
 pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
     if query > 0 && query < 23 {
-        let possibilities = vec![
-            format!("queries/q{query}.sql"),
-            format!("benchmarks/queries/q{query}.sql"),
-        ];
-        let mut errors = vec![];
-        for filename in possibilities {
-            match fs::read_to_string(&filename) {
-                Ok(contents) => {
-                    return Ok(contents
-                        .split(';')
-                        .map(|s| s.trim())
-                        .filter(|s| !s.is_empty())
-                        .map(|s| s.to_string())
-                        .collect());
-                }
-                Err(e) => errors.push(format!("{filename}: {e}")),
-            };
-        }
-        plan_err!("invalid query. Could not find query: {:?}", errors)
+        let queries_dir = get_benchmark_queries_dir();
+        let contents = datafusion_distributed::test_utils::tpch::tpch_query_from_dir(
+            &queries_dir,
+            query as u8,
+        );
+        Ok(contents
+            .split(';')
+            .map(|s| s.trim())
+            .filter(|s| !s.is_empty())
+            .map(|s| s.to_string())
+            .collect())
     } else {
         plan_err!("invalid query. Expected value between 1 and 22")
     }
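The loader above now resolves query files against `CARGO_MANIFEST_DIR` instead of probing working-directory-relative paths, and splits each file on `;` into trimmed, non-empty statements. A minimal self-contained sketch of those two patterns; the helper names `manifest_relative` and `split_statements` are illustrative, not part of this change:

use std::path::{Path, PathBuf};

// Resolve a file against the crate root so lookups do not depend on the
// process's current working directory.
fn manifest_relative(rel: &str) -> PathBuf {
    Path::new(env!("CARGO_MANIFEST_DIR")).join(rel)
}

// Split a query file into trimmed, non-empty statements, mirroring the
// post-processing in get_query_sql.
fn split_statements(contents: &str) -> Vec<String> {
    contents
        .split(';')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect()
}

fn main() {
    let path = manifest_relative("testdata/tpch/queries/q1.sql");
    let sql = std::fs::read_to_string(&path).expect("query file should exist");
    println!("{} statement(s) in {}", split_statements(&sql).len(), path.display());
}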
diff --git a/src/test_utils/tpch.rs b/src/test_utils/tpch.rs
index d96eca18..e435eab2 100644
--- a/src/test_utils/tpch.rs
+++ b/src/test_utils/tpch.rs
@@ -5,19 +5,34 @@ use datafusion::{
     catalog::{MemTable, TableProvider},
 };
 
-pub fn tpch_table(name: &str) -> Arc<dyn TableProvider> {
-    let schema = Arc::new(get_tpch_table_schema(name));
-    Arc::new(MemTable::try_new(schema, vec![]).unwrap())
-}
+use std::fs;
+
+use arrow::record_batch::RecordBatch;
+use parquet::{arrow::arrow_writer::ArrowWriter, file::properties::WriterProperties};
+use tpchgen::generators::{
+    CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, PartGenerator,
+    PartSuppGenerator, RegionGenerator, SupplierGenerator,
+};
+use tpchgen_arrow::{
+    CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow, PartSuppArrow, RegionArrow,
+    SupplierArrow,
+};
 
-pub fn tpch_query(num: u8) -> String {
-    // read the query from the test/tpch/queries/ directory and return it
-    let query_path = format!("testing/tpch/queries/q{}.sql", num);
-    std::fs::read_to_string(query_path)
+pub fn tpch_query_from_dir(queries_dir: &std::path::Path, num: u8) -> String {
+    let query_path = queries_dir.join(format!("q{num}.sql"));
+    fs::read_to_string(query_path)
         .unwrap_or_else(|_| panic!("Failed to read TPCH query file: q{}.sql", num))
         .trim()
         .to_string()
 }
 
+pub const NUM_QUERIES: u8 = 22; // number of queries in the TPC-H benchmark, numbered 1 to 22
+
+const SCALE_FACTOR: f64 = 0.001;
+
+pub fn tpch_table(name: &str) -> Arc<dyn TableProvider> {
+    let schema = Arc::new(get_tpch_table_schema(name));
+    Arc::new(MemTable::try_new(schema, vec![]).unwrap())
+}
 
 pub fn get_tpch_table_schema(table: &str) -> Schema {
     // note that the schema intentionally uses signed integers so that any generated Parquet
@@ -113,3 +128,58 @@
         _ => unimplemented!(),
     }
 }
+
+// generate_table creates a Parquet file in the data directory from an iterator of Arrow
+// RecordBatches.
+fn generate_table<A>(
+    mut data_source: A,
+    table_name: &str,
+    data_dir: &std::path::Path,
+) -> Result<(), Box<dyn std::error::Error>>
+where
+    A: Iterator<Item = RecordBatch>,
+{
+    let output_path = data_dir.join(format!("{}.parquet", table_name));
+
+    if let Some(first_batch) = data_source.next() {
+        let file = fs::File::create(&output_path)?;
+        let props = WriterProperties::builder().build();
+        let mut writer = ArrowWriter::try_new(file, first_batch.schema(), Some(props))?;
+
+        writer.write(&first_batch)?;
+
+        for batch in data_source {
+            writer.write(&batch)?;
+        }
+
+        writer.close()?;
+    }
+
+    Ok(())
+}
+
+macro_rules! must_generate_tpch_table {
+    ($generator:ident, $arrow:ident, $name:literal, $data_dir:expr) => {
+        generate_table(
+            // TODO: Consider adjusting the partitions and batch sizes.
+            $arrow::new($generator::new(SCALE_FACTOR, 1, 1)).with_batch_size(1000),
+            $name,
+            $data_dir,
+        )
+        .expect(concat!("Failed to generate ", $name, " table"));
+    };
+}
+
+// generate_tpch_data generates all TPC-H tables in the specified data directory.
+pub fn generate_tpch_data(data_dir: &std::path::Path) {
+    fs::create_dir_all(data_dir).expect("Failed to create data directory");
+
+    must_generate_tpch_table!(RegionGenerator, RegionArrow, "region", data_dir);
+    must_generate_tpch_table!(NationGenerator, NationArrow, "nation", data_dir);
+    must_generate_tpch_table!(CustomerGenerator, CustomerArrow, "customer", data_dir);
+    must_generate_tpch_table!(SupplierGenerator, SupplierArrow, "supplier", data_dir);
+    must_generate_tpch_table!(PartGenerator, PartArrow, "part", data_dir);
+    must_generate_tpch_table!(PartSuppGenerator, PartSuppArrow, "partsupp", data_dir);
+    must_generate_tpch_table!(OrderGenerator, OrderArrow, "orders", data_dir);
+    must_generate_tpch_table!(LineItemGenerator, LineItemArrow, "lineitem", data_dir);
+}
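`generate_table` above pulls the first batch to learn the schema, constructs the `ArrowWriter` from it, streams the remaining batches through, and closes the writer to flush the Parquet footer. A standalone sketch of the same pattern against a toy in-memory source; the schema, values, and `example.parquet` name are made up for illustration:

use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::{arrow::arrow_writer::ArrowWriter, file::properties::WriterProperties};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Three toy batches standing in for a tpchgen-arrow generator.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batches: Vec<RecordBatch> = (0..3i64)
        .map(|i| {
            let col = Arc::new(Int64Array::from(vec![i, i + 1])) as ArrayRef;
            RecordBatch::try_new(schema.clone(), vec![col]).unwrap()
        })
        .collect();

    let mut source = batches.into_iter();
    if let Some(first) = source.next() {
        // The first batch supplies the schema needed to construct the writer.
        let file = File::create("example.parquet")?;
        let props = WriterProperties::builder().build();
        let mut writer = ArrowWriter::try_new(file, first.schema(), Some(props))?;
        writer.write(&first)?;
        for batch in source {
            writer.write(&batch)?;
        }
        // close() flushes buffered row groups and writes the file footer.
        writer.close()?;
    }
    Ok(())
}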
diff --git a/benchmarks/queries/q1.sql b/testdata/tpch/queries/q1.sql
similarity index 100%
rename from benchmarks/queries/q1.sql
rename to testdata/tpch/queries/q1.sql
diff --git a/benchmarks/queries/q10.sql b/testdata/tpch/queries/q10.sql
similarity index 95%
rename from benchmarks/queries/q10.sql
rename to testdata/tpch/queries/q10.sql
index 8613fd49..cf45e434 100644
--- a/benchmarks/queries/q10.sql
+++ b/testdata/tpch/queries/q10.sql
@@ -28,5 +28,4 @@ group by
     c_address,
     c_comment
 order by
-    revenue desc
-limit 20;
+    revenue desc;
\ No newline at end of file
diff --git a/benchmarks/queries/q11.sql b/testdata/tpch/queries/q11.sql
similarity index 100%
rename from benchmarks/queries/q11.sql
rename to testdata/tpch/queries/q11.sql
diff --git a/benchmarks/queries/q12.sql b/testdata/tpch/queries/q12.sql
similarity index 100%
rename from benchmarks/queries/q12.sql
rename to testdata/tpch/queries/q12.sql
diff --git a/benchmarks/queries/q13.sql b/testdata/tpch/queries/q13.sql
similarity index 100%
rename from benchmarks/queries/q13.sql
rename to testdata/tpch/queries/q13.sql
diff --git a/benchmarks/queries/q14.sql b/testdata/tpch/queries/q14.sql
similarity index 100%
rename from benchmarks/queries/q14.sql
rename to testdata/tpch/queries/q14.sql
diff --git a/benchmarks/queries/q15.sql b/testdata/tpch/queries/q15.sql
similarity index 100%
rename from benchmarks/queries/q15.sql
rename to testdata/tpch/queries/q15.sql
diff --git a/benchmarks/queries/q16.sql b/testdata/tpch/queries/q16.sql
similarity index 100%
rename from benchmarks/queries/q16.sql
rename to testdata/tpch/queries/q16.sql
diff --git a/benchmarks/queries/q17.sql b/testdata/tpch/queries/q17.sql
similarity index 100%
rename from benchmarks/queries/q17.sql
rename to testdata/tpch/queries/q17.sql
diff --git a/benchmarks/queries/q18.sql b/testdata/tpch/queries/q18.sql
similarity index 95%
rename from benchmarks/queries/q18.sql
rename to testdata/tpch/queries/q18.sql
index ba7ee7f7..835de28a 100644
--- a/benchmarks/queries/q18.sql
+++ b/testdata/tpch/queries/q18.sql
@@ -29,5 +29,4 @@ group by
     o_totalprice
 order by
     o_totalprice desc,
-    o_orderdate
-limit 100;
+    o_orderdate;
\ No newline at end of file
diff --git a/benchmarks/queries/q19.sql b/testdata/tpch/queries/q19.sql
similarity index 100%
rename from benchmarks/queries/q19.sql
rename to testdata/tpch/queries/q19.sql
diff --git a/benchmarks/queries/q2.sql b/testdata/tpch/queries/q2.sql
similarity index 96%
rename from benchmarks/queries/q2.sql
rename to testdata/tpch/queries/q2.sql
index 68e478f6..f66af210 100644
--- a/benchmarks/queries/q2.sql
+++ b/testdata/tpch/queries/q2.sql
@@ -40,5 +40,4 @@ order by
     s_acctbal desc,
     n_name,
     s_name,
-    p_partkey
-limit 100;
+    p_partkey;
\ No newline at end of file
diff --git a/benchmarks/queries/q20.sql b/testdata/tpch/queries/q20.sql
similarity index 100%
rename from benchmarks/queries/q20.sql
rename to testdata/tpch/queries/q20.sql
diff --git a/benchmarks/queries/q21.sql b/testdata/tpch/queries/q21.sql
similarity index 97%
rename from benchmarks/queries/q21.sql
rename to testdata/tpch/queries/q21.sql
index b95e7b0d..9d2fe32c 100644
--- a/benchmarks/queries/q21.sql
+++ b/testdata/tpch/queries/q21.sql
@@ -36,5 +36,4 @@ group by
     s_name
 order by
     numwait desc,
-    s_name
-limit 100;
+    s_name;
\ No newline at end of file
diff --git a/benchmarks/queries/q22.sql b/testdata/tpch/queries/q22.sql
similarity index 100%
rename from benchmarks/queries/q22.sql
rename to testdata/tpch/queries/q22.sql
diff --git a/benchmarks/queries/q3.sql b/testdata/tpch/queries/q3.sql
similarity index 94%
rename from benchmarks/queries/q3.sql
rename to testdata/tpch/queries/q3.sql
index e5fa9e38..7dbc6d9e 100644
--- a/benchmarks/queries/q3.sql
+++ b/testdata/tpch/queries/q3.sql
@@ -19,5 +19,4 @@ group by
     o_shippriority
 order by
     revenue desc,
-    o_orderdate
-limit 10;
+    o_orderdate;
\ No newline at end of file
diff --git a/benchmarks/queries/q4.sql b/testdata/tpch/queries/q4.sql
similarity index 100%
rename from benchmarks/queries/q4.sql
rename to testdata/tpch/queries/q4.sql
diff --git a/benchmarks/queries/q5.sql b/testdata/tpch/queries/q5.sql
similarity index 100%
rename from benchmarks/queries/q5.sql
rename to testdata/tpch/queries/q5.sql
diff --git a/benchmarks/queries/q6.sql b/testdata/tpch/queries/q6.sql
similarity index 100%
rename from benchmarks/queries/q6.sql
rename to testdata/tpch/queries/q6.sql
diff --git a/benchmarks/queries/q7.sql b/testdata/tpch/queries/q7.sql
similarity index 100%
rename from benchmarks/queries/q7.sql
rename to testdata/tpch/queries/q7.sql
diff --git a/benchmarks/queries/q8.sql b/testdata/tpch/queries/q8.sql
similarity index 100%
rename from benchmarks/queries/q8.sql
rename to testdata/tpch/queries/q8.sql
diff --git a/benchmarks/queries/q9.sql b/testdata/tpch/queries/q9.sql
similarity index 100%
rename from benchmarks/queries/q9.sql
rename to testdata/tpch/queries/q9.sql
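The shared test module below guards dataset generation behind a `tokio::sync::OnceCell`, so concurrent test tasks await a single initialization instead of regenerating the data. The same pattern in isolation — `expensive_setup` and `ensure_setup` are hypothetical names, not part of this change:

use tokio::sync::OnceCell;

static SETUP: OnceCell<()> = OnceCell::const_new();

async fn expensive_setup() {
    // Placeholder for one-time work such as generating test data on disk.
    println!("running setup once");
}

async fn ensure_setup() {
    // Every caller awaits the same initialization; the async closure body
    // runs exactly once even when tests execute concurrently.
    SETUP.get_or_init(|| async { expensive_setup().await }).await;
}

#[tokio::main]
async fn main() {
    // Both calls complete, but the setup itself runs a single time.
    tokio::join!(ensure_setup(), ensure_setup());
}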
diff --git a/tests/common.rs b/tests/common.rs
new file mode 100644
index 00000000..7a340908
--- /dev/null
+++ b/tests/common.rs
@@ -0,0 +1,28 @@
+use datafusion_distributed::test_utils::tpch;
+
+use tokio::sync::OnceCell;
+
+pub fn get_test_data_dir() -> std::path::PathBuf {
+    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpch/data")
+}
+
+pub fn get_test_queries_dir() -> std::path::PathBuf {
+    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpch/queries")
+}
+
+pub fn get_test_tpch_query(num: u8) -> String {
+    let queries_dir = get_test_queries_dir();
+    tpch::tpch_query_from_dir(&queries_dir, num)
+}
+
+// OnceCell to ensure the TPC-H tables are generated only once for tests.
+static INIT_TEST_TPCH_TABLES: OnceCell<()> = OnceCell::const_new();
+
+// ensure_tpch_data initializes the TPC-H data on disk.
+pub async fn ensure_tpch_data() {
+    INIT_TEST_TPCH_TABLES
+        .get_or_init(|| async {
+            tpch::generate_tpch_data(&get_test_data_dir());
+        })
+        .await;
+}
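The consistency test that follows compares two executions by pretty-printing both result sets and asserting string equality, which pins down row order as well as values; note that the relocated queries retain their ORDER BY clauses while dropping LIMIT. A reduced sketch of that comparison using two vanilla contexts, matching the current state in which the distributed optimizer rule is still commented out:

use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::prelude::SessionContext;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let ctx1 = SessionContext::new();
    let ctx2 = SessionContext::new(); // stand-in for the future distributed context

    let sql = "SELECT 1 AS one";
    let batches1 = ctx1.sql(sql).await?.collect().await?;
    let batches2 = ctx2.sql(sql).await?.collect().await?;

    // Comparing the rendered tables checks values, schema, and row order at once.
    assert_eq!(
        pretty_format_batches(&batches1)?.to_string(),
        pretty_format_batches(&batches2)?.to_string(),
    );
    Ok(())
}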
diff --git a/tests/non_distributed_consistency_test.rs b/tests/non_distributed_consistency_test.rs
new file mode 100644
index 00000000..debcde03
--- /dev/null
+++ b/tests/non_distributed_consistency_test.rs
@@ -0,0 +1,203 @@
+mod common;
+
+#[cfg(all(feature = "integration", test))]
+mod tests {
+    use crate::common::{ensure_tpch_data, get_test_data_dir, get_test_tpch_query};
+    use datafusion::execution::SessionStateBuilder;
+    use datafusion::physical_plan::execute_stream;
+    use datafusion::prelude::{SessionConfig, SessionContext};
+    use futures::TryStreamExt;
+    use std::error::Error;
+
+    #[tokio::test]
+    async fn test_tpch_1() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(1).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_2() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(2).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_3() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(3).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_4() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(4).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_5() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(5).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_6() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(6).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_7() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(7).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_8() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(8).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_9() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(9).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_10() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(10).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_11() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(11).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_12() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(12).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_13() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(13).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_14() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(14).await
+    }
+
+    #[tokio::test]
+    #[ignore]
+    // TODO: Support query 15?
+    // Skipped because it contains DDL statements not supported in single SQL execution.
+    async fn test_tpch_15() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(15).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_16() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(16).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_17() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(17).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_18() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(18).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_19() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(19).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_20() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(20).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_21() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(21).await
+    }
+
+    #[tokio::test]
+    async fn test_tpch_22() -> Result<(), Box<dyn Error>> {
+        test_tpch_query(22).await
+    }
+
+    // test_tpch_query runs the given TPC-H query in two separate session contexts and asserts
+    // that the results are identical. The second context will run distributed once the TODO
+    // below is resolved; for now both executions are non-distributed.
+    async fn test_tpch_query(query_id: u8) -> Result<(), Box<dyn Error>> {
+        ensure_tpch_data().await;
+
+        let sql = get_test_tpch_query(query_id);
+
+        // Context 1: Non-distributed execution.
+        let config1 = SessionConfig::new();
+        let state1 = SessionStateBuilder::new()
+            .with_default_features()
+            .with_config(config1)
+            .build();
+        let ctx1 = SessionContext::new_with_state(state1);
+
+        // Register tables for the first context.
+        for table_name in [
+            "lineitem", "orders", "part", "partsupp", "customer", "nation", "region", "supplier",
+        ] {
+            let query_path = get_test_data_dir().join(format!("{}.parquet", table_name));
+            ctx1.register_parquet(
+                table_name,
+                query_path.to_string_lossy().as_ref(),
+                datafusion::prelude::ParquetReadOptions::default(),
+            )
+            .await?;
+        }
+
+        let df1 = ctx1.sql(&sql).await?;
+        let physical1 = df1.create_physical_plan().await?;
+
+        let batches1 = execute_stream(physical1.clone(), ctx1.task_ctx())?
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        // Context 2: Distributed execution.
+        // TODO: once distributed execution is working, we can enable distributed features here.
+        let config2 = SessionConfig::new();
+        // .with_target_partitions(3);
+        let state2 = SessionStateBuilder::new()
+            .with_default_features()
+            .with_config(config2)
+            // .with_optimizer_rule(DistributedPhysicalOptimizerRule::default().with_maximum_partitions_per_task(4))
+            .build();
+        let ctx2 = SessionContext::new_with_state(state2);
+
+        // Register tables for the second context.
+        for table_name in [
+            "lineitem", "orders", "part", "partsupp", "customer", "nation", "region", "supplier",
+        ] {
+            let query_path = get_test_data_dir().join(format!("{}.parquet", table_name));
+            ctx2.register_parquet(
+                table_name,
+                query_path.to_string_lossy().as_ref(),
+                datafusion::prelude::ParquetReadOptions::default(),
+            )
+            .await?;
+        }
+
+        let df2 = ctx2.sql(&sql).await?;
+        let physical2 = df2.create_physical_plan().await?;
+
+        let batches2 = execute_stream(physical2.clone(), ctx2.task_ctx())?
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        let formatted1 = arrow::util::pretty::pretty_format_batches(&batches1)?;
+        let formatted2 = arrow::util::pretty::pretty_format_batches(&batches2)?;
+
+        assert_eq!(
+            formatted1.to_string(),
+            formatted2.to_string(),
+            "Query {} results differ between executions",
+            query_id
+        );
+
+        Ok(())
+    }
+}
diff --git a/tests/stage_planning.rs b/tests/stage_planning.rs
index 08b3572e..ffacf771 100644
--- a/tests/stage_planning.rs
+++ b/tests/stage_planning.rs
@@ -1,11 +1,14 @@
+mod common;
+
 #[cfg(all(feature = "integration", test))]
 mod tests {
+    use crate::common::get_test_queries_dir;
     use datafusion::arrow::util::pretty::pretty_format_batches;
     use datafusion::execution::SessionStateBuilder;
     use datafusion::physical_plan::{displayable, execute_stream};
     use datafusion::prelude::{SessionConfig, SessionContext};
     use datafusion_distributed::assert_snapshot;
-    use datafusion_distributed::test_utils::tpch::tpch_query;
+    use datafusion_distributed::test_utils::tpch::tpch_query_from_dir;
     use datafusion_distributed::DistributedPhysicalOptimizerRule;
     use datafusion_distributed::{display_stage_graphviz, ExecutionStage};
     use futures::TryStreamExt;
@@ -40,7 +43,8 @@
             .await?;
         }
 
-        let sql = tpch_query(2);
+        let queries_dir = get_test_queries_dir();
+        let sql = tpch_query_from_dir(&queries_dir, 2);
         //let sql = "select 1;";
 
         println!("SQL Query:\n{}", sql);