Skip to content

Commit ef72e11

Browse files
committed
update expected plans
1 parent d2321b2 commit ef72e11

File tree

5 files changed

+431
-7
lines changed

5 files changed

+431
-7
lines changed

src/planner.rs

Lines changed: 0 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -276,7 +276,6 @@ mod test {
276276
do_test(6).await
277277
}
278278

279-
#[ignore = "non-deterministic IN clause"]
280279
#[tokio::test]
281280
async fn test_q7() -> TestResult<()> {
282281
do_test(7).await
@@ -302,7 +301,6 @@ mod test {
302301
do_test(11).await
303302
}
304303

305-
#[ignore = "non-deterministic IN clause"]
306304
#[tokio::test]
307305
async fn test_q12() -> TestResult<()> {
308306
do_test(12).await
@@ -324,10 +322,6 @@ mod test {
324322
do_test(15).await
325323
}
326324

327-
// This test is ignored because there is some non-determinism
328-
// in a part of the plan, see
329-
// https://github.com/edmondop/datafusion-ray/actions/runs/11180062292/job/31080996808"
330-
#[ignore = "non-deterministic IN clause"]
331325
#[tokio::test]
332326
async fn test_q16() -> TestResult<()> {
333327
do_test(16).await
@@ -343,7 +337,6 @@ mod test {
343337
do_test(18).await
344338
}
345339

346-
#[ignore = "non-deterministic IN clause"]
347340
#[tokio::test]
348341
async fn test_q19() -> TestResult<()> {
349342
do_test(19).await

testdata/expected-plans/q12.txt

Lines changed: 71 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,71 @@
1+
DataFusion Logical Plan
2+
=======================
3+
4+
Sort: lineitem.l_shipmode ASC NULLS LAST
5+
Projection: lineitem.l_shipmode, sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count
6+
Aggregate: groupBy=[[lineitem.l_shipmode]], aggr=[[sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
7+
Projection: orders.o_orderpriority, lineitem.l_shipmode
8+
Inner Join: orders.o_orderkey = lineitem.l_orderkey
9+
TableScan: orders projection=[o_orderkey, o_orderpriority]
10+
Projection: lineitem.l_orderkey, lineitem.l_shipmode
11+
Filter: (lineitem.l_shipmode = Utf8("FOB") OR lineitem.l_shipmode = Utf8("SHIP")) AND lineitem.l_receiptdate > lineitem.l_commitdate AND lineitem.l_shipdate < lineitem.l_commitdate AND lineitem.l_receiptdate >= Date32("1995-01-01") AND lineitem.l_receiptdate < Date32("1996-01-01")
12+
TableScan: lineitem projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode = Utf8("FOB") OR lineitem.l_shipmode = Utf8("SHIP"), lineitem.l_receiptdate > lineitem.l_commitdate, lineitem.l_shipdate < lineitem.l_commitdate, lineitem.l_receiptdate >= Date32("1995-01-01"), lineitem.l_receiptdate < Date32("1996-01-01")]
13+
14+
DataFusion Physical Plan
15+
========================
16+
17+
SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
18+
SortExec: expr=[l_shipmode@0 ASC NULLS LAST], preserve_partitioning=[true]
19+
ProjectionExec: expr=[l_shipmode@0 as l_shipmode, sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
20+
AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
21+
CoalesceBatchesExec: target_batch_size=8192
22+
RepartitionExec: partitioning=Hash([l_shipmode@0], 2), input_partitions=2
23+
AggregateExec: mode=Partial, gby=[l_shipmode@1 as l_shipmode], aggr=[sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
24+
ProjectionExec: expr=[o_orderpriority@1 as o_orderpriority, l_shipmode@0 as l_shipmode]
25+
CoalesceBatchesExec: target_batch_size=8192
26+
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_orderkey@0, o_orderkey@0)], projection=[l_shipmode@1, o_orderpriority@3]
27+
CoalesceBatchesExec: target_batch_size=8192
28+
RepartitionExec: partitioning=Hash([l_orderkey@0], 2), input_partitions=2
29+
CoalesceBatchesExec: target_batch_size=8192
30+
FilterExec: (l_shipmode@4 = FOB OR l_shipmode@4 = SHIP) AND l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 1995-01-01 AND l_receiptdate@3 < 1996-01-01, projection=[l_orderkey@0, l_shipmode@4]
31+
ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END OR CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= SHIP AND SHIP <= l_shipmode_max@1 END) AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_max@4 >= 1995-01-01 END AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_min@7 < 1996-01-01 END, required_guarantees=[l_shipmode in (SHIP, FOB)]
32+
CoalesceBatchesExec: target_batch_size=8192
33+
RepartitionExec: partitioning=Hash([o_orderkey@0], 2), input_partitions=2
34+
ParquetExec: file_groups={ ... }, projection=[o_orderkey, o_orderpriority]
35+
36+
DataFusion Ray Distributed Plan
37+
===========
38+
39+
Query Stage #0 (2 -> 2):
40+
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2))
41+
CoalesceBatchesExec: target_batch_size=8192
42+
FilterExec: (l_shipmode@4 = FOB OR l_shipmode@4 = SHIP) AND l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 1995-01-01 AND l_receiptdate@3 < 1996-01-01, projection=[l_orderkey@0, l_shipmode@4]
43+
ParquetExec: file_groups={ ... }, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], predicate=(l_shipmode@14 = FOB OR l_shipmode@14 = SHIP) AND l_receiptdate@12 > l_commitdate@11 AND l_shipdate@10 < l_commitdate@11 AND l_receiptdate@12 >= 1995-01-01 AND l_receiptdate@12 < 1996-01-01, pruning_predicate=(CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= FOB AND FOB <= l_shipmode_max@1 END OR CASE WHEN l_shipmode_null_count@2 = l_shipmode_row_count@3 THEN false ELSE l_shipmode_min@0 <= SHIP AND SHIP <= l_shipmode_max@1 END) AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_max@4 >= 1995-01-01 END AND CASE WHEN l_receiptdate_null_count@5 = l_receiptdate_row_count@6 THEN false ELSE l_receiptdate_min@7 < 1996-01-01 END, required_guarantees=[l_shipmode in (SHIP, FOB)]
44+
45+
Query Stage #1 (2 -> 2):
46+
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2))
47+
ParquetExec: file_groups={ ... }, projection=[o_orderkey, o_orderpriority]
48+
49+
Query Stage #2 (2 -> 2):
50+
ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))
51+
AggregateExec: mode=Partial, gby=[l_shipmode@1 as l_shipmode], aggr=[sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
52+
ProjectionExec: expr=[o_orderpriority@1 as o_orderpriority, l_shipmode@0 as l_shipmode]
53+
CoalesceBatchesExec: target_batch_size=8192
54+
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_orderkey@0, o_orderkey@0)], projection=[l_shipmode@1, o_orderpriority@3]
55+
CoalesceBatchesExec: target_batch_size=8192
56+
ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2))
57+
CoalesceBatchesExec: target_batch_size=8192
58+
ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2))
59+
60+
Query Stage #3 (2 -> 2):
61+
ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))
62+
SortExec: expr=[l_shipmode@0 ASC NULLS LAST], preserve_partitioning=[true]
63+
ProjectionExec: expr=[l_shipmode@0 as l_shipmode, sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
64+
AggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[sum(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
65+
CoalesceBatchesExec: target_batch_size=8192
66+
ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))
67+
68+
Query Stage #4 (2 -> 1):
69+
SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
70+
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))
71+

0 commit comments

Comments
 (0)