Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ kube = { version = "1.1", features = ["derive", "runtime"] }
log = "0.4"
rand = "0.8"
uuid = { version = "1.6", features = ["v4"] }
serde_json = "1.0"
insta = { version = "1.43.1"}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


object_store = { version = "0.12.0", features = [
"aws",
Expand Down
125 changes: 125 additions & 0 deletions src/query_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datafusion::{

use datafusion_proto::physical_plan::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
use datafusion_substrait::{logical_plan::consumer::from_substrait_plan, substrait::proto::Plan};
use insta::assert_snapshot;
use tokio_stream::StreamExt;

use crate::{
Expand Down Expand Up @@ -286,3 +287,127 @@ impl QueryPlanner {
Ok(())
}
}

/// Unit tests for `QueryPlanner` plan preparation (Substrait and SQL paths).
///
/// NOTE(review): this module was `pub` with no `#[cfg(test)]`, which compiles
/// the tests — and pulls `insta`/test fixtures — into non-test builds. Gating
/// on `cfg(test)` fixes that; `pub` is kept so the module path is unchanged
/// for anything referencing it under test builds. `insta` should likewise be
/// moved to `[dev-dependencies]` in Cargo.toml.
#[cfg(test)]
pub mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::displayable;
    use std::io::BufReader;
    use std::{fs::File, path::Path};

    /// Prepares a distributed plan from a Substrait JSON fixture and verifies
    /// both the output schema and the shape of the physical plan.
    #[tokio::test]
    async fn prepare_substrait_select_one() -> anyhow::Result<()> {
        // Load Substrait and parse to protobuf `Plan`.
        let file = File::open(Path::new("testdata/substrait/select_one.substrait.json"))?;
        let reader = BufReader::new(file);
        let plan: Plan = serde_json::from_reader(reader)?;

        let planner = QueryPlanner::default();
        let qp = planner.prepare_substrait(plan).await?;

        // Distributed plan schema must match logical schema.
        let expected_schema = Arc::new(Schema::new(vec![Field::new(
            "test_col",
            DataType::Int64,
            false,
        )]));
        assert_eq!(qp.distributed_plan.schema(), expected_schema);

        // Check the distributed physical plan.
        let distributed_plan_str =
            format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true));
        assert_snapshot!(distributed_plan_str, @r"
        DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
          ProjectionExec: expr=[1 as test_col]
            DataSourceExec: partitions=1, partition_sizes=[1]
        ");

        Ok(())
    }

    /// Same check as `prepare_substrait_select_one`, but driving the SQL
    /// front end (`prepare`) instead of the Substrait consumer.
    #[tokio::test]
    async fn prepare_sql_select_one() -> anyhow::Result<()> {
        let planner = QueryPlanner::default();
        let sql = "SELECT 1 AS test_col";

        let qp = planner.prepare(sql).await?;

        // Distributed plan schema must match logical schema.
        let expected_schema = Arc::new(Schema::new(vec![Field::new(
            "test_col",
            DataType::Int64,
            false,
        )]));
        assert_eq!(qp.distributed_plan.schema(), expected_schema);

        // Check the distributed physical plan.
        let distributed_plan_str =
            format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true));
        assert_snapshot!(distributed_plan_str, @r"
        DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
          ProjectionExec: expr=[1 as test_col]
            PlaceholderRowExec
        ");

        Ok(())
    }

    /// DESCRIBE over a registered parquet table should plan to a single
    /// record-batch stage.
    #[tokio::test]
    async fn prepare_describe_table() -> anyhow::Result<()> {
        // NOTE(review): `set_var` mutates process-global state; tests run in
        // parallel by default, so tests relying on different DD_TABLES values
        // can race. Consider a constructor that takes the table mapping
        // directly, or serializing these tests.
        std::env::set_var(
            "DD_TABLES",
            "people:parquet:testdata/parquet/people.parquet",
        );

        let planner = QueryPlanner::default();
        let sql = "DESCRIBE people";

        let qp = planner.prepare(sql).await?;

        // Check the distributed physical plan.
        let distributed_plan_str =
            format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true));
        assert_snapshot!(distributed_plan_str, @r"
        DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
          RecordBatchExec
        ");

        Ok(())
    }

    /// A query whose physical plan splits into two distributed stages
    /// (sort above a cross join).
    #[tokio::test]
    async fn two_stages_query() -> anyhow::Result<()> {
        // NOTE(review): same process-global env-var race as in
        // `prepare_describe_table` — see the comment there.
        std::env::set_var(
            "DD_TABLES",
            "people:parquet:testdata/parquet/people.parquet",
        );

        let planner = QueryPlanner::default();
        let sql = "SELECT * FROM (SELECT 1 as id) a CROSS JOIN (SELECT 2 as id) b order by b.id";
        let qp = planner.prepare(sql).await?;

        // Distributed plan schema must match logical schema. Both sides of
        // the cross join expose a column named `id`.
        let expected_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("id", DataType::Int64, false),
        ]));

        assert_eq!(qp.distributed_plan.schema(), expected_schema);

        // Check the distributed physical plan.
        let distributed_plan_str =
            format!("{}", displayable(qp.distributed_plan.as_ref()).indent(true));
        assert_snapshot!(distributed_plan_str, @r"
        DDStageExec[1] (output_partitioning=UnknownPartitioning(1))
          DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
            SortExec: expr=[id@1 ASC NULLS LAST], preserve_partitioning=[false]
              CrossJoinExec
                ProjectionExec: expr=[1 as id]
                  PlaceholderRowExec
                ProjectionExec: expr=[2 as id]
                  PlaceholderRowExec
        ");

        Ok(())
    }
}
Binary file added testdata/parquet/people.parquet
Binary file not shown.
Loading