diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index 52e704172fc8..909162d0c7bf 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -150,7 +150,7 @@ case class MergeIntoPaimonTable( case _ => false } } - if (hasUpdate(matchedActions)) { + if (hasUpdate(matchedActions) || notMatchedActions.nonEmpty) { touchedFilePathsSet ++= findTouchedFiles( targetDS.join(sourceDS, toColumn(mergeCondition), "inner"), sparkSession) @@ -172,10 +172,15 @@ case class MergeIntoPaimonTable( // Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been // modified and was from touched file, it should be kept too. - val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation) + val touchedDsWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation) .withColumn(FILE_TOUCHED_COL, lit(true)) - .union(createDataset(sparkSession, unTouchedFileRelation) - .withColumn(FILE_TOUCHED_COL, lit(false))) + val targetDSWithFileTouchedCol = if (notMatchedBySourceActions.nonEmpty) { + touchedDsWithFileTouchedCol.union( + createDataset(sparkSession, unTouchedFileRelation) + .withColumn(FILE_TOUCHED_COL, lit(false))) + } else { + touchedDsWithFileTouchedCol + } val toWriteDS = constructChangedRows(sparkSession, targetDSWithFileTouchedCol).drop(ROW_KIND_COL)