Commit 315d6a7

fix: metrics tests for native_datafusion experimental native scan (apache#1445)
* Fix ParquetReadSuite scan metrics for native_datafusion.
* Fix CometExecSuite native metrics: scan for native_datafusion.
1 parent 0e44b89 commit 315d6a7
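
Both touched suites now assert different metric names depending on which scan operator the planner produces. Which operator appears depends on how Comet is configured. The sketch below shows one way to drive a query under the experimental native scan; it is illustrative only, and the plugin class and the spark.comet.scan.impl switch are assumptions based on Comet's documentation, not part of this commit.

import org.apache.spark.sql.SparkSession

object NativeScanMetricsDemo {
  def main(args: Array[String]): Unit = {
    // Illustrative session setup; config keys/values are assumed, not from this patch.
    val spark = SparkSession
      .builder()
      .master("local[1]")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config("spark.comet.enabled", "true")
      .config("spark.comet.scan.impl", "native_datafusion") // assumed switch
      .getOrCreate()

    // Hypothetical path, used only for this demo.
    spark.range(10).toDF("id").write.mode("overwrite").parquet("/tmp/tbl")
    val df = spark.read.parquet("/tmp/tbl").filter("id > 0")
    df.collect()

    // Under native_datafusion the scan node is expected to be CometNativeScanExec,
    // reporting DataFusion metric names such as "bytes_scanned"; under the default
    // scan it is CometScanExec, with names such as "scanTime".
    df.queryExecution.executedPlan.foreach(p => println(p.nodeName))
    spark.stop()
  }
}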

File tree

2 files changed: +73 -38 lines

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 57 additions & 37 deletions
@@ -28,12 +28,12 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{AnalysisException, Column, CometTestBase, DataFrame, DataFrameWriter, Row, SaveMode}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Hex}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, BloomFilterAggregate}
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometNativeScanExec, CometProjectExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometSparkToColumnarExec, CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet._
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, Cartes
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions.{col, count, date_add, expr, lead, sum}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.unsafe.types.UTF8String
@@ -53,6 +53,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plu
 import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
 
 class CometExecSuite extends CometTestBase {
+
   import testImplicits._
 
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
@@ -216,18 +217,18 @@
 (indentation-only hunk: the CREATE TABLE t1 ... and WITH t2 ... LIMIT 10 SQL string literals inside withTable("t1") were re-indented; their content is unchanged)
@@ -532,15 +533,34 @@
       val df = sql("SELECT * FROM tbl WHERE _2 > _3")
       df.collect()
 
-      val metrics = find(df.queryExecution.executedPlan)(s =>
+      find(df.queryExecution.executedPlan)(s =>
         s.isInstanceOf[CometScanExec] || s.isInstanceOf[CometNativeScanExec])
-        .map(_.metrics)
-        .get
+        .foreach(scan => {
+          val metrics = scan.metrics
+          scan match {
+            case _: CometScanExec => {
+              assert(metrics.contains("scanTime"))
+              assert(metrics.contains("cast_time"))
+              assert(metrics("scanTime").value > 0)
+              assert(metrics("cast_time").value > 0)
+            }
+            case _: CometNativeScanExec => {
+              assert(metrics.contains("time_elapsed_scanning_total"))
+              assert(metrics.contains("bytes_scanned"))
+              assert(metrics.contains("output_rows"))
+              assert(metrics.contains("time_elapsed_opening"))
+              assert(metrics.contains("time_elapsed_processing"))
+              assert(metrics.contains("time_elapsed_scanning_until_data"))
+              assert(metrics("time_elapsed_scanning_total").value > 0)
+              assert(metrics("bytes_scanned").value > 0)
+              assert(metrics("output_rows").value == 0)
+              assert(metrics("time_elapsed_opening").value > 0)
+              assert(metrics("time_elapsed_processing").value > 0)
+              assert(metrics("time_elapsed_scanning_until_data").value > 0)
+            }
+          }
+        })
 
-      assert(metrics.contains("scanTime"))
-      assert(metrics.contains("cast_time"))
-      assert(metrics("scanTime").value > 0)
-      assert(metrics("cast_time").value > 0)
     }
   }
 }
@@ -1265,16 +1285,16 @@
 (indentation-only hunk: the SELECT of name, CAST(part1/part2/part3 AS STRING), and part4 through part7 FROM t1 passed to checkSparkAnswerAndOperator was re-indented; its content is unchanged)
@@ -1504,17 +1524,17 @@
 (indentation-only hunk: the two window-function SQL string literals, SELECT a, b, ROW_NUMBER() OVER(ORDER BY a, b) AS rn FROM t1 LIMIT 3 and SELECT b, RANK() OVER(ORDER BY a, b) AS rk, DENSE_RANK(b) OVER(ORDER BY a, b) AS s FROM t1 LIMIT 2, were re-indented; their content is unchanged)

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 16 additions & 1 deletion
@@ -1026,7 +1026,7 @@ abstract class ParquetReadSuite extends CometTestBase {
   }
 
   test("scan metrics") {
-    val metricNames = Seq(
+    val cometScanMetricNames = Seq(
       "ParquetRowGroups",
       "ParquetNativeDecodeTime",
       "ParquetNativeLoadTime",
@@ -1035,14 +1035,29 @@ abstract class ParquetReadSuite extends CometTestBase {
       "ParquetInputFileReadSize",
       "ParquetInputFileReadThroughput")
 
+    val cometNativeScanMetricNames = Seq(
+      "time_elapsed_scanning_total",
+      "bytes_scanned",
+      "output_rows",
+      "time_elapsed_opening",
+      "time_elapsed_processing",
+      "time_elapsed_scanning_until_data")
+
     withParquetTable((0 until 10000).map(i => (i, i.toDouble)), "tbl") {
       val df = sql("SELECT * FROM tbl WHERE _1 > 0")
       val scans = df.queryExecution.executedPlan collect {
         case s: CometScanExec => s
         case s: CometBatchScanExec => s
+        case s: CometNativeScanExec => s
       }
       assert(scans.size == 1, s"Expect one scan node but found ${scans.size}")
       val metrics = scans.head.metrics
+
+      val metricNames = scans.head match {
+        case _: CometNativeScanExec => cometNativeScanMetricNames
+        case _ => cometScanMetricNames
+      }
+
       metricNames.foreach { metricName =>
         assert(metrics.contains(metricName), s"metric $metricName was not found")
       }
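
Both suites end with the same presence check over a plan node's metrics map. A minimal self-contained sketch of that check, assuming only Spark's SQLMetric type; the object and method names are hypothetical.

import org.apache.spark.sql.execution.metric.SQLMetric

object MetricAssertions {
  // Hypothetical helper: fail with a descriptive message when any expected
  // metric name is missing from a plan node's metrics map.
  def assertMetricsPresent(metrics: Map[String, SQLMetric], names: Seq[String]): Unit =
    names.foreach { name =>
      assert(metrics.contains(name), s"metric $name was not found")
    }
}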
