Skip to content

Commit d96867c

Browse files
committed
[spark] Eliminate the union stage when merging into without notMatchedBySource
1 parent 842f5b5 commit d96867c

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,15 @@ case class MergeIntoPaimonTable(
172172

173173
// Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been
174174
// modified and was from touched file, it should be kept too.
175-
val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
175+
val touchedDsWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
176176
.withColumn(FILE_TOUCHED_COL, lit(true))
177-
.union(createDataset(sparkSession, unTouchedFileRelation)
178-
.withColumn(FILE_TOUCHED_COL, lit(false)))
177+
val targetDSWithFileTouchedCol = if (notMatchedBySourceActions.nonEmpty) {
178+
touchedDsWithFileTouchedCol.union(
179+
createDataset(sparkSession, unTouchedFileRelation)
180+
.withColumn(FILE_TOUCHED_COL, lit(false)))
181+
} else {
182+
touchedDsWithFileTouchedCol
183+
}
179184

180185
val toWriteDS =
181186
constructChangedRows(sparkSession, targetDSWithFileTouchedCol).drop(ROW_KIND_COL)

0 commit comments

Comments
 (0)