Skip to content

Commit c27b1e1

Browse files
authored
fix: enable native_datafusion Spark SQL tests previously ignored in apache#3315 (apache#3696)
Add numOutputRows metric alias to CometNativeScanExec so Spark's streaming ProgressReporter can find input row counts. Remove IgnoreCometNativeDataFusion tags from three Spark SQL tests that now pass with native_datafusion scan.
1 parent 9b773f3 commit c27b1e1

File tree

2 files changed

+13
-53
lines changed

2 files changed

+13
-53
lines changed

dev/diffs/3.5.8.diff

Lines changed: 5 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -965,7 +965,7 @@ index 3cf2bfd17ab..49728c35c42 100644
965965
SQLConf.ANSI_ENABLED.key -> "true") {
966966
withTable("t") {
967967
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
968-
index fa1a64460fc..1d2e215d6a3 100644
968+
index fa1a64460fc..134f0db1fb8 100644
969969
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
970970
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
971971
@@ -17,6 +17,8 @@
@@ -1134,31 +1134,18 @@ index d269290e616..13726a31e07 100644
11341134
}
11351135
}
11361136
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
1137-
index cfc8b2cc845..b7c234e1437 100644
1137+
index cfc8b2cc845..c4be7eb3731 100644
11381138
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
11391139
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
1140-
@@ -19,8 +19,9 @@ package org.apache.spark.sql.connector
1141-
import scala.collection.mutable.ArrayBuffer
1142-
1140+
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
11431141
import org.apache.spark.SparkConf
1144-
-import org.apache.spark.sql.{AnalysisException, QueryTest}
1145-
+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest}
1142+
import org.apache.spark.sql.{AnalysisException, QueryTest}
11461143
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
11471144
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
11481145
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
11491146
import org.apache.spark.sql.connector.read.ScanBuilder
11501147
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
1151-
@@ -152,7 +153,8 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
1152-
}
1153-
}
1154-
1155-
- test("Fallback Parquet V2 to V1") {
1156-
+ test("Fallback Parquet V2 to V1",
1157-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
1158-
Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format =>
1159-
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
1160-
val commands = ArrayBuffer.empty[(String, LogicalPlan)]
1161-
@@ -184,7 +186,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
1148+
@@ -184,7 +185,11 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
11621149
val df = spark.read.format(format).load(path.getCanonicalPath)
11631150
checkAnswer(df, inputData.toDF())
11641151
assert(
@@ -2930,39 +2917,6 @@ index aad91601758..201083bd621 100644
29302917
})
29312918
}
29322919

2933-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
2934-
index b5cf13a9c12..ac17603fb7f 100644
2935-
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
2936-
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
2937-
@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
2938-
2939-
import org.apache.spark.{SparkException, TestUtils}
2940-
import org.apache.spark.internal.Logging
2941-
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
2942-
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, IgnoreCometNativeDataFusion, Row, SaveMode}
2943-
import org.apache.spark.sql.catalyst.InternalRow
2944-
import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
2945-
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LocalRelation}
2946-
@@ -660,7 +660,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
2947-
)
2948-
}
2949-
2950-
- test("SPARK-41198: input row calculation with CTE") {
2951-
+ test("SPARK-41198: input row calculation with CTE",
2952-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
2953-
withTable("parquet_tbl", "parquet_streaming_tbl") {
2954-
spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
2955-
.write.format("parquet").saveAsTable("parquet_tbl")
2956-
@@ -712,7 +713,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
2957-
}
2958-
}
2959-
2960-
- test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") {
2961-
+ test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources",
2962-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3315")) {
2963-
withTable("parquet_streaming_tbl") {
2964-
val streamInput = MemoryStream[Int]
2965-
val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream")
29662920
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
29672921
index 8f099c31e6b..ce4b7ad25b3 100644
29682922
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,14 @@ case class CometNativeScanExec(
202202

203203
override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt)
204204

205-
override lazy val metrics: Map[String, SQLMetric] =
206-
CometMetricNode.nativeScanMetrics(session.sparkContext)
205+
override lazy val metrics: Map[String, SQLMetric] = {
206+
val nativeMetrics = CometMetricNode.nativeScanMetrics(session.sparkContext)
207+
// Map native metric names to Spark metric names
208+
nativeMetrics.get("output_rows") match {
209+
case Some(metric) => nativeMetrics + ("numOutputRows" -> metric)
210+
case None => nativeMetrics
211+
}
212+
}
207213

208214
/**
209215
* See [[org.apache.spark.sql.execution.DataSourceScanExec.inputRDDs]]. Only used for tests.

0 commit comments

Comments (0)