Skip to content

Commit a4fdf0b

Browse files
committed
[spark] Eliminate the union stage when merging into without notMatchedBySource
1 parent 842f5b5 commit a4fdf0b

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ case class MergeIntoPaimonTable(
150150
case _ => false
151151
}
152152
}
153-
if (hasUpdate(matchedActions)) {
153+
if (hasUpdate(matchedActions) || notMatchedActions.nonEmpty) {
154154
touchedFilePathsSet ++= findTouchedFiles(
155155
targetDS.join(sourceDS, toColumn(mergeCondition), "inner"),
156156
sparkSession)
@@ -172,10 +172,15 @@ case class MergeIntoPaimonTable(
172172

173173
// Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been
174174
// modified and was from touched file, it should be kept too.
175-
val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
175+
val touchedDsWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
176176
.withColumn(FILE_TOUCHED_COL, lit(true))
177-
.union(createDataset(sparkSession, unTouchedFileRelation)
178-
.withColumn(FILE_TOUCHED_COL, lit(false)))
177+
val targetDSWithFileTouchedCol = if (notMatchedBySourceActions.nonEmpty) {
178+
touchedDsWithFileTouchedCol.union(
179+
createDataset(sparkSession, unTouchedFileRelation)
180+
.withColumn(FILE_TOUCHED_COL, lit(false)))
181+
} else {
182+
touchedDsWithFileTouchedCol
183+
}
179184

180185
val toWriteDS =
181186
constructChangedRows(sparkSession, targetDSWithFileTouchedCol).drop(ROW_KIND_COL)

0 commit comments

Comments
 (0)