Commit 3aa3dc7

feat: support RangePartitioning with native shuffle (#1862)

1 parent 7fd5797 · commit 3aa3dc7

70 files changed (+1279, -291 lines). Large commits have some content hidden by default; only part of the diff is shown here.
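Spark plans a RangePartitioning exchange for any global sort, so with this commit even a minimal ORDER BY job can stay on Comet's native shuffle. A hypothetical repro sketch (assumes a Comet-enabled SparkSession named spark; not code from this commit):

```scala
import org.apache.spark.sql.functions.col

// A global sort forces a range-partitioned Exchange, the case this commit
// adds to native shuffle. Inspect the plan for the exchange node.
val sorted = spark.range(0, 1000000).toDF("id").orderBy(col("id").desc)
sorted.explain()
```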

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 16 additions & 0 deletions
@@ -309,6 +309,18 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.native.shuffle.partitioning.hash.enabled")
+      .doc("Whether to enable hash partitioning for Comet native shuffle.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.native.shuffle.partitioning.range.enabled")
+      .doc("Whether to enable range partitioning for Comet native shuffle.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
       .doc(
@@ -772,11 +784,13 @@ private[comet] abstract class ConfigEntry[T](
 
   /**
    * Retrieves the config value from the current thread-local [[SQLConf]]
+   *
    * @return
    */
   def get(): T = get(SQLConf.get)
 
   def defaultValue: Option[T] = None
+
   def defaultValueString: String
 
   override def toString: String = {
@@ -795,6 +809,7 @@ private[comet] class ConfigEntryWithDefault[T](
     version: String)
     extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic, version) {
   override def defaultValue: Option[T] = Some(_defaultValue)
+
   override def defaultValueString: String = stringConverter(_defaultValue)
 
   def get(conf: SQLConf): T = {
@@ -830,6 +845,7 @@ private[comet] class OptionalConfigEntry[T](
 }
 
 private[comet] case class ConfigBuilder(key: String) {
+
   import ConfigHelpers._
 
   var _public = true
dev/diffs/3.4.3.diff

Lines changed: 75 additions & 7 deletions
@@ -2282,10 +2282,17 @@ index d083cac48ff..3c11bcde807 100644
   import testImplicits._
 
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 266bb343526..c3e3d155813 100644
+index 266bb343526..6675cf7b636 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-@@ -24,10 +24,11 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
+@@ -19,15 +19,18 @@ package org.apache.spark.sql.sources
+
+ import scala.util.Random
+
++import org.apache.comet.CometConf
++
+ import org.apache.spark.sql._
+ import org.apache.spark.sql.catalyst.catalog.BucketSpec
  import org.apache.spark.sql.catalyst.expressions
  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
@@ -2299,7 +2306,7 @@ index 266bb343526..c3e3d155813 100644
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.internal.SQLConf
-@@ -101,12 +102,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -101,12 +104,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     }
   }
 
@@ -2322,7 +2329,7 @@ index 266bb343526..c3e3d155813 100644
   // To verify if the bucket pruning works, this function checks two conditions:
   // 1) Check if the pruned buckets (before filtering) are empty.
   // 2) Verify the final result is the same as the expected one
-@@ -155,7 +164,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -155,7 +166,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
       .queryExecution.executedPlan
     val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2332,7 +2339,7 @@ index 266bb343526..c3e3d155813 100644
 
     val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
     val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -451,28 +461,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -451,28 +463,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
       val executedPlan =
         joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2390,7 +2397,7 @@ index 266bb343526..c3e3d155813 100644
       s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
 
     // check the output partitioning
-@@ -835,11 +866,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -835,11 +868,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
     val scanDF = spark.table("bucketed_table").select("j")
@@ -2404,7 +2411,40 @@ index 266bb343526..c3e3d155813 100644
     checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
   }
 }
-@@ -1026,15 +1057,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -894,7 +927,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+   }
+ 
+   test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
++    // Range partitioning uses random samples, so per-partition comparisons do not always yield
++    // the same results. Disable Comet native range partitioning.
+     withSQLConf(
++      CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
+       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+       SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
+       val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
+@@ -913,7 +949,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+   }
+ 
+   test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
++    // Range partitioning uses random samples, so per-partition comparisons do not always yield
++    // the same results. Disable Comet native range partitioning.
+     withSQLConf(
++      CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
+       SQLConf.SHUFFLE_PARTITIONS.key -> "9",
+       SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
+ 
+@@ -943,7 +982,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+   }
+ 
+   test("bucket coalescing eliminates shuffle") {
++    // Range partitioning uses random samples, so per-partition comparisons do not always yield
++    // the same results. Disable Comet native range partitioning.
+     withSQLConf(
++      CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
+       SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
+       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+       // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
+@@ -1026,15 +1068,23 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     expectedNumShuffles: Int,
     expectedCoalescedNumBuckets: Option[Int]): Unit = {
     val plan = sql(query).queryExecution.executedPlan
@@ -2816,6 +2856,34 @@ index 52abd248f3a..7a199931a08 100644
       case h: HiveTableScanExec => h.partitionPruningPred.collect {
         case d: DynamicPruningExpression => d.child
       }
+diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+index a902cb3a69e..800a3acbe99 100644
+--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+@@ -24,6 +24,7 @@ import java.sql.{Date, Timestamp}
+ import java.util.{Locale, Set}
+ 
+ import com.google.common.io.Files
++import org.apache.comet.CometConf
+ import org.apache.hadoop.fs.{FileSystem, Path}
+ 
+ import org.apache.spark.{SparkException, TestUtils}
+@@ -838,8 +839,13 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
+   }
+ 
+   test("SPARK-2554 SumDistinct partial aggregation") {
+-    checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
+-      sql("SELECT distinct key FROM src order by key").collect().toSeq)
++    // Range partitioning uses random samples, so per-partition comparisons do not always yield
++    // the same results. Disable Comet native range partitioning.
++    withSQLConf(CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false")
++    {
++      checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
++        sql("SELECT distinct key FROM src order by key").collect().toSeq)
++    }
+   }
+ 
+   test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
 index 07361cfdce9..b4d53dbe900 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
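The comment repeated throughout these test patches is the crux: range partitioning derives its partition bounds from a random sample of the input, so which rows land in which partition can vary across runs and implementations, even though the fully sorted result is the same. A self-contained illustration of the effect (plain Scala, not Spark or Comet code):

```scala
import scala.util.Random

// Partition bounds computed from a random sample shift between runs, so a
// row near a boundary may fall in partition k one time and k+1 the next.
def sampledBounds(data: IndexedSeq[Int], numPartitions: Int, seed: Long): Seq[Int] = {
  val rng = new Random(seed)
  val sample = IndexedSeq.fill(100)(data(rng.nextInt(data.length))).sorted
  (1 until numPartitions).map(i => sample(i * sample.length / numPartitions))
}

val data = IndexedSeq.tabulate(100000)(identity)
println(sampledBounds(data, 4, seed = 1L)) // one set of boundaries
println(sampledBounds(data, 4, seed = 2L)) // typically a different set
```

Hence the suites disable COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED only around assertions that compare per-partition contents; end-to-end answers are unaffected.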

dev/diffs/3.5.6.diff

Lines changed: 76 additions & 7 deletions
@@ -2278,10 +2278,18 @@ index d083cac48ff..3c11bcde807 100644
   import testImplicits._
 
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 746f289c393..0c99d028163 100644
+index 746f289c393..a90106a1463 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-@@ -25,10 +25,11 @@ import org.apache.spark.sql.catalyst.expressions
+@@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
+
+ import scala.util.Random
+
++import org.apache.comet.CometConf
++
+ import org.apache.spark.sql._
+ import org.apache.spark.sql.catalyst.catalog.BucketSpec
+ import org.apache.spark.sql.catalyst.expressions
  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
  import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -2295,7 +2303,7 @@ index 746f289c393..0c99d028163 100644
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.internal.SQLConf
-@@ -102,12 +103,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -102,12 +105,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     }
   }
 
@@ -2318,7 +2326,7 @@ index 746f289c393..0c99d028163 100644
   // To verify if the bucket pruning works, this function checks two conditions:
   // 1) Check if the pruned buckets (before filtering) are empty.
   // 2) Verify the final result is the same as the expected one
-@@ -156,7 +165,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -156,7 +167,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition)
       .queryExecution.executedPlan
     val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2328,7 +2336,7 @@ index 746f289c393..0c99d028163 100644
 
     val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
     val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -452,28 +462,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -452,28 +464,49 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
       val executedPlan =
         joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2386,7 +2394,7 @@ index 746f289c393..0c99d028163 100644
       s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
 
     // check the output partitioning
-@@ -836,11 +867,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -836,11 +869,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
     val scanDF = spark.table("bucketed_table").select("j")
@@ -2400,7 +2408,40 @@ index 746f289c393..0c99d028163 100644
     checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
   }
 }
-@@ -1029,15 +1060,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -895,7 +928,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+   }
+ 
+   test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
++    // Range partitioning uses random samples, so per-partition comparisons do not always yield
++    // the same results. Disable Comet native range partitioning.
+     withSQLConf(
++      CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
+       SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+       SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
+       val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
+@@ -914,7 +950,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+   }
+ 
+   test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
++    // Range partitioning uses random samples, so per-partition comparisons do not always yield
++    // the same results. Disable Comet native range partitioning.
+     withSQLConf(
++      CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
+       SQLConf.SHUFFLE_PARTITIONS.key -> "9",
+       SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
+ 
+@@ -944,7 +983,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+   }
+ 
+   test("bucket coalescing eliminates shuffle") {
++    // Range partitioning uses random samples, so per-partition comparisons do not always yield
++    // the same results. Disable Comet native range partitioning.
+     withSQLConf(
++      CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false",
+       SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
+       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+       // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
+@@ -1029,15 +1071,21 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     Seq(true, false).foreach { aqeEnabled =>
       withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled.toString) {
         val plan = sql(query).queryExecution.executedPlan
@@ -2830,6 +2871,34 @@ index 549431ef4f4..e48f1730da6 100644
     withTempDir { dir =>
       withSQLConf(
         "parquet.crypto.factory.class" ->
+diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+index 6160c3e5f6c..0956d7d9edc 100644
+--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
++++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+@@ -24,6 +24,7 @@ import java.sql.{Date, Timestamp}
+ import java.util.{Locale, Set}
+ 
+ import com.google.common.io.Files
++import org.apache.comet.CometConf
+ import org.apache.hadoop.fs.{FileSystem, Path}
+ 
+ import org.apache.spark.{SparkException, TestUtils}
+@@ -838,8 +839,13 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
+   }
+ 
+   test("SPARK-2554 SumDistinct partial aggregation") {
+-    checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
+-      sql("SELECT distinct key FROM src order by key").collect().toSeq)
++    // Range partitioning uses random samples, so per-partition comparisons do not always yield
++    // the same results. Disable Comet native range partitioning.
++    withSQLConf(CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false")
++    {
++      checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
++        sql("SELECT distinct key FROM src order by key").collect().toSeq)
++    }
+   }
+ 
+   test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
 index 1d646f40b3e..7f2cdb8f061 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
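All of these suite edits rely on the same scoping idiom: withSQLConf applies the given keys for the duration of the block and restores the previous values on exit, so the Comet flag is off only for the boundary-sensitive assertions. The shape of the pattern (a sketch; assumes a suite mixing in Spark's SQLTestUtils):

```scala
import org.apache.comet.CometConf

// Scoped override: the previous value is restored when the block exits,
// so the rest of the suite still runs with native range partitioning on.
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_WITH_RANGE_PARTITIONING_ENABLED.key -> "false") {
  // boundary-sensitive assertions go here
}
```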
