@@ -10,6 +10,7 @@ use datafusion::{
1010
1111use datafusion_proto:: physical_plan:: { DefaultPhysicalExtensionCodec , PhysicalExtensionCodec } ;
1212use datafusion_substrait:: { logical_plan:: consumer:: from_substrait_plan, substrait:: proto:: Plan } ;
13+ use insta:: assert_snapshot;
1314use tokio_stream:: StreamExt ;
1415
1516use crate :: {
@@ -286,3 +287,119 @@ impl QueryPlanner {
286287 Ok ( ( ) )
287288 }
288289}
290+
/// Planner tests.
///
/// Each test prepares a query through [`QueryPlanner`] and compares the
/// rendered distributed physical plan against an inline `insta` snapshot.
/// The module is gated on `cfg(test)` so none of it is compiled into
/// non-test builds.
#[cfg(test)]
pub mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::{displayable, ExecutionPlan};
    use std::fs::File;
    use std::io::BufReader;

    /// Value written to the `DD_TABLES` environment variable by the tests
    /// that set it: a `people` table pointing at the bundled Parquet file.
    const PEOPLE_TABLE_SPEC: &str = "people:parquet:testdata/parquet/people.parquet";

    /// Sets `DD_TABLES` to [`PEOPLE_TABLE_SPEC`].
    ///
    /// Environment variables are process-wide and the test harness runs tests
    /// on several threads; every caller in this module writes the same value.
    fn register_people_table() {
        std::env::set_var("DD_TABLES", PEOPLE_TABLE_SPEC);
    }

    /// Schema expected from the `SELECT 1 AS test_col` style queries: a single
    /// non-nullable `Int64` column named `test_col`.
    fn test_col_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new(
            "test_col",
            DataType::Int64,
            false,
        )]))
    }

    /// Renders `plan` as text via `displayable(plan).indent(true)`, the form
    /// the inline snapshots below are written against.
    fn render_plan(plan: &dyn ExecutionPlan) -> String {
        displayable(plan).indent(true).to_string()
    }

    #[tokio::test]
    async fn prepare_substrait_select_one() -> anyhow::Result<()> {
        // Load Substrait and parse to protobuf `Plan`.
        let file = File::open("testdata/substrait/select_one.substrait.json")?;
        let plan: Plan = serde_json::from_reader(BufReader::new(file))?;

        let planner = QueryPlanner::default();
        let qp = planner.prepare_substrait(plan).await?;

        // Distributed plan schema must match logical schema.
        assert_eq!(qp.distributed_plan.schema(), test_col_schema());

        // Check the distributed physical plan.
        let distributed_plan_str = render_plan(qp.distributed_plan.as_ref());
        assert_snapshot!(distributed_plan_str, @r"
        DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
          ProjectionExec: expr=[1 as test_col]
            DataSourceExec: partitions=1, partition_sizes=[1]
        ");

        Ok(())
    }

    #[tokio::test]
    async fn prepare_sql_select_one() -> Result<()> {
        let planner = QueryPlanner::default();
        let sql = "SELECT 1 AS test_col";

        let qp = planner.prepare(sql).await?;

        // Distributed plan schema must match logical schema.
        assert_eq!(qp.distributed_plan.schema(), test_col_schema());

        // Check the distributed physical plan.
        let distributed_plan_str = render_plan(qp.distributed_plan.as_ref());
        assert_snapshot!(distributed_plan_str, @r"
        DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
          ProjectionExec: expr=[1 as test_col]
            PlaceholderRowExec
        ");

        Ok(())
    }

    #[tokio::test]
    async fn prepare_describe_table() -> Result<()> {
        register_people_table();

        let planner = QueryPlanner::default();
        let sql = "DESCRIBE people";

        let qp = planner.prepare(sql).await?;

        // Check the distributed physical plan.
        let distributed_plan_str = render_plan(qp.distributed_plan.as_ref());
        assert_snapshot!(distributed_plan_str, @r"
        DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
          RecordBatchExec
        ");

        Ok(())
    }

    #[tokio::test]
    async fn test_two_stages_query() -> Result<()> {
        // NOTE(review): the query below never references `people`; the
        // variable is still set to keep this test's setup unchanged —
        // confirm whether it is actually required.
        register_people_table();

        let planner = QueryPlanner::default();
        let sql = "SELECT * FROM (SELECT 1 as id) a CROSS JOIN (SELECT 2 as id) b order by b.id";
        let qp = planner.prepare(sql).await?;

        // Check the distributed physical plan.
        let distributed_plan_str = render_plan(qp.distributed_plan.as_ref());
        assert_snapshot!(distributed_plan_str, @r"
        DDStageExec[1] (output_partitioning=UnknownPartitioning(1))
          DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
            SortExec: expr=[id@1 ASC NULLS LAST], preserve_partitioning=[false]
              CrossJoinExec
                ProjectionExec: expr=[1 as id]
                  PlaceholderRowExec
                ProjectionExec: expr=[2 as id]
                  PlaceholderRowExec
        ");

        Ok(())
    }
}
0 commit comments