@@ -10,6 +10,7 @@ use datafusion::{
1010
1111use datafusion_proto:: physical_plan:: { DefaultPhysicalExtensionCodec , PhysicalExtensionCodec } ;
1212use datafusion_substrait:: { logical_plan:: consumer:: from_substrait_plan, substrait:: proto:: Plan } ;
13+ use insta:: assert_snapshot;
1314use tokio_stream:: StreamExt ;
1415
1516use crate :: {
@@ -286,3 +287,127 @@ impl QueryPlanner {
286287 Ok ( ( ) )
287288 }
288289}
290+
291+ pub mod tests {
292+ use super :: * ;
293+ use arrow:: datatypes:: { DataType , Field , Schema } ;
294+ use datafusion:: physical_plan:: displayable;
295+ use std:: io:: BufReader ;
296+ use std:: { fs:: File , path:: Path } ;
297+
298+ #[ tokio:: test]
299+ async fn prepare_substrait_select_one ( ) -> anyhow:: Result < ( ) > {
300+ // Load Substrait and parse to protobuf `Plan`.
301+ let file = File :: open ( Path :: new ( "testdata/substrait/select_one.substrait.json" ) ) ?;
302+ let reader = BufReader :: new ( file) ;
303+ let plan: Plan = serde_json:: from_reader ( reader) ?;
304+
305+ let planner = QueryPlanner :: default ( ) ;
306+ let qp = planner. prepare_substrait ( plan) . await ?;
307+
308+ // Distributed plan schema must match logical schema.
309+ let expected_schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
310+ "test_col" ,
311+ DataType :: Int64 ,
312+ false ,
313+ ) ] ) ) ;
314+ assert_eq ! ( qp. distributed_plan. schema( ) , expected_schema) ;
315+
316+ // Check the distributed physical plan.
317+ let distributed_plan_str =
318+ format ! ( "{}" , displayable( qp. distributed_plan. as_ref( ) ) . indent( true ) ) ;
319+ assert_snapshot ! ( distributed_plan_str, @r"
320+ DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
321+ ProjectionExec: expr=[1 as test_col]
322+ DataSourceExec: partitions=1, partition_sizes=[1]
323+ " ) ;
324+
325+ Ok ( ( ) )
326+ }
327+
328+ #[ tokio:: test]
329+ async fn prepare_sql_select_one ( ) -> Result < ( ) > {
330+ let planner = QueryPlanner :: default ( ) ;
331+ let sql = "SELECT 1 AS test_col" ;
332+
333+ let qp = planner. prepare ( sql) . await ?;
334+
335+ // Distributed plan schema must match logical schema.
336+ let expected_schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
337+ "test_col" ,
338+ DataType :: Int64 ,
339+ false ,
340+ ) ] ) ) ;
341+ assert_eq ! ( qp. distributed_plan. schema( ) , expected_schema) ;
342+
343+ // Check the distributed physical plan.
344+ let distributed_plan_str =
345+ format ! ( "{}" , displayable( qp. distributed_plan. as_ref( ) ) . indent( true ) ) ;
346+ assert_snapshot ! ( distributed_plan_str, @r"
347+ DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
348+ ProjectionExec: expr=[1 as test_col]
349+ PlaceholderRowExec
350+ " ) ;
351+
352+ Ok ( ( ) )
353+ }
354+
355+ #[ tokio:: test]
356+ async fn prepare_describe_table ( ) -> Result < ( ) > {
357+ std:: env:: set_var (
358+ "DD_TABLES" ,
359+ "people:parquet:testdata/parquet/people.parquet" ,
360+ ) ;
361+
362+ let planner = QueryPlanner :: default ( ) ;
363+ let sql = "DESCRIBE people" ;
364+
365+ let qp = planner. prepare ( sql) . await ?;
366+
367+ // Check the distributed physical plan.
368+ let distributed_plan_str =
369+ format ! ( "{}" , displayable( qp. distributed_plan. as_ref( ) ) . indent( true ) ) ;
370+ assert_snapshot ! ( distributed_plan_str, @r"
371+ DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
372+ RecordBatchExec
373+ " ) ;
374+
375+ Ok ( ( ) )
376+ }
377+
378+ #[ tokio:: test]
379+ async fn two_stages_query ( ) -> Result < ( ) > {
380+ std:: env:: set_var (
381+ "DD_TABLES" ,
382+ "people:parquet:testdata/parquet/people.parquet" ,
383+ ) ;
384+
385+ let planner = QueryPlanner :: default ( ) ;
386+ let sql = "SELECT * FROM (SELECT 1 as id) a CROSS JOIN (SELECT 2 as id) b order by b.id" ;
387+ let qp = planner. prepare ( sql) . await ?;
388+
389+ // Distributed plan schema must match logical schema.
390+ let expected_schema = Arc :: new ( Schema :: new ( vec ! [
391+ Field :: new( "id" , DataType :: Int64 , false ) ,
392+ Field :: new( "id" , DataType :: Int64 , false ) ,
393+ ] ) ) ;
394+
395+ assert_eq ! ( qp. distributed_plan. schema( ) , expected_schema) ;
396+
397+ // Check the distributed physical plan.
398+ let distributed_plan_str =
399+ format ! ( "{}" , displayable( qp. distributed_plan. as_ref( ) ) . indent( true ) ) ;
400+ assert_snapshot ! ( distributed_plan_str, @r"
401+ DDStageExec[1] (output_partitioning=UnknownPartitioning(1))
402+ DDStageExec[0] (output_partitioning=UnknownPartitioning(1))
403+ SortExec: expr=[id@1 ASC NULLS LAST], preserve_partitioning=[false]
404+ CrossJoinExec
405+ ProjectionExec: expr=[1 as id]
406+ PlaceholderRowExec
407+ ProjectionExec: expr=[2 as id]
408+ PlaceholderRowExec
409+ " ) ;
410+
411+ Ok ( ( ) )
412+ }
413+ }
0 commit comments