Skip to content

Commit 4aa89b5

Browse files
authored
Skip row index Spark SQL tests for native_datafusion Parquet scan. (#1724)
1 parent b36db64 commit 4aa89b5

File tree

4 files changed

+113
-12
lines changed

4 files changed

+113
-12
lines changed

dev/diffs/3.4.3.diff

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2231,18 +2231,34 @@ index 240bb4e6dcb..8287ffa03ca 100644
22312231

22322232
import testImplicits._
22332233
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2234-
index 351c6d698fc..36492fe936d 100644
2234+
index 351c6d698fc..583d9225cca 100644
22352235
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
22362236
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2237-
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2237+
@@ -20,12 +20,14 @@ import java.io.File
2238+
2239+
import scala.collection.JavaConverters._
2240+
2241+
+import org.apache.comet.CometConf
2242+
import org.apache.hadoop.fs.Path
2243+
import org.apache.parquet.column.ParquetProperties._
2244+
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
22382245
import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
22392246

22402247
import org.apache.spark.sql.QueryTest
22412248
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
22422249
import org.apache.spark.sql.execution.FileSourceScanExec
22432250
import org.apache.spark.sql.execution.datasources.FileFormat
22442251
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2245-
@@ -230,6 +231,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2252+
@@ -172,6 +174,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2253+
2254+
private def testRowIndexGeneration(label: String, conf: RowIndexTestConf): Unit = {
2255+
test (s"$label - ${conf.desc}") {
2256+
+ // native_datafusion Parquet scan does not support row index generation.
2257+
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2258+
withSQLConf(conf.sqlConfs: _*) {
2259+
withTempPath { path =>
2260+
val rowIndexColName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
2261+
@@ -230,6 +234,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
22462262
case f: FileSourceScanExec =>
22472263
numPartitions += f.inputRDD.partitions.length
22482264
numOutputRows += f.metrics("numOutputRows").value
@@ -2255,6 +2271,15 @@ index 351c6d698fc..36492fe936d 100644
22552271
case _ =>
22562272
}
22572273
assert(numPartitions > 0)
2274+
@@ -291,6 +301,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2275+
val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
2276+
2277+
test(s"invalid row index column type - ${conf.desc}") {
2278+
+ // native_datafusion Parquet scan does not support row index generation.
2279+
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2280+
withSQLConf(conf.sqlConfs: _*) {
2281+
withTempPath{ path =>
2282+
val df = spark.range(0, 10, 1, 1).toDF("id")
22582283
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
22592284
index 5c0b7def039..151184bc98c 100644
22602285
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala

dev/diffs/3.5.4.diff

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2248,18 +2248,34 @@ index 4f906411345..6cc69f7e915 100644
22482248

22492249
import testImplicits._
22502250
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2251-
index 27c2a2148fd..1d93d0eb8bc 100644
2251+
index 27c2a2148fd..df04a15fb1f 100644
22522252
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
22532253
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2254-
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2254+
@@ -20,12 +20,14 @@ import java.io.File
2255+
2256+
import scala.collection.JavaConverters._
2257+
2258+
+import org.apache.comet.CometConf
2259+
import org.apache.hadoop.fs.Path
2260+
import org.apache.parquet.column.ParquetProperties._
2261+
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
22552262
import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
22562263

22572264
import org.apache.spark.sql.QueryTest
22582265
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
22592266
import org.apache.spark.sql.execution.FileSourceScanExec
22602267
import org.apache.spark.sql.execution.datasources.FileFormat
22612268
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2262-
@@ -243,6 +244,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2269+
@@ -172,6 +174,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2270+
2271+
private def testRowIndexGeneration(label: String, conf: RowIndexTestConf): Unit = {
2272+
test (s"$label - ${conf.desc}") {
2273+
+ // native_datafusion Parquet scan does not support row index generation.
2274+
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2275+
withSQLConf(conf.sqlConfs: _*) {
2276+
withTempPath { path =>
2277+
// Read row index using _metadata.row_index if that is supported by the file format.
2278+
@@ -243,6 +247,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
22632279
case f: FileSourceScanExec =>
22642280
numPartitions += f.inputRDD.partitions.length
22652281
numOutputRows += f.metrics("numOutputRows").value
@@ -2272,6 +2288,15 @@ index 27c2a2148fd..1d93d0eb8bc 100644
22722288
case _ =>
22732289
}
22742290
assert(numPartitions > 0)
2291+
@@ -301,6 +311,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2292+
val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
2293+
2294+
test(s"invalid row index column type - ${conf.desc}") {
2295+
+ // native_datafusion Parquet scan does not support row index generation.
2296+
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2297+
withSQLConf(conf.sqlConfs: _*) {
2298+
withTempPath{ path =>
2299+
val df = spark.range(0, 10, 1, 1).toDF("id")
22752300
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
22762301
index 5c0b7def039..151184bc98c 100644
22772302
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala

dev/diffs/3.5.5.diff

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2248,18 +2248,34 @@ index 4f906411345..6cc69f7e915 100644
22482248

22492249
import testImplicits._
22502250
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2251-
index 27c2a2148fd..1d93d0eb8bc 100644
2251+
index 27c2a2148fd..df04a15fb1f 100644
22522252
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
22532253
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2254-
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2254+
@@ -20,12 +20,14 @@ import java.io.File
2255+
2256+
import scala.collection.JavaConverters._
2257+
2258+
+import org.apache.comet.CometConf
2259+
import org.apache.hadoop.fs.Path
2260+
import org.apache.parquet.column.ParquetProperties._
2261+
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
22552262
import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
22562263

22572264
import org.apache.spark.sql.QueryTest
22582265
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
22592266
import org.apache.spark.sql.execution.FileSourceScanExec
22602267
import org.apache.spark.sql.execution.datasources.FileFormat
22612268
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2262-
@@ -243,6 +244,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2269+
@@ -172,6 +174,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2270+
2271+
private def testRowIndexGeneration(label: String, conf: RowIndexTestConf): Unit = {
2272+
test (s"$label - ${conf.desc}") {
2273+
+ // native_datafusion Parquet scan does not support row index generation.
2274+
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2275+
withSQLConf(conf.sqlConfs: _*) {
2276+
withTempPath { path =>
2277+
// Read row index using _metadata.row_index if that is supported by the file format.
2278+
@@ -243,6 +247,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
22632279
case f: FileSourceScanExec =>
22642280
numPartitions += f.inputRDD.partitions.length
22652281
numOutputRows += f.metrics("numOutputRows").value
@@ -2272,6 +2288,15 @@ index 27c2a2148fd..1d93d0eb8bc 100644
22722288
case _ =>
22732289
}
22742290
assert(numPartitions > 0)
2291+
@@ -301,6 +311,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2292+
val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
2293+
2294+
test(s"invalid row index column type - ${conf.desc}") {
2295+
+ // native_datafusion Parquet scan does not support row index generation.
2296+
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2297+
withSQLConf(conf.sqlConfs: _*) {
2298+
withTempPath{ path =>
2299+
val df = spark.range(0, 10, 1, 1).toDF("id")
22752300
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
22762301
index 5c0b7def039..151184bc98c 100644
22772302
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala

dev/diffs/4.0.0-preview1.diff

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2476,18 +2476,35 @@ index 6d9092391a9..6da095120d1 100644
24762476

24772477
import testImplicits._
24782478
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2479-
index 95378d94674..0c915fdc634 100644
2479+
index 95378d94674..2b75ffad4d7 100644
24802480
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
24812481
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2482-
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
2482+
@@ -20,6 +20,7 @@ import java.io.File
2483+
2484+
import scala.jdk.CollectionConverters._
2485+
2486+
+import org.apache.comet.CometConf
2487+
import org.apache.hadoop.fs.Path
2488+
import org.apache.parquet.column.ParquetProperties._
2489+
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2490+
@@ -27,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
24832491

24842492
import org.apache.spark.SparkException
24852493
import org.apache.spark.sql.QueryTest
24862494
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
24872495
import org.apache.spark.sql.execution.FileSourceScanExec
24882496
import org.apache.spark.sql.execution.datasources.FileFormat
24892497
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2490-
@@ -245,6 +246,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2498+
@@ -174,6 +176,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2499+
2500+
private def testRowIndexGeneration(label: String, conf: RowIndexTestConf): Unit = {
2501+
test (s"$label - ${conf.desc}") {
2502+
+ // native_datafusion Parquet scan does not support row index generation.
2503+
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2504+
withSQLConf(conf.sqlConfs: _*) {
2505+
withTempPath { path =>
2506+
// Read row index using _metadata.row_index if that is supported by the file format.
2507+
@@ -245,6 +249,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
24912508
case f: FileSourceScanExec =>
24922509
numPartitions += f.inputRDD.partitions.length
24932510
numOutputRows += f.metrics("numOutputRows").value
@@ -2500,6 +2517,15 @@ index 95378d94674..0c915fdc634 100644
25002517
case _ =>
25012518
}
25022519
assert(numPartitions > 0)
2520+
@@ -303,6 +313,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2521+
val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
2522+
2523+
test(s"invalid row index column type - ${conf.desc}") {
2524+
+ // native_datafusion Parquet scan does not support row index generation.
2525+
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2526+
withSQLConf(conf.sqlConfs: _*) {
2527+
withTempPath{ path =>
2528+
val df = spark.range(0, 10, 1, 1).toDF("id")
25032529
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
25042530
index 5c0b7def039..151184bc98c 100644
25052531
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala

0 commit comments

Comments
 (0)