Skip to content

Commit 8626b04

Browse files
authored
fix: set RangePartitioning for native shuffle default to false (#1907)
1 parent 9947e88 commit 8626b04

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+117
-139
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,11 +315,14 @@ object CometConf extends ShimCometConf {
315315
.booleanConf
316316
.createWithDefault(true)
317317

318+
// RangePartitioning contains bugs https://github.com/apache/datafusion-comet/issues/1906
318319
val COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
319320
conf("spark.comet.native.shuffle.partitioning.range.enabled")
320-
.doc("Whether to enable range partitioning for Comet native shuffle.")
321+
.doc("Experimental feature to enable range partitioning for Comet native shuffle. " +
322+
"This feature is experimental while we investigate scenarios that don't partition data " +
323+
"correctly.")
321324
.booleanConf
322-
.createWithDefault(true)
325+
.createWithDefault(false)
323326

324327
val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
325328
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")

docs/source/user-guide/configs.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ Comet provides the following configuration settings.
7676
| spark.comet.memoryOverhead | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. This config is optional. If this is not specified, it will be set to `spark.comet.memory.overhead.factor` * `spark.executor.memory`. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | |
7777
| spark.comet.metrics.updateInterval | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
7878
| spark.comet.native.shuffle.partitioning.hash.enabled | Whether to enable hash partitioning for Comet native shuffle. | true |
79-
| spark.comet.native.shuffle.partitioning.range.enabled | Whether to enable range partitioning for Comet native shuffle. | true |
79+
| spark.comet.native.shuffle.partitioning.range.enabled | Experimental feature to enable range partitioning for Comet native shuffle. This feature is experimental while we investigate scenarios that don't partition data correctly. | false |
8080
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
8181
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false |
8282
| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false |

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q2/explain.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
== Physical Plan ==
22
* CometColumnarToRow (34)
33
+- CometSort (33)
4-
+- CometExchange (32)
4+
+- CometColumnarExchange (32)
55
+- CometProject (31)
66
+- CometBroadcastHashJoin (30)
77
:- CometProject (20)
@@ -180,9 +180,9 @@ Arguments: [d_week_seq1#29], [(d_week_seq2#54 - 53)], Inner, BuildRight
180180
Input [16]: [d_week_seq1#29, sun_sales1#30, mon_sales1#31, tue_sales1#32, wed_sales1#33, thu_sales1#34, fri_sales1#35, sat_sales1#36, d_week_seq2#54, sun_sales2#55, mon_sales2#56, tue_sales2#57, wed_sales2#58, thu_sales2#59, fri_sales2#60, sat_sales2#61]
181181
Arguments: [d_week_seq1#29, round((sun_sales1 / sun_sales2), 2)#62, round((mon_sales1 / mon_sales2), 2)#63, round((tue_sales1 / tue_sales2), 2)#64, round((wed_sales1 / wed_sales2), 2)#65, round((thu_sales1 / thu_sales2), 2)#66, round((fri_sales1 / fri_sales2), 2)#67, round((sat_sales1 / sat_sales2), 2)#68], [d_week_seq1#29, round((sun_sales1#30 / sun_sales2#55), 2) AS round((sun_sales1 / sun_sales2), 2)#62, round((mon_sales1#31 / mon_sales2#56), 2) AS round((mon_sales1 / mon_sales2), 2)#63, round((tue_sales1#32 / tue_sales2#57), 2) AS round((tue_sales1 / tue_sales2), 2)#64, round((wed_sales1#33 / wed_sales2#58), 2) AS round((wed_sales1 / wed_sales2), 2)#65, round((thu_sales1#34 / thu_sales2#59), 2) AS round((thu_sales1 / thu_sales2), 2)#66, round((fri_sales1#35 / fri_sales2#60), 2) AS round((fri_sales1 / fri_sales2), 2)#67, round((sat_sales1#36 / sat_sales2#61), 2) AS round((sat_sales1 / sat_sales2), 2)#68]
182182

183-
(32) CometExchange
183+
(32) CometColumnarExchange
184184
Input [8]: [d_week_seq1#29, round((sun_sales1 / sun_sales2), 2)#62, round((mon_sales1 / mon_sales2), 2)#63, round((tue_sales1 / tue_sales2), 2)#64, round((wed_sales1 / wed_sales2), 2)#65, round((thu_sales1 / thu_sales2), 2)#66, round((fri_sales1 / fri_sales2), 2)#67, round((sat_sales1 / sat_sales2), 2)#68]
185-
Arguments: rangepartitioning(d_week_seq1#29 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]
185+
Arguments: rangepartitioning(d_week_seq1#29 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=2]
186186

187187
(33) CometSort
188188
Input [8]: [d_week_seq1#29, round((sun_sales1 / sun_sales2), 2)#62, round((mon_sales1 / mon_sales2), 2)#63, round((tue_sales1 / tue_sales2), 2)#64, round((wed_sales1 / wed_sales2), 2)#65, round((thu_sales1 / thu_sales2), 2)#66, round((fri_sales1 / fri_sales2), 2)#67, round((sat_sales1 / sat_sales2), 2)#68]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q2/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
44
CometSort [d_week_seq1,round((sun_sales1 / sun_sales2), 2),round((mon_sales1 / mon_sales2), 2),round((tue_sales1 / tue_sales2), 2),round((wed_sales1 / wed_sales2), 2),round((thu_sales1 / thu_sales2), 2),round((fri_sales1 / fri_sales2), 2),round((sat_sales1 / sat_sales2), 2)]
5-
CometExchange [d_week_seq1] #1
5+
CometColumnarExchange [d_week_seq1] #1
66
CometProject [sun_sales1,sun_sales2,mon_sales1,mon_sales2,tue_sales1,tue_sales2,wed_sales1,wed_sales2,thu_sales1,thu_sales2,fri_sales1,fri_sales2,sat_sales1,sat_sales2] [d_week_seq1,round((sun_sales1 / sun_sales2), 2),round((mon_sales1 / mon_sales2), 2),round((tue_sales1 / tue_sales2), 2),round((wed_sales1 / wed_sales2), 2),round((thu_sales1 / thu_sales2), 2),round((fri_sales1 / fri_sales2), 2),round((sat_sales1 / sat_sales2), 2)]
77
CometBroadcastHashJoin [d_week_seq1,sun_sales1,mon_sales1,tue_sales1,wed_sales1,thu_sales1,fri_sales1,sat_sales1,d_week_seq2,sun_sales2,mon_sales2,tue_sales2,wed_sales2,thu_sales2,fri_sales2,sat_sales2]
88
CometProject [d_week_seq,sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales] [d_week_seq1,sun_sales1,mon_sales1,tue_sales1,wed_sales1,thu_sales1,fri_sales1,sat_sales1]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q31/explain.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
== Physical Plan ==
22
* CometColumnarToRow (90)
33
+- CometSort (89)
4-
+- CometExchange (88)
4+
+- CometColumnarExchange (88)
55
+- CometProject (87)
66
+- CometBroadcastHashJoin (86)
77
:- CometProject (73)
@@ -496,9 +496,9 @@ Arguments: [ca_county#42], [ca_county#64], Inner, (CASE WHEN (web_sales#55 > 0.0
496496
Input [10]: [ca_county#9, d_year#6, store_sales#22, store_sales#21, store_sales#33, ca_county#42, web_sales#44, web_sales#55, ca_county#64, web_sales#66]
497497
Arguments: [ca_county#9, d_year#6, web_q1_q2_increase#67, store_q1_q2_increase#68, web_q2_q3_increase#69, store_q2_q3_increase#70], [ca_county#9, d_year#6, (web_sales#55 / web_sales#44) AS web_q1_q2_increase#67, (store_sales#21 / store_sales#22) AS store_q1_q2_increase#68, (web_sales#66 / web_sales#55) AS web_q2_q3_increase#69, (store_sales#33 / store_sales#21) AS store_q2_q3_increase#70]
498498

499-
(88) CometExchange
499+
(88) CometColumnarExchange
500500
Input [6]: [ca_county#9, d_year#6, web_q1_q2_increase#67, store_q1_q2_increase#68, web_q2_q3_increase#69, store_q2_q3_increase#70]
501-
Arguments: rangepartitioning(ca_county#9 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=7]
501+
Arguments: rangepartitioning(ca_county#9 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=7]
502502

503503
(89) CometSort
504504
Input [6]: [ca_county#9, d_year#6, web_q1_q2_increase#67, store_q1_q2_increase#68, web_q2_q3_increase#69, store_q2_q3_increase#70]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q31/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
44
CometSort [ca_county,d_year,web_q1_q2_increase,store_q1_q2_increase,web_q2_q3_increase,store_q2_q3_increase]
5-
CometExchange [ca_county] #1
5+
CometColumnarExchange [ca_county] #1
66
CometProject [web_sales,web_sales,store_sales,store_sales,web_sales,store_sales] [ca_county,d_year,web_q1_q2_increase,store_q1_q2_increase,web_q2_q3_increase,store_q2_q3_increase]
77
CometBroadcastHashJoin [ca_county,d_year,store_sales,store_sales,store_sales,ca_county,web_sales,web_sales,ca_county,web_sales]
88
CometProject [ca_county,d_year,store_sales,store_sales,store_sales,ca_county,web_sales,web_sales]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34/explain.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
== Physical Plan ==
22
* CometColumnarToRow (33)
33
+- CometSort (32)
4-
+- CometExchange (31)
4+
+- CometColumnarExchange (31)
55
+- CometProject (30)
66
+- CometBroadcastHashJoin (29)
77
:- CometFilter (24)
@@ -176,9 +176,9 @@ Arguments: [ss_customer_sk#1], [c_customer_sk#18], Inner, BuildRight
176176
Input [8]: [ss_ticket_number#4, ss_customer_sk#1, cnt#17, c_customer_sk#18, c_salutation#23, c_first_name#24, c_last_name#25, c_preferred_cust_flag#26]
177177
Arguments: [c_last_name#25, c_first_name#24, c_salutation#23, c_preferred_cust_flag#26, ss_ticket_number#4, cnt#17], [c_last_name#25, c_first_name#24, c_salutation#23, c_preferred_cust_flag#26, ss_ticket_number#4, cnt#17]
178178

179-
(31) CometExchange
179+
(31) CometColumnarExchange
180180
Input [6]: [c_last_name#25, c_first_name#24, c_salutation#23, c_preferred_cust_flag#26, ss_ticket_number#4, cnt#17]
181-
Arguments: rangepartitioning(c_last_name#25 ASC NULLS FIRST, c_first_name#24 ASC NULLS FIRST, c_salutation#23 ASC NULLS FIRST, c_preferred_cust_flag#26 DESC NULLS LAST, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]
181+
Arguments: rangepartitioning(c_last_name#25 ASC NULLS FIRST, c_first_name#24 ASC NULLS FIRST, c_salutation#23 ASC NULLS FIRST, c_preferred_cust_flag#26 DESC NULLS LAST, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=2]
182182

183183
(32) CometSort
184184
Input [6]: [c_last_name#25, c_first_name#24, c_salutation#23, c_preferred_cust_flag#26, ss_ticket_number#4, cnt#17]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
44
CometSort [c_last_name,c_first_name,c_salutation,c_preferred_cust_flag,ss_ticket_number,cnt]
5-
CometExchange [c_last_name,c_first_name,c_salutation,c_preferred_cust_flag] #1
5+
CometColumnarExchange [c_last_name,c_first_name,c_salutation,c_preferred_cust_flag] #1
66
CometProject [c_last_name,c_first_name,c_salutation,c_preferred_cust_flag,ss_ticket_number,cnt]
77
CometBroadcastHashJoin [ss_ticket_number,ss_customer_sk,cnt,c_customer_sk,c_salutation,c_first_name,c_last_name,c_preferred_cust_flag]
88
CometFilter [ss_ticket_number,ss_customer_sk,cnt]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39a/explain.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
== Physical Plan ==
22
* CometColumnarToRow (47)
33
+- CometSort (46)
4-
+- CometExchange (45)
4+
+- CometColumnarExchange (45)
55
+- CometBroadcastHashJoin (44)
66
:- CometProject (23)
77
: +- CometFilter (22)
@@ -253,9 +253,9 @@ Left output [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19]
253253
Right output [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37]
254254
Arguments: [i_item_sk#6, w_warehouse_sk#7], [i_item_sk#25, w_warehouse_sk#26], Inner, BuildRight
255255

256-
(45) CometExchange
256+
(45) CometColumnarExchange
257257
Input [10]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19, w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37]
258-
Arguments: rangepartitioning(w_warehouse_sk#7 ASC NULLS FIRST, i_item_sk#6 ASC NULLS FIRST, d_moy#11 ASC NULLS FIRST, mean#18 ASC NULLS FIRST, cov#19 ASC NULLS FIRST, d_moy#30 ASC NULLS FIRST, mean#36 ASC NULLS FIRST, cov#37 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=3]
258+
Arguments: rangepartitioning(w_warehouse_sk#7 ASC NULLS FIRST, i_item_sk#6 ASC NULLS FIRST, d_moy#11 ASC NULLS FIRST, mean#18 ASC NULLS FIRST, cov#19 ASC NULLS FIRST, d_moy#30 ASC NULLS FIRST, mean#36 ASC NULLS FIRST, cov#37 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=3]
259259

260260
(46) CometSort
261261
Input [10]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19, w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37]

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39a/simplified.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ WholeStageCodegen (1)
22
CometColumnarToRow
33
InputAdapter
44
CometSort [w_warehouse_sk,i_item_sk,d_moy,mean,cov,w_warehouse_sk,i_item_sk,d_moy,mean,cov]
5-
CometExchange [w_warehouse_sk,i_item_sk,d_moy,mean,cov,d_moy,mean,cov] #1
5+
CometColumnarExchange [w_warehouse_sk,i_item_sk,d_moy,mean,cov,d_moy,mean,cov] #1
66
CometBroadcastHashJoin [w_warehouse_sk,i_item_sk,d_moy,mean,cov,w_warehouse_sk,i_item_sk,d_moy,mean,cov]
77
CometProject [stdev] [w_warehouse_sk,i_item_sk,d_moy,mean,cov]
88
CometFilter [w_warehouse_sk,i_item_sk,d_moy,stdev,mean]

0 commit comments

Comments
 (0)