Skip to content

Commit abcb499

Browse files
authored
chore: Add unit tests for CometScanRule (apache#2867)
1 parent 86f8976 commit abcb499

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ jobs:
141141
org.apache.spark.CometPluginsDefaultSuite
142142
org.apache.spark.CometPluginsNonOverrideSuite
143143
org.apache.spark.CometPluginsUnifiedModeOverrideSuite
144+
org.apache.comet.rules.CometScanRuleSuite
144145
org.apache.comet.rules.CometExecRuleSuite
145146
org.apache.spark.sql.CometTPCDSQuerySuite
146147
org.apache.spark.sql.CometTPCDSQueryTestSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ jobs:
106106
org.apache.spark.CometPluginsDefaultSuite
107107
org.apache.spark.CometPluginsNonOverrideSuite
108108
org.apache.spark.CometPluginsUnifiedModeOverrideSuite
109+
org.apache.comet.rules.CometScanRuleSuite
109110
org.apache.comet.rules.CometExecRuleSuite
110111
org.apache.spark.sql.CometTPCDSQuerySuite
111112
org.apache.spark.sql.CometTPCDSQueryTestSuite
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.rules
21+
22+
import scala.util.Random
23+
24+
import org.apache.spark.sql._
25+
import org.apache.spark.sql.comet._
26+
import org.apache.spark.sql.execution._
27+
import org.apache.spark.sql.execution.adaptive.QueryStageExec
28+
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
29+
import org.apache.spark.sql.internal.SQLConf
30+
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
31+
32+
import org.apache.comet.CometConf
33+
import org.apache.comet.parquet.CometParquetScan
34+
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
35+
36+
/**
 * Test suite specifically for CometScanRule transformation logic.
 *
 * Each test builds a vanilla Spark plan (with Comet disabled), applies
 * [[CometScanRule]] directly, and asserts which scan operators were (or were
 * not) replaced by their Comet equivalents.
 */
class CometScanRuleSuite extends CometTestBase {

  /** Helper method to apply CometScanRule and return the transformed plan. */
  private def applyCometScanRule(plan: SparkPlan): SparkPlan = {
    CometScanRule(spark).apply(stripAQEPlan(plan))
  }

  /** Create the test data frame used in all tests: 100 fuzzed rows with a
   *  nullable int `id` and a nullable string `name`, seeded for determinism. */
  private def createTestDataFrame = {
    val testSchema = new StructType(
      Array(
        StructField("id", DataTypes.IntegerType, nullable = true),
        StructField("name", DataTypes.StringType, nullable = true)))
    FuzzDataGenerator.generateDataFrame(new Random(42), spark, testSchema, 100, DataGenOptions())
  }

  /** Create a SparkPlan from the specified SQL with Comet disabled, so the
   *  resulting plan contains only vanilla Spark operators for the rule to
   *  transform. */
  private def createSparkPlan(spark: SparkSession, sql: String): SparkPlan = {
    var sparkPlan: SparkPlan = null
    withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
      val df = spark.sql(sql)
      sparkPlan = df.queryExecution.executedPlan
    }
    sparkPlan
  }

  /** Count occurrences of the specified operator class in the plan, descending
   *  into AQE query stages. */
  private def countOperators(plan: SparkPlan, opClass: Class[_]): Int = {
    stripAQEPlan(plan).collect {
      case stage: QueryStageExec =>
        countOperators(stage.plan, opClass)
      // Fix: the original check `op.getClass.isAssignableFrom(opClass)` was
      // reversed and only matched exact classes; `opClass.isInstance(op)` also
      // counts instances of subclasses of `opClass`.
      case op if opClass.isInstance(op) => 1
    }.sum
  }

  test("CometScanRule should replace FileSourceScanExec, but only when Comet is enabled") {
    withTempPath { path =>
      createTestDataFrame.write.parquet(path.toString)
      withTempView("test_data") {
        spark.read.parquet(path.toString).createOrReplaceTempView("test_data")

        val sparkPlan =
          createSparkPlan(spark, "SELECT id, id * 2 as doubled FROM test_data WHERE id % 2 == 0")

        // Count original Spark operators
        assert(countOperators(sparkPlan, classOf[FileSourceScanExec]) == 1)

        for (cometEnabled <- Seq(true, false)) {
          withSQLConf(CometConf.COMET_ENABLED.key -> cometEnabled.toString) {

            val transformedPlan = applyCometScanRule(sparkPlan)

            if (cometEnabled) {
              assert(countOperators(transformedPlan, classOf[FileSourceScanExec]) == 0)
              assert(countOperators(transformedPlan, classOf[CometScanExec]) == 1)
            } else {
              assert(countOperators(transformedPlan, classOf[FileSourceScanExec]) == 1)
              assert(countOperators(transformedPlan, classOf[CometScanExec]) == 0)
            }
          }
        }
      }
    }
  }

  test("CometScanRule should replace BatchScanExec, but only when Comet is enabled") {
    withTempPath { path =>
      createTestDataFrame.write.parquet(path.toString)
      withTempView("test_data") {
        // Empty v1 source list forces the v2 (BatchScanExec) read path.
        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
          spark.read.parquet(path.toString).createOrReplaceTempView("test_data")

          val sparkPlan =
            createSparkPlan(
              spark,
              "SELECT id, id * 2 as doubled FROM test_data WHERE id % 2 == 0")

          // Count original Spark operators
          assert(countOperators(sparkPlan, classOf[BatchScanExec]) == 1)

          for (cometEnabled <- Seq(true, false)) {
            withSQLConf(CometConf.COMET_ENABLED.key -> cometEnabled.toString) {

              val transformedPlan = applyCometScanRule(sparkPlan)

              if (cometEnabled) {
                assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 0)
                assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 1)

                // CometScanRule should have replaced the underlying scan
                val scan = transformedPlan.collect { case scan: CometBatchScanExec => scan }.head
                assert(scan.wrapped.scan.isInstanceOf[CometParquetScan])

              } else {
                assert(countOperators(transformedPlan, classOf[BatchScanExec]) == 1)
                assert(countOperators(transformedPlan, classOf[CometBatchScanExec]) == 0)
              }
            }
          }
        }
      }
    }
  }

  test("CometScanRule should fallback to Spark for unsupported data types in v1 scan") {
    withTempPath { path =>
      // Create test data with a type unsupported by the native_iceberg_compat
      // scan when COMET_SCAN_ALLOW_INCOMPATIBLE is off (ByteType here).
      val unsupportedSchema = new StructType(
        Array(
          StructField("id", DataTypes.IntegerType, nullable = true),
          StructField(
            "value",
            DataTypes.ByteType,
            nullable = true
          ), // Unsupported in some scan modes
          StructField("name", DataTypes.StringType, nullable = true)))

      val testData = Seq(Row(1, 1.toByte, "test1"), Row(2, -1.toByte, "test2"))

      val df = spark.createDataFrame(spark.sparkContext.parallelize(testData), unsupportedSchema)
      df.write.parquet(path.toString)

      withTempView("unsupported_data") {
        spark.read.parquet(path.toString).createOrReplaceTempView("unsupported_data")

        val sparkPlan =
          createSparkPlan(spark, "SELECT id, value FROM unsupported_data WHERE id = 1")

        withSQLConf(
          CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
          CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
          val transformedPlan = applyCometScanRule(sparkPlan)

          // Should fallback to Spark due to unsupported ByteType in schema
          assert(countOperators(transformedPlan, classOf[FileSourceScanExec]) == 1)
          assert(countOperators(transformedPlan, classOf[CometScanExec]) == 0)
        }
      }
    }
  }
}

0 commit comments

Comments (0)