Skip to content

Commit 16588b8

Browse files
committed
Add planning tests
1 parent 6851fb4 commit 16588b8

File tree

4 files changed

+164
-0
lines changed

4 files changed

+164
-0
lines changed

Cargo.lock

Lines changed: 37 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ kube = { version = "1.1", features = ["derive", "runtime"] }
5555
log = "0.4"
5656
rand = "0.8"
5757
uuid = { version = "1.6", features = ["v4"] }
58+
serde_json = "1.0"
59+
insta = { version = "1.43.1"}
5860

5961
object_store = { version = "0.12.0", features = [
6062
"aws",

src/query_planner.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use datafusion::{
1010

1111
use datafusion_proto::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
1212
use datafusion_substrait::{logical_plan::consumer::from_substrait_plan, substrait::proto::Plan};
13+
use insta::assert_snapshot;
1314
use tokio_stream::StreamExt;
1415

1516
use crate::{
@@ -286,3 +287,127 @@ impl QueryPlanner {
286287
Ok(())
287288
}
288289
}
290+
291+
pub mod tests {
292+
use super::*;
293+
use arrow::datatypes::{DataType, Field, Schema};
294+
use datafusion::physical_plan::displayable;
295+
use std::io::BufReader;
296+
use std::{fs::File, path::Path};
297+
298+
#[tokio::test]
299+
async fn prepare_substrait_select_one() -> anyhow::Result<()> {
300+
// Load Substrait and parse to protobuf `Plan`.
301+
let file = File::open(Path::new("testdata/substrait/select_one.substrait.json"))?;
302+
let reader = BufReader::new(file);
303+
let plan: Plan = serde_json::from_reader(reader)?;
304+
305+
let planner = QueryPlanner::default();
306+
let qp = planner.prepare_substrait(plan).await?;
307+
308+
// Distributed plan schema must match logical schema.
309+
let expected_schema = Arc::new(Schema::new(vec![Field::new(
310+
"test_col",
311+
DataType::Int64,
312+
false,
313+
)]));
314+
assert_eq!(qp.distributed_plan.schema(), expected_schema);
315+
316+
// Check the distributed physical plan.
317+
let distributed_plan_str =
318+
format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true));
319+
assert_snapshot!(distributed_plan_str, @r"
320+
DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
321+
ProjectionExec: expr=[1 as test_col]
322+
DataSourceExec: partitions=1, partition_sizes=[1]
323+
");
324+
325+
Ok(())
326+
}
327+
328+
#[tokio::test]
329+
async fn prepare_sql_select_one() -> Result<()> {
330+
let planner = QueryPlanner::default();
331+
let sql = "SELECT 1 AS test_col";
332+
333+
let qp = planner.prepare(sql).await?;
334+
335+
// Distributed plan schema must match logical schema.
336+
let expected_schema = Arc::new(Schema::new(vec![Field::new(
337+
"test_col",
338+
DataType::Int64,
339+
false,
340+
)]));
341+
assert_eq!(qp.distributed_plan.schema(), expected_schema);
342+
343+
// Check the distributed physical plan.
344+
let distributed_plan_str =
345+
format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true));
346+
assert_snapshot!(distributed_plan_str, @r"
347+
DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
348+
ProjectionExec: expr=[1 as test_col]
349+
PlaceholderRowExec
350+
");
351+
352+
Ok(())
353+
}
354+
355+
#[tokio::test]
356+
async fn prepare_describe_table() -> Result<()> {
357+
std::env::set_var(
358+
"DD_TABLES",
359+
"people:parquet:testdata/parquet/people.parquet",
360+
);
361+
362+
let planner = QueryPlanner::default();
363+
let sql = "DESCRIBE people";
364+
365+
let qp = planner.prepare(sql).await?;
366+
367+
// Check the distributed physical plan.
368+
let distributed_plan_str =
369+
format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true));
370+
assert_snapshot!(distributed_plan_str, @r"
371+
DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
372+
RecordBatchExec
373+
");
374+
375+
Ok(())
376+
}
377+
378+
#[tokio::test]
379+
async fn two_stages_query() -> Result<()> {
380+
std::env::set_var(
381+
"DD_TABLES",
382+
"people:parquet:testdata/parquet/people.parquet",
383+
);
384+
385+
let planner = QueryPlanner::default();
386+
let sql = "SELECT * FROM (SELECT 1 as id) a CROSS JOIN (SELECT 2 as id) b order by b.id";
387+
let qp = planner.prepare(sql).await?;
388+
389+
// Distributed plan schema must match logical schema.
390+
let expected_schema = Arc::new(Schema::new(vec![
391+
Field::new("id", DataType::Int64, false),
392+
Field::new("id", DataType::Int64, false),
393+
]));
394+
395+
assert_eq!(qp.distributed_plan.schema(), expected_schema);
396+
397+
// Check the distributed physical plan.
398+
let distributed_plan_str =
399+
format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true));
400+
assert_snapshot!(distributed_plan_str, @r"
401+
DDStageExec[1] (output_partitioning=UnknownPartitioning(1))
402+
DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
403+
SortExec: expr=[id@1 ASC NULLS LAST], preserve_partitioning=[false]
404+
CrossJoinExec
405+
ProjectionExec: expr=[1 as id]
406+
PlaceholderRowExec
407+
ProjectionExec: expr=[2 as id]
408+
PlaceholderRowExec
409+
");
410+
411+
Ok(())
412+
}
413+
}

testdata/parquet/people.parquet

1 KB
Binary file not shown.

0 commit comments

Comments
 (0)