Skip to content

Commit 83410b8

Browse files
authored
[spark] Eliminate the union stage when merging into without notMatchedBySource (#5137)
1 parent a25a106 commit 83410b8

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ case class MergeIntoPaimonTable(
150150
case _ => false
151151
}
152152
}
153-
if (hasUpdate(matchedActions)) {
153+
if (hasUpdate(matchedActions) || notMatchedActions.nonEmpty) {
154154
touchedFilePathsSet ++= findTouchedFiles(
155155
targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), "inner"),
156156
sparkSession,
@@ -175,10 +175,15 @@ case class MergeIntoPaimonTable(
175175

176176
// Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been
177177
// modified and was from touched file, it should be kept too.
178-
val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
178+
val touchedDsWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
179179
.withColumn(FILE_TOUCHED_COL, lit(true))
180-
.union(createDataset(sparkSession, unTouchedFileRelation)
181-
.withColumn(FILE_TOUCHED_COL, lit(false)))
180+
val targetDSWithFileTouchedCol = if (notMatchedBySourceActions.nonEmpty) {
181+
touchedDsWithFileTouchedCol.union(
182+
createDataset(sparkSession, unTouchedFileRelation)
183+
.withColumn(FILE_TOUCHED_COL, lit(false)))
184+
} else {
185+
touchedDsWithFileTouchedCol
186+
}
182187

183188
val toWriteDS =
184189
constructChangedRows(sparkSession, targetDSWithFileTouchedCol).drop(ROW_KIND_COL)

0 commit comments

Comments
 (0)