Skip to content

Commit 8cfbca2

Browse files
authored
fix: run scan metrics test with both native_datafusion and native_iceberg_compat (apache#3690)
The "Comet native metrics: scan" test only ran with SCAN_AUTO (which resolves to native_iceberg_compat), missing coverage for native_datafusion. Changes: - Run the test with both SCAN_NATIVE_DATAFUSION and SCAN_NATIVE_ICEBERG_COMPAT - Use find() with isDefined assert instead of find().foreach() to fail explicitly if no scan node is found - Remove the WHERE clause that returned 0 rows due to predicate pushdown pruning all rows in native_datafusion mode - Add scan mode context to assertion messages for easier debugging
1 parent 73a713a commit 8cfbca2

File tree

1 file changed

+36
-21
lines changed

1 file changed

+36
-21
lines changed

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -558,35 +558,50 @@ class CometExecSuite extends CometTestBase {
558558
}
559559

560560
test("Comet native metrics: scan") {
561-
withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") {
562-
withTempDir { dir =>
563-
val path = new Path(dir.toURI.toString, "native-scan.parquet")
564-
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = true, 10000)
565-
withParquetTable(path.toString, "tbl") {
566-
val df = sql("SELECT * FROM tbl WHERE _2 > _3")
567-
df.collect()
561+
Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach {
562+
scanMode =>
563+
withSQLConf(
564+
CometConf.COMET_EXEC_ENABLED.key -> "true",
565+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
566+
withTempDir { dir =>
567+
val path = new Path(dir.toURI.toString, "native-scan.parquet")
568+
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = true, 10000)
569+
withParquetTable(path.toString, "tbl") {
570+
val df = sql("SELECT * FROM tbl")
571+
df.collect()
568572

569-
find(df.queryExecution.executedPlan)(s =>
570-
s.isInstanceOf[CometScanExec] || s.isInstanceOf[CometNativeScanExec])
571-
.foreach(scan => {
572-
val metrics = scan.metrics
573+
val scan = find(df.queryExecution.executedPlan)(s =>
574+
s.isInstanceOf[CometScanExec] || s.isInstanceOf[CometNativeScanExec])
575+
assert(scan.isDefined, s"Expected to find a Comet scan node for $scanMode")
576+
val metrics = scan.get.metrics
573577

574-
assert(metrics.contains("time_elapsed_scanning_total"))
578+
assert(
579+
metrics.contains("time_elapsed_scanning_total"),
580+
s"[$scanMode] Missing time_elapsed_scanning_total. Available: ${metrics.keys}")
575581
assert(metrics.contains("bytes_scanned"))
576582
assert(metrics.contains("output_rows"))
577583
assert(metrics.contains("time_elapsed_opening"))
578584
assert(metrics.contains("time_elapsed_processing"))
579585
assert(metrics.contains("time_elapsed_scanning_until_data"))
580-
assert(metrics("time_elapsed_scanning_total").value > 0)
581-
assert(metrics("bytes_scanned").value > 0)
582-
assert(metrics("output_rows").value > 0)
583-
assert(metrics("time_elapsed_opening").value > 0)
584-
assert(metrics("time_elapsed_processing").value > 0)
585-
assert(metrics("time_elapsed_scanning_until_data").value > 0)
586-
})
587-
586+
assert(
587+
metrics("time_elapsed_scanning_total").value > 0,
588+
s"[$scanMode] time_elapsed_scanning_total should be > 0")
589+
assert(
590+
metrics("bytes_scanned").value > 0,
591+
s"[$scanMode] bytes_scanned should be > 0")
592+
assert(metrics("output_rows").value > 0, s"[$scanMode] output_rows should be > 0")
593+
assert(
594+
metrics("time_elapsed_opening").value > 0,
595+
s"[$scanMode] time_elapsed_opening should be > 0")
596+
assert(
597+
metrics("time_elapsed_processing").value > 0,
598+
s"[$scanMode] time_elapsed_processing should be > 0")
599+
assert(
600+
metrics("time_elapsed_scanning_until_data").value > 0,
601+
s"[$scanMode] time_elapsed_scanning_until_data should be > 0")
602+
}
603+
}
588604
}
589-
}
590605
}
591606
}
592607

0 commit comments

Comments
 (0)