Skip to content

Commit 5594fb3

Browse files
committed
[spark] Eliminate the union stage when merging into without notMatchedActions
1 parent 83410b8 commit 5594fb3

File tree

1 file changed

+23
-18
lines changed

1 file changed

+23
-18
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -144,29 +144,38 @@ case class MergeIntoPaimonTable(
144144
}
145145
} else {
146146
val touchedFilePathsSet = mutable.Set.empty[String]
147+
val intersectionFilePaths = mutable.Set.empty[String]
148+
147149
def hasUpdate(actions: Seq[MergeAction]): Boolean = {
148150
actions.exists {
149151
case _: UpdateAction | _: DeleteAction => true
150152
case _ => false
151153
}
152154
}
153-
if (hasUpdate(matchedActions) || notMatchedActions.nonEmpty) {
154-
touchedFilePathsSet ++= findTouchedFiles(
155-
targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), "inner"),
155+
156+
def findTouchedFiles0(joinType: String): Array[String] = {
157+
findTouchedFiles(
158+
targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), joinType),
156159
sparkSession,
157-
"_left." + FILE_PATH_COLUMN
158-
)
160+
"_left." + FILE_PATH_COLUMN)
159161
}
162+
163+
if (hasUpdate(matchedActions)) {
164+
touchedFilePathsSet ++= findTouchedFiles0("inner")
165+
} else if (notMatchedActions.nonEmpty) {
166+
intersectionFilePaths ++= findTouchedFiles0("inner")
167+
}
168+
160169
if (hasUpdate(notMatchedBySourceActions)) {
161-
touchedFilePathsSet ++= findTouchedFiles(
162-
targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), "left_anti"),
163-
sparkSession,
164-
"_left." + FILE_PATH_COLUMN)
170+
touchedFilePathsSet ++= findTouchedFiles0("left_anti")
165171
}
166172

167-
val targetFilePaths: Array[String] = findTouchedFiles(targetDS, sparkSession)
168173
val touchedFilePaths: Array[String] = touchedFilePathsSet.toArray
169-
val unTouchedFilePaths = targetFilePaths.filterNot(touchedFilePaths.contains)
174+
val unTouchedFilePaths = if (notMatchedActions.nonEmpty) {
175+
intersectionFilePaths.diff(touchedFilePathsSet).toArray
176+
} else {
177+
Array[String]()
178+
}
170179

171180
val (touchedFiles, touchedFileRelation) =
172181
createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
@@ -177,13 +186,9 @@ case class MergeIntoPaimonTable(
177186
// modified and was from touched file, it should be kept too.
178187
val touchedDsWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
179188
.withColumn(FILE_TOUCHED_COL, lit(true))
180-
val targetDSWithFileTouchedCol = if (notMatchedBySourceActions.nonEmpty) {
181-
touchedDsWithFileTouchedCol.union(
182-
createDataset(sparkSession, unTouchedFileRelation)
183-
.withColumn(FILE_TOUCHED_COL, lit(false)))
184-
} else {
185-
touchedDsWithFileTouchedCol
186-
}
189+
val targetDSWithFileTouchedCol = touchedDsWithFileTouchedCol.union(
190+
createDataset(sparkSession, unTouchedFileRelation)
191+
.withColumn(FILE_TOUCHED_COL, lit(false)))
187192

188193
val toWriteDS =
189194
constructChangedRows(sparkSession, targetDSWithFileTouchedCol).drop(ROW_KIND_COL)

0 commit comments

Comments
 (0)