From 16588b81ccc4812d72c6d2c389841017b90abcda Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Mon, 21 Jul 2025 14:23:34 +0200 Subject: [PATCH] Add planning tests --- Cargo.lock | 37 ++++++++++ Cargo.toml | 2 + src/query_planner.rs | 125 ++++++++++++++++++++++++++++++++ testdata/parquet/people.parquet | Bin 0 -> 1024 bytes 4 files changed, 164 insertions(+) create mode 100644 testdata/parquet/people.parquet diff --git a/Cargo.lock b/Cargo.lock index e3f8d34..ba7edfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -869,6 +869,18 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console" +version = "0.15.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8" +dependencies = [ + "encode_unicode", + "libc", + "once_cell", + "windows-sys 0.59.0", +] + [[package]] name = "const-random" version = "0.1.18" @@ -1733,6 +1745,7 @@ dependencies = [ "datafusion-substrait", "env_logger", "futures", + "insta", "itertools 0.14.0", "k8s-openapi", "kube", @@ -1744,6 +1757,7 @@ dependencies = [ "rand 0.8.5", "rustc_version", "rustls", + "serde_json", "tempfile", "test-log", "thiserror 1.0.69", @@ -1788,6 +1802,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "enum-ordinalize" version = "4.3.0" @@ -2502,6 +2522,17 @@ dependencies = [ "hashbrown 0.15.4", ] +[[package]] +name = "insta" +version = "1.43.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "154934ea70c58054b556dd430b99a98c2a7ff5309ac9891597e339b5c28f4371" +dependencies = [ + "console", + "once_cell", + "similar", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -4251,6 +4282,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "similar" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" + [[package]] name = "siphasher" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 132d092..3309194 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,8 @@ kube = { version = "1.1", features = ["derive", "runtime"] } log = "0.4" rand = "0.8" uuid = { version = "1.6", features = ["v4"] } +serde_json = "1.0" +insta = { version = "1.43.1"} object_store = { version = "0.12.0", features = [ "aws", diff --git a/src/query_planner.rs b/src/query_planner.rs index 2357ae8..2f835e3 100644 --- a/src/query_planner.rs +++ b/src/query_planner.rs @@ -10,6 +10,7 @@ use datafusion::{ use datafusion_proto::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec}; use datafusion_substrait::{logical_plan::consumer::from_substrait_plan, substrait::proto::Plan}; +use insta::assert_snapshot; use tokio_stream::StreamExt; use crate::{ @@ -286,3 +287,127 @@ impl QueryPlanner { Ok(()) } } + +pub mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::physical_plan::displayable; + use std::io::BufReader; + use std::{fs::File, path::Path}; + + #[tokio::test] + async fn prepare_substrait_select_one() -> anyhow::Result<()> { + // Load Substrait and parse to protobuf `Plan`. + let file = File::open(Path::new("testdata/substrait/select_one.substrait.json"))?; + let reader = BufReader::new(file); + let plan: Plan = serde_json::from_reader(reader)?; + + let planner = QueryPlanner::default(); + let qp = planner.prepare_substrait(plan).await?; + + // Distributed plan schema must match logical schema. + let expected_schema = Arc::new(Schema::new(vec![Field::new( + "test_col", + DataType::Int64, + false, + )])); + assert_eq!(qp.distributed_plan.schema(), expected_schema); + + // Check the distributed physical plan. + let distributed_plan_str = + format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true)); + assert_snapshot!(distributed_plan_str, @r" + DDStageExec[0] (output_partitioning=UnknownPartitioning(1)) + ProjectionExec: expr=[1 as test_col] + DataSourceExec: partitions=1, partition_sizes=[1] + "); + + Ok(()) + } + + #[tokio::test] + async fn prepare_sql_select_one() -> Result<()> { + let planner = QueryPlanner::default(); + let sql = "SELECT 1 AS test_col"; + + let qp = planner.prepare(sql).await?; + + // Distributed plan schema must match logical schema. + let expected_schema = Arc::new(Schema::new(vec![Field::new( + "test_col", + DataType::Int64, + false, + )])); + assert_eq!(qp.distributed_plan.schema(), expected_schema); + + // Check the distributed physical plan. + let distributed_plan_str = + format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true)); + assert_snapshot!(distributed_plan_str, @r" + DDStageExec[0] (output_partitioning=UnknownPartitioning(1)) + ProjectionExec: expr=[1 as test_col] + PlaceholderRowExec + "); + + Ok(()) + } + + #[tokio::test] + async fn prepare_describe_table() -> Result<()> { + std::env::set_var( + "DD_TABLES", + "people:parquet:testdata/parquet/people.parquet", + ); + + let planner = QueryPlanner::default(); + let sql = "DESCRIBE people"; + + let qp = planner.prepare(sql).await?; + + // Check the distributed physical plan. + let distributed_plan_str = + format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true)); + assert_snapshot!(distributed_plan_str, @r" + DDStageExec[0] (output_partitioning=UnknownPartitioning(1)) + RecordBatchExec + "); + + Ok(()) + } + + #[tokio::test] + async fn two_stages_query() -> Result<()> { + std::env::set_var( + "DD_TABLES", + "people:parquet:testdata/parquet/people.parquet", + ); + + let planner = QueryPlanner::default(); + let sql = "SELECT * FROM (SELECT 1 as id) a CROSS JOIN (SELECT 2 as id) b order by b.id"; + let qp = planner.prepare(sql).await?; + + // Distributed plan schema must match logical schema. + let expected_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("id", DataType::Int64, false), + ])); + + assert_eq!(qp.distributed_plan.schema(), expected_schema); + + // Check the distributed physical plan. + let distributed_plan_str = + format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true)); + assert_snapshot!(distributed_plan_str, @r" + DDStageExec[1] (output_partitioning=UnknownPartitioning(1)) + DDStageExec[0] (output_partitioning=UnknownPartitioning(1)) + SortExec: expr=[id@1 ASC NULLS LAST], preserve_partitioning=[false] + CrossJoinExec + ProjectionExec: expr=[1 as id] + PlaceholderRowExec + ProjectionExec: expr=[2 as id] + PlaceholderRowExec + "); + + Ok(()) + } +} diff --git a/testdata/parquet/people.parquet b/testdata/parquet/people.parquet new file mode 100644 index 0000000000000000000000000000000000000000..b781c218a61395b97a8c95562c4eb1ed3803d2af GIT binary patch literal 1024 zcmZuw!EVz)5FI;PD~B8s#H{SfLb=rHAp{B(sa1=#OO2?_Ps58wkha_G09o;WkJwv!;bvUX5XGp;aTc%!&IHmId>+Xc1efP`Obs|JArhATK8kaEnGUud$1H)tpuhYq_5yX|%(n zx*x?Oi*i3252EpHig}f7r8T*pQ!$;?EeEoWrX$H5!_(%J=8op*Oo_-`JR@D%M4v@u z85~z$8~Bz^oZguDMiVYjt-o6e;y6kUChvyfNq`n$X$fr64o<=)@Ljrtf`EcL3^S^6 zboM!;q;Onk!vhyf&MH`C^3N9$q>$$ZJzmo*{{KaMG6nLtz(x71fOTL2kqK}kegDFc z7lwCjcwY^7_U^NRNLZduWQOug<@o|i`KN%-wDQy%8`sj8hP*Vq8`HZo+1zw?b05Z+ z^y>2a0=n{tz;%#63sQJR3U_&mc1iVmd;RrvYW9`+fY(}A(b5~mmr5n7aj04hQB-P+ zXG_F=FW*eNOaF+fFmjRO2=CT6(*EI@ld)=1YgbPux`L zO@l*ypZX=-*H5Y36jh|owRE>XIOZbitI=knwpoLG>+^FhIP;@Pjd5K!dsQyo+S~81 u9H-RgCfLqt5WoKrp52cp)sJC3If};B)yMTm^@q*|%{+bJn+edNfB!GIWV}59 literal 0 HcmV?d00001