Skip to content

Commit c777fa8

Browse files
authored
chore: Update CometTestBase to stop setting the scan implementation to native_comet (#2176)
1 parent 7b85d03 commit c777fa8

File tree

9 files changed

+216
-183
lines changed

9 files changed

+216
-183
lines changed

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
316316
}
317317

318318
case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport {
319+
320+
// this class is intended to be used with a specific scan impl
321+
assert(scanImpl != CometConf.SCAN_AUTO)
322+
319323
override def isTypeSupported(
320324
dt: DataType,
321325
name: String,

spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -232,43 +232,47 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
232232
}
233233

234234
test("array_contains - test all types (native Parquet reader)") {
235-
withTempDir { dir =>
236-
val path = new Path(dir.toURI.toString, "test.parquet")
237-
val filename = path.toString
238-
val random = new Random(42)
239-
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
240-
ParquetGenerator.makeParquetFile(
241-
random,
242-
spark,
243-
filename,
244-
100,
245-
DataGenOptions(
246-
allowNull = true,
247-
generateNegativeZero = true,
248-
generateArray = true,
249-
generateStruct = true,
250-
generateMap = false))
251-
}
252-
val table = spark.read.parquet(filename)
253-
table.createOrReplaceTempView("t1")
254-
val complexTypeFields =
255-
table.schema.fields.filter(field => isComplexType(field.dataType))
256-
val primitiveTypeFields =
257-
table.schema.fields.filterNot(field => isComplexType(field.dataType))
258-
for (field <- primitiveTypeFields) {
259-
val fieldName = field.name
260-
val typeName = field.dataType.typeName
261-
sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
262-
.createOrReplaceTempView("t2")
263-
checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2"))
264-
checkSparkAnswerAndOperator(
265-
sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2"))
266-
}
267-
for (field <- complexTypeFields) {
268-
val fieldName = field.name
269-
sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
270-
.createOrReplaceTempView("t3")
271-
checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3"))
235+
// TODO test fails if scan is auto
236+
// https://github.com/apache/datafusion-comet/issues/2173
237+
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
238+
withTempDir { dir =>
239+
val path = new Path(dir.toURI.toString, "test.parquet")
240+
val filename = path.toString
241+
val random = new Random(42)
242+
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
243+
ParquetGenerator.makeParquetFile(
244+
random,
245+
spark,
246+
filename,
247+
100,
248+
DataGenOptions(
249+
allowNull = true,
250+
generateNegativeZero = true,
251+
generateArray = true,
252+
generateStruct = true,
253+
generateMap = false))
254+
}
255+
val table = spark.read.parquet(filename)
256+
table.createOrReplaceTempView("t1")
257+
val complexTypeFields =
258+
table.schema.fields.filter(field => isComplexType(field.dataType))
259+
val primitiveTypeFields =
260+
table.schema.fields.filterNot(field => isComplexType(field.dataType))
261+
for (field <- primitiveTypeFields) {
262+
val fieldName = field.name
263+
val typeName = field.dataType.typeName
264+
sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
265+
.createOrReplaceTempView("t2")
266+
checkSparkAnswerAndOperator(sql("SELECT array_contains(a, b) FROM t2"))
267+
checkSparkAnswerAndOperator(
268+
sql(s"SELECT array_contains(a, cast(null as $typeName)) FROM t2"))
269+
}
270+
for (field <- complexTypeFields) {
271+
val fieldName = field.name
272+
sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
273+
.createOrReplaceTempView("t3")
274+
checkSparkAnswer(sql("SELECT array_contains(a, b) FROM t3"))
275+
}
272276
}
273277
}
274278
}
@@ -406,9 +410,11 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
406410
}
407411

408412
test("array_intersect") {
409-
// https://github.com/apache/datafusion-comet/issues/1441
410-
assume(!usingDataSourceExec)
411-
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
413+
// TODO test fails if scan is auto
414+
// https://github.com/apache/datafusion-comet/issues/2174
415+
withSQLConf(
416+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
417+
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
412418

413419
Seq(true, false).foreach { dictionaryEnabled =>
414420
withTempDir { dir =>

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -944,28 +944,31 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
944944
// Complex Types
945945

946946
test("cast StructType to StringType") {
947-
// https://github.com/apache/datafusion-comet/issues/1441
948-
assume(!usingDataSourceExec)
949-
Seq(true, false).foreach { dictionaryEnabled =>
950-
withTempDir { dir =>
951-
val path = new Path(dir.toURI.toString, "test.parquet")
952-
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
953-
withParquetTable(path.toString, "tbl") {
954-
// primitives
955-
checkSparkAnswerAndOperator(
956-
"SELECT CAST(struct(_1, _2, _3, _4, _5, _6, _7, _8) as string) FROM tbl")
957-
checkSparkAnswerAndOperator("SELECT CAST(struct(_9, _10, _11, _12) as string) FROM tbl")
958-
// decimals
959-
// TODO add _16 when https://github.com/apache/datafusion-comet/issues/1068 is resolved
960-
checkSparkAnswerAndOperator("SELECT CAST(struct(_15, _17) as string) FROM tbl")
961-
// dates & timestamps
962-
checkSparkAnswerAndOperator("SELECT CAST(struct(_18, _19, _20) as string) FROM tbl")
963-
// named struct
964-
checkSparkAnswerAndOperator(
965-
"SELECT CAST(named_struct('a', _1, 'b', _2) as string) FROM tbl")
966-
// nested struct
967-
checkSparkAnswerAndOperator(
968-
"SELECT CAST(named_struct('a', named_struct('b', _1, 'c', _2)) as string) FROM tbl")
947+
// TODO test fails if scan is auto
948+
// https://github.com/apache/datafusion-comet/issues/2175
949+
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
950+
Seq(true, false).foreach { dictionaryEnabled =>
951+
withTempDir { dir =>
952+
val path = new Path(dir.toURI.toString, "test.parquet")
953+
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
954+
withParquetTable(path.toString, "tbl") {
955+
// primitives
956+
checkSparkAnswerAndOperator(
957+
"SELECT CAST(struct(_1, _2, _3, _4, _5, _6, _7, _8) as string) FROM tbl")
958+
checkSparkAnswerAndOperator(
959+
"SELECT CAST(struct(_9, _10, _11, _12) as string) FROM tbl")
960+
// decimals
961+
// TODO add _16 when https://github.com/apache/datafusion-comet/issues/1068 is resolved
962+
checkSparkAnswerAndOperator("SELECT CAST(struct(_15, _17) as string) FROM tbl")
963+
// dates & timestamps
964+
checkSparkAnswerAndOperator("SELECT CAST(struct(_18, _19, _20) as string) FROM tbl")
965+
// named struct
966+
checkSparkAnswerAndOperator(
967+
"SELECT CAST(named_struct('a', _1, 'b', _2) as string) FROM tbl")
968+
// nested struct
969+
checkSparkAnswerAndOperator(
970+
"SELECT CAST(named_struct('a', named_struct('b', _1, 'c', _2)) as string) FROM tbl")
971+
}
969972
}
970973
}
971974
}

spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,10 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
374374
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
375375
pos: Position): Unit = {
376376
Seq("native", "jvm").foreach { shuffleMode =>
377-
Seq("native_comet", "native_datafusion", "native_iceberg_compat").foreach { scanImpl =>
377+
Seq(
378+
CometConf.SCAN_NATIVE_COMET,
379+
CometConf.SCAN_NATIVE_DATAFUSION,
380+
CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl =>
378381
super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
379382
withSQLConf(
380383
CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,

spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -587,8 +587,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
587587
}
588588

589589
test("fix: native Unsafe row accessors return incorrect results") {
590-
// https://github.com/apache/datafusion-comet/issues/1538
591-
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
590+
// TODO byte/short issue
591+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_COMET)
592592
Seq(10, 201).foreach { numPartitions =>
593593
withTempDir { dir =>
594594
val path = new Path(dir.toURI.toString, "test.parquet")

0 commit comments

Comments
 (0)