Skip to content

Commit a9b0624

Browse files
feat: Support distributed plan in EXPLAIN command (#1309)
* support distributed plan in explan * fix test + lint warning * add const table explain test * remove flakey test
1 parent a60086a commit a9b0624

File tree

5 files changed

+284
-52
lines changed

5 files changed

+284
-52
lines changed

ballista/client/tests/context_checks.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,4 +979,63 @@ mod supported {
979979

980980
Ok(())
981981
}
982+
983+
#[rstest]
984+
#[case::standalone(standalone_context())]
985+
#[case::remote(remote_context())]
986+
#[tokio::test]
987+
async fn should_execute_explain_query_correctly(
988+
#[future(awt)]
989+
#[case]
990+
ctx: SessionContext,
991+
) {
992+
let result = ctx
993+
.sql("EXPLAIN select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
994+
.await
995+
.unwrap()
996+
.collect()
997+
.await
998+
.unwrap();
999+
1000+
let expected: Vec<&str> = vec![
1001+
"+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
1002+
"| plan_type | plan |",
1003+
"+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
1004+
"| logical_plan | Projection: count(Int64(1)) AS count(*), id |",
1005+
"| | Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]] |",
1006+
"| | Projection: __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1) AS UNNEST(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5))) AS id |",
1007+
"| | Unnest: lists[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))|depth=1] structs[] |",
1008+
"| | Projection: List([1, 2, 3, 4, 5]) AS __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5))) |",
1009+
"| | EmptyRelation: rows=1 |",
1010+
"| physical_plan | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id] |",
1011+
"| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))] |",
1012+
"| | CoalesceBatchesExec: target_batch_size=8192 |",
1013+
"| | RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1 |",
1014+
"| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))] |",
1015+
"| | ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id] |",
1016+
"| | UnnestExec |",
1017+
"| | ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))] |",
1018+
"| | PlaceholderRowExec |",
1019+
"| | |",
1020+
"| distributed_plan | =========ResolvedStage[stage_id=1.0, partitions=1]========= |",
1021+
"| | ShuffleWriterExec: partitioning:Some(Hash([Column { name: \"id\", index: 0 }], 16)) |",
1022+
"| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))] |",
1023+
"| | ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id] |",
1024+
"| | UnnestExec |",
1025+
"| | ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))] |",
1026+
"| | PlaceholderRowExec |",
1027+
"| | |",
1028+
"| | =========UnResolvedStage[stage_id=2.0, children=1]========= |",
1029+
"| | Inputs{1: StageOutput { partition_locations: {}, complete: false }} |",
1030+
"| | ShuffleWriterExec: partitioning:None |",
1031+
"| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id] |",
1032+
"| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))] |",
1033+
"| | CoalesceBatchesExec: target_batch_size=8192 |",
1034+
"| | UnresolvedShuffleExec: partitioning=Hash([Column { name: \"id\", index: 0 }], 16) |",
1035+
"| | |",
1036+
"| | |",
1037+
"+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
1038+
];
1039+
assert_batches_eq!(expected, &result);
1040+
}
9821041
}

ballista/client/tests/context_unsupported.rs

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -33,56 +33,6 @@ mod unsupported {
3333
crate::common::example_test_data()
3434
}
3535

36-
#[rstest]
37-
#[case::standalone(standalone_context())]
38-
#[case::remote(remote_context())]
39-
#[tokio::test]
40-
#[should_panic]
41-
async fn should_execute_explain_query_correctly(
42-
#[future(awt)]
43-
#[case]
44-
ctx: SessionContext,
45-
test_data: String,
46-
) {
47-
ctx.register_parquet(
48-
"test",
49-
&format!("{test_data}/alltypes_plain.parquet"),
50-
Default::default(),
51-
)
52-
.await
53-
.unwrap();
54-
55-
let result = ctx
56-
.sql("EXPLAIN select count(*), id from test where id > 4 group by id")
57-
.await
58-
.unwrap()
59-
.collect()
60-
.await
61-
.unwrap();
62-
63-
let expected = vec![
64-
"+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
65-
"| plan_type | plan |",
66-
"+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
67-
"| logical_plan | Projection: count(*), test.id |",
68-
"| | Aggregate: groupBy=[[test.id]], aggr=[[count(Int64(1)) AS count(*)]] |",
69-
"| | Filter: test.id > Int32(4) |",
70-
"| | TableScan: test projection=[id], partial_filters=[test.id > Int32(4)] |",
71-
"| physical_plan | ProjectionExec: expr=[count(*)@1 as count(*), id@0 as id] |",
72-
"| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(*)] |",
73-
"| | CoalesceBatchesExec: target_batch_size=8192 |",
74-
"| | RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1 |",
75-
"| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(*)] |",
76-
"| | CoalesceBatchesExec: target_batch_size=8192 |",
77-
"| | FilterExec: id@0 > 4 |",
78-
"| | ParquetExec: file_groups={1 group: [[Users/ballista/git/datafusion-ballista/ballista/client/testdata/alltypes_plain.parquet]]}, projection=[id], predicate=id@0 > 4, pruning_predicate=CASE WHEN id_null_count@1 = id_row_count@2 THEN false ELSE id_max@0 > 4 END, required_guarantees=[] |",
79-
"| | |",
80-
"+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
81-
];
82-
83-
assert_batches_eq!(expected, &result);
84-
}
85-
8636
#[rstest]
8737
#[case::standalone(standalone_context())]
8838
#[case::remote(remote_context())]

0 commit comments

Comments
 (0)