
Commit ee41857

asl3 authored and cloud-fan committed
Revert "[SPARK-54310][SQL] Add numSourceRows metric for MergeIntoExec"
### What changes were proposed in this pull request?

Clean revert of d65234b. We will later handle cases of sourceSide child nodes without `numOutputRows`, and will re-target the new implementation to a later Spark release.

### Why are the changes needed?

The current implementation may grab the incorrect `numOutputRows` metric if there is an intermediary node (such as a custom Spark operator) that does not support the metric. This is because we target the first source-side child node that has `numOutputRows`. If a SparkExtension node does not contain this metric but transforms the source table, the search could progress all the way down to the source table and grab the incorrect metric.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing CI, as this is a revert.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53293 from asl3/numsourcerowsrevert.

Authored-by: Amanda Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
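To make the failure mode concrete, here is a minimal sketch of the flawed lookup. All of the types below (`PlanNode`, `Scan`, `CustomExtensionOp`) are simplified stand-ins invented for illustration, not the real Spark classes: a pre-order search for the first descendant exposing `numOutputRows` walks straight past a metric-less custom operator and reports the scan's row count instead of the rows the MERGE actually consumed.

```scala
object FirstMetricSketch {
  // Simplified stand-ins for physical plan nodes; NOT the real Spark API.
  sealed trait PlanNode {
    def children: Seq[PlanNode]
    def metrics: Map[String, Long]
  }

  // A source-table scan that reports how many rows it produced.
  final case class Scan(rows: Long) extends PlanNode {
    val children: Seq[PlanNode] = Nil
    val metrics: Map[String, Long] = Map("numOutputRows" -> rows)
  }

  // A custom extension operator that transforms its input (e.g. filters it)
  // but does not register a numOutputRows metric of its own.
  final case class CustomExtensionOp(child: PlanNode) extends PlanNode {
    val children: Seq[PlanNode] = Seq(child)
    val metrics: Map[String, Long] = Map.empty
  }

  // The reverted heuristic, reduced to its essence: pre-order search for
  // the first descendant exposing numOutputRows.
  def firstNumOutputRows(plan: PlanNode): Option[Long] =
    if (plan.metrics.contains("numOutputRows")) plan.metrics.get("numOutputRows")
    else plan.children.view.flatMap(firstNumOutputRows).headOption

  def main(args: Array[String]): Unit = {
    // The extension operator may have filtered the 1000 scanned rows down to
    // far fewer, but the search skips it (no metric) and lands on the scan.
    val sourceSide: PlanNode = CustomExtensionOp(Scan(rows = 1000L))
    println(firstNumOutputRows(sourceSide)) // Some(1000): not what MERGE saw
  }
}
```

If the custom operator filtered the scan's output, the reported source-row count would overcount, which is exactly the scenario the revert guards against.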
1 parent 9969548 · commit ee41857

File tree

4 files changed: +227 −346 lines

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java

Lines changed: 0 additions & 5 deletions
```diff
@@ -27,11 +27,6 @@
 @Evolving
 public interface MergeSummary extends WriteSummary {
 
-  /**
-   * Returns the number of source rows.
-   */
-  long numSourceRows();
-
   /**
    * Returns the number of target rows copied unmodified because they did not match any action,
    * or -1 if not found.
```
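For connectors that consume the summary, the surviving surface is the per-action target-row counters. Below is a hypothetical consumer sketch: the getter names mirror the `MergeSummaryImpl` fields visible in the diffs on this page, while `MergeMetricsLogger` and its `report` method are made up for illustration.

```scala
// Hypothetical consumer; only the MergeSummary getter names are taken from
// the diffs on this page, everything else is illustrative.
import org.apache.spark.sql.connector.write.{MergeSummary, WriteSummary}

object MergeMetricsLogger {
  def report(summary: WriteSummary): Unit = summary match {
    case m: MergeSummary =>
      // Each counter is -1 when the corresponding metric was not found.
      println(s"copied=${m.numTargetRowsCopied()}, " +
        s"deleted=${m.numTargetRowsDeleted()}, " +
        s"updated=${m.numTargetRowsUpdated()}")
    case _ =>
      println("not a MERGE write summary")
  }
}
```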

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala

Lines changed: 0 additions & 1 deletion
```diff
@@ -21,7 +21,6 @@ package org.apache.spark.sql.connector.write
  * Implementation of [[MergeSummary]] that provides MERGE operation summary.
  */
 private[sql] case class MergeSummaryImpl(
-    numSourceRows: Long,
     numTargetRowsCopied: Long,
     numTargetRowsDeleted: Long,
     numTargetRowsUpdated: Long,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 1 addition & 38 deletions
```diff
@@ -31,11 +31,10 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage, WriteSummary}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, Write, WriterCommitMessage, WriteSummary}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.joins.BaseJoinExec
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES
@@ -493,9 +492,7 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
   private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
     collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
       val metrics = n.metrics
-      val numSourceRows = getNumSourceRows(n)
       MergeSummaryImpl(
-        numSourceRows,
         metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
         metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L),
         metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L),
@@ -507,40 +504,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
       )
     }
   }
-
-  private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = {
-    def hasTargetTable(plan: SparkPlan): Boolean = {
-      collectFirst(plan) {
-        case scan @ BatchScanExec(_, _, _, _, _: RowLevelOperationTable, _) => scan
-      }.isDefined
-    }
-
-    def findSourceScan(join: BaseJoinExec): Option[SparkPlan] = {
-      val leftHasTarget = hasTargetTable(join.left)
-      val rightHasTarget = hasTargetTable(join.right)
-
-      val sourceSide = if (leftHasTarget) {
-        Some(join.right)
-      } else if (rightHasTarget) {
-        Some(join.left)
-      } else {
-        None
-      }
-
-      sourceSide.flatMap { side =>
-        collectFirst(side) {
-          case source if source.metrics.contains("numOutputRows") =>
-            source
-        }
-      }
-    }
-
-    (for {
-      join <- collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j }
-      sourceScan <- findSourceScan(join)
-      metric <- sourceScan.metrics.get("numOutputRows")
-    } yield metric.value).getOrElse(-1L)
-  }
 }
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
```
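For contrast with the removed heuristic above, here is one deliberately conservative alternative, sketched with a simplified `Node` stand-in. This is purely illustrative and is not the re-targeted follow-up the commit message promises: it trusts `numOutputRows` only when the join's direct source-side child reports it, and otherwise returns the `-1` sentinel the `MergeSummary` contract already uses.

```scala
object ConservativeSketch {
  // `Node` is a simplified stand-in for SparkPlan; this is NOT the
  // follow-up implementation referenced in the commit message.
  final case class Node(metrics: Map[String, Long], children: Seq[Node] = Nil)

  // Trust numOutputRows only on the join's direct source-side child;
  // descending further risks reading a count taken before a custom
  // operator changed the rows. Fall back to the -1 sentinel otherwise.
  def conservativeNumSourceRows(sourceSide: Node): Long =
    sourceSide.metrics.getOrElse("numOutputRows", -1L)

  def main(args: Array[String]): Unit = {
    // A metric-less custom operator sits on top of the 1000-row scan.
    val sourceSide = Node(Map.empty, Seq(Node(Map("numOutputRows" -> 1000L))))
    assert(conservativeNumSourceRows(sourceSide) == -1L) // -1 beats a wrong count
  }
}
```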
