Commit 2d8e142

feat: CometNativeScan metrics from ParquetFileMetrics and FileStreamMetrics (#1172)
* Sync with main.
* The Spark History Server seems to append the word "total" at the end of metric names, so I removed the redundant word from the beginning of the new metric descriptions.
* Fix typo.
1 parent aaf57c8 commit 2d8e142
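
For context, the SQLMetrics registered below are AccumulatorV2-based metrics created on the driver and updated during execution; the SQL UI / History Server renders timing metrics with a trailing "total (min, med, max)", which is why the commit message drops the leading "Total" from the descriptions. A minimal, standalone sketch of declaring and feeding one nano-timing metric, assuming only a local SparkSession (illustrative, not Comet code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.SQLMetrics

object MetricSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("metric-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Same factory the diff uses: a nanosecond timing metric shown in the SQL UI.
    val openTime = SQLMetrics.createNanoTimingMetric(sc, "Wall clock time elapsed for file opening")

    // At runtime the scan wraps the timed work and adds the elapsed nanoseconds.
    val start = System.nanoTime()
    Thread.sleep(5) // stand-in for opening a Parquet file
    openTime.add(System.nanoTime() - start)

    println(s"recorded ${openTime.value} ns")
    spark.stop()
  }
}
```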

File tree

1 file changed: +80 −0 lines

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 80 additions & 0 deletions
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection._
 
@@ -59,6 +60,7 @@ case class CometNativeScanExec(
 
   override def outputPartitioning: Partitioning =
     UnknownPartitioning(originalPlan.inputRDD.getNumPartitions)
+
   override def outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering
 
   override def stringArgs: Iterator[Any] = Iterator(output)
@@ -74,6 +76,83 @@ case class CometNativeScanExec(
   }
 
   override def hashCode(): Int = Objects.hashCode(output)
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    // We don't append CometMetricNode.baselineMetrics because
+    // elapsed_compute has no counterpart on the native side.
+    Map(
+      "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "time_elapsed_opening" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file opening"),
+      "time_elapsed_scanning_until_data" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file scanning + " +
+            "first record batch of decompression + decoding"),
+      "time_elapsed_scanning_total" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Elapsed wall clock time for scanning " +
+            "+ record batch decompression / decoding"),
+      "time_elapsed_processing" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for data decompression + decoding"),
+      "file_open_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
+      "file_scan_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
+      "predicate_evaluation_errors" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of times the predicate could not be evaluated"),
+      "row_groups_matched_bloom_filter" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose bloom filters were checked and matched (not pruned)"),
+      "row_groups_pruned_bloom_filter" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
+      "row_groups_matched_statistics" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose statistics were checked and matched (not pruned)"),
+      "row_groups_pruned_statistics" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
+      "bytes_scanned" ->
+        SQLMetrics.createSizeMetric(sparkContext, "Number of bytes scanned"),
+      "pushdown_rows_pruned" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Rows filtered out by predicates pushed into parquet scan"),
+      "pushdown_rows_matched" ->
+        SQLMetrics.createMetric(sparkContext, "Rows passed predicates pushed into parquet scan"),
+      "row_pushdown_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating row-level pushdown filters"),
+      "statistics_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating row group-level statistics filters"),
+      "bloom_filter_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating row group Bloom Filters"),
+      "page_index_rows_pruned" ->
+        SQLMetrics.createMetric(sparkContext, "Rows filtered out by parquet page index"),
+      "page_index_rows_matched" ->
+        SQLMetrics.createMetric(sparkContext, "Rows passed through the parquet page index"),
+      "page_index_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent evaluating parquet page index filters"),
+      "metadata_load_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Time spent reading and parsing metadata from the footer"))
+  }
 }
 
 object CometNativeScanExec extends DataTypeSupport {
@@ -102,6 +181,7 @@ object CometNativeScanExec extends DataTypeSupport {
       case other: AnyRef => other
       case null => null
     }
+
    val newArgs = mapProductIterator(scanExec, transform(_))
    val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
    val batchScanExec = CometNativeScanExec(
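
The metric names above intentionally mirror DataFusion's ParquetFileMetrics and FileStreamMetrics counters. As a purely hypothetical illustration of the wiring (Comet's real update path goes through its native metric plumbing such as CometMetricNode, not this helper), a name-keyed snapshot of native values could be folded into these SQLMetrics like so:

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

// Hypothetical glue, not Comet's actual code: fold a snapshot of native metric
// values, keyed by the same names as the metrics map above, into the SQLMetrics.
object NativeMetricSketch {
  def applyNativeMetrics(
      sparkMetrics: Map[String, SQLMetric],
      nativeValues: Map[String, Long]): Unit = {
    nativeValues.foreach { case (name, delta) =>
      // Names Spark does not declare (e.g. elapsed_compute) are simply skipped.
      sparkMetrics.get(name).foreach(_.add(delta))
    }
  }
}
```

Keying by the exact metric name is what lets each native counter land on the matching entry declared in this commit, while native metrics with no Spark counterpart are ignored.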
