dev/diffs/3.5.7.diff (97 changes: 32 additions & 65 deletions)
@@ -1,5 +1,5 @@
diff --git a/pom.xml b/pom.xml
-index 68e2c422a24..d971894ffe6 100644
+index a0e25ce4d8d..7db86212507 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,8 @@
@@ -38,7 +38,7 @@ index 68e2c422a24..d971894ffe6 100644
</dependencyManagement>

diff --git a/sql/core/pom.xml b/sql/core/pom.xml
-index f08b33575fc..424e0da32fd 100644
+index e3d324c8edb..22342150522 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -77,6 +77,10 @@
@@ -216,7 +216,7 @@ index 0efe0877e9b..423d3b3d76d 100644
-- SELECT_HAVING
-- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
-index 9815cb816c9..95b5f9992b0 100644
+index e5494726695..00937f025c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants
@@ -239,7 +239,7 @@ index 9815cb816c9..95b5f9992b0 100644

test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 5a8681aed97..da9d25e2eb4 100644
+index 6f3090d8908..c08a60fb0c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand
@@ -336,7 +336,7 @@ index 7ee18df3756..d09f70e5d99 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
-index 47a311c71d5..342e71cfdd4 100644
+index a1d5d579338..c201d39cc78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
@@ -624,7 +624,7 @@ index 7af826583bd..3c3def1eb67 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 4d256154c85..66a5473852d 100644
+index 44c8cb92fc3..f098beeca26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -822,7 +822,7 @@ index 4d256154c85..66a5473852d 100644
checkAnswer(fullJoinDF, Row(100))
}
}
-@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1611,6 +1640,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true
@@ -832,7 +832,7 @@ index 4d256154c85..66a5473852d 100644
}.size == 1)
}
}
-@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1655,14 +1687,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
@@ -855,7 +855,7 @@ index 4d256154c85..66a5473852d 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
-@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite
+@@ -1798,7 +1836,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}

@@ -879,7 +879,7 @@ index c26757c9cff..d55775f09d7 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 793a0da6a86..181bfc16e4b 100644
+index 3cf2bfd17ab..49728c35c42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -2050,10 +2050,10 @@ index 8e88049f51e..8f3cf8a0f80 100644
case _ =>
throw new AnalysisException("Can not match ParquetTable in the query.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-index 4f8a9e39716..fb55ac7a955 100644
+index 8ed9ef1630e..eed2a6f5ad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
-@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
+@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

@@ -2736,7 +2736,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index e937173a590..ca06132102d 100644
+index e937173a590..15feb013bae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2785,7 +2785,7 @@ index e937173a590..ca06132102d 100644
}
}

-@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase
+@@ -242,6 +265,20 @@ private[sql] trait SQLTestUtilsBase
protected override def _sqlContext: SQLContext = self.spark.sqlContext
}

@@ -2795,15 +2795,6 @@ index e937173a590..ca06132102d 100644
+ protected def isCometEnabled: Boolean = SparkSession.isCometEnabled
+
-+  /**
-+   * Whether to enable ansi mode This is only effective when
-+   * [[isCometEnabled]] returns true.
-+   */
-+  protected def enableCometAnsiMode: Boolean = {
-+    val v = System.getenv("ENABLE_COMET_ANSI_MODE")
-+    v != null && v.toBoolean
-+  }
-+
+ /**
+ * Whether Spark should only apply Comet scan optimization. This is only effective when
+ * [[isCometEnabled]] returns true.
+ */
@@ -2815,7 +2806,7 @@ index e937173a590..ca06132102d 100644
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
super.withSQLConf(pairs: _*)(f)
-@@ -435,6 +481,8 @@ private[sql] trait SQLTestUtilsBase
+@@ -435,6 +472,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
@@ -2825,10 +2816,10 @@ index e937173a590..ca06132102d 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..a5ea58146ad 100644
+index ed2e309fa07..81ae825ca79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2838,24 +2829,13 @@ index ed2e309fa07..a5ea58146ad 100644
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
+ .set("spark.comet.enabled", "true")
+ .set("spark.comet.parquet.respectFilterPushdown", "true")
-+
-+      if (!isCometScanOnly) {
-+        conf
-+          .set("spark.comet.exec.enabled", "true")
-+          .set("spark.shuffle.manager",
-+            "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
-+          .set("spark.comet.exec.shuffle.enabled", "true")
-+          .set("spark.comet.memoryOverhead", "10g")
-+      } else {
-+        conf
-+          .set("spark.comet.exec.enabled", "false")
-+          .set("spark.comet.exec.shuffle.enabled", "false")
-+      }
-+
-+      if (enableCometAnsiMode) {
-+        conf
-+          .set("spark.sql.ansi.enabled", "true")
-+      }
+ .set("spark.comet.sparkToColumnar.enabled", "true")
+ .set("spark.comet.sparkToColumnar.supportedOperatorList", "LocalTableScan")
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ }
conf.set(
StaticSQLConf.WAREHOUSE_PATH,
@@ -2989,10 +2969,10 @@ index 6160c3e5f6c..0956d7d9edc 100644

test("SPARK-4963 DataFrame sample on mutable row return wrong result") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-index 1d646f40b3e..5babe505301 100644
+index 1d646f40b3e..dde4a3b516f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
-@@ -53,25 +53,54 @@ object TestHive
+@@ -53,25 +53,41 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
Expand Down Expand Up @@ -3038,25 +3018,12 @@ index 1d646f40b3e..5babe505301 100644
+ conf
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
+ .set("spark.comet.enabled", "true")
-+
-+    val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
-+    if (v == null || !v.toBoolean) {
-+      conf
-+        .set("spark.comet.exec.enabled", "true")
-+        .set("spark.shuffle.manager",
-+          "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
-+        .set("spark.comet.exec.shuffle.enabled", "true")
-+    } else {
-+      conf
-+        .set("spark.comet.exec.enabled", "false")
-+        .set("spark.comet.exec.shuffle.enabled", "false")
-+    }
-+
-+    val a = System.getenv("ENABLE_COMET_ANSI_MODE")
-+    if (a != null && a.toBoolean) {
-+      conf
-+        .set("spark.sql.ansi.enabled", "true")
-+    }
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.comet.sparkToColumnar.enabled", "true")
+ .set("spark.comet.sparkToColumnar.supportedOperatorList", "LocalTableScan")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ }

+ conf