Skip to content

Commit ff9d819

Browse files
[Spark] Refactor deduplication in DeltaMergeBuilder to utilize DeduplicateRelations Rule. (delta-io#2771)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Currently, `DeltaMergeBuilder` performs a custom relation deduplication logic reminiscent to the DeduplicateRelations analyzer rule. This duplicates logic and might cause issues from unforeseen interaction with the rule during the analysis of the MERGE plan. This PR refactors the deduplication in DeltaMergeBuilder to utilise the `DeduplicateRelations` rule. The new flow is as follows: 1. Un-resolve the ambiguous pre-resolved references. 2. Invoke DeduplicateRelations to do the deduplication work with FakeLogicalPlan. 3. Use the deduplicated source and target plan in the final MERGE command. ## How was this patch tested? Existing tests. `MergeIntoScalaSuite` covers self merges with the Scala API. <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No.
1 parent a172276 commit ff9d819

File tree

1 file changed

+25
-30
lines changed

1 file changed

+25
-30
lines changed

spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,9 @@ class DeltaMergeBuilder private(
337337

338338
private def mergePlan: DeltaMergeInto = {
339339
var targetPlan = targetTable.toDF.queryExecution.analyzed
340-
val sourcePlan = source.queryExecution.analyzed
340+
var sourcePlan = source.queryExecution.analyzed
341+
var condition = onCondition.expr
342+
var clauses = whenClauses
341343

342344
// If source and target have duplicate, pre-resolved references (can happen with self-merge),
343345
// then rewrite the references in target with new exprId to avoid ambiguity.
@@ -346,17 +348,25 @@ class DeltaMergeBuilder private(
346348
// optional SubqueryAlias.
347349
val duplicateResolvedRefs = targetPlan.outputSet.intersect(sourcePlan.outputSet)
348350
if (duplicateResolvedRefs.nonEmpty) {
349-
val refReplacementMap = duplicateResolvedRefs.toSeq.flatMap {
350-
case a: AttributeReference =>
351-
Some(a.exprId -> a.withExprId(NamedExpression.newExprId))
352-
case _ => None
353-
}.toMap
354-
targetPlan = targetPlan.transformAllExpressions {
355-
case a: AttributeReference if refReplacementMap.contains(a.exprId) =>
356-
refReplacementMap(a.exprId)
357-
}
358-
logInfo("Rewritten duplicate refs between target and source plans: "
359-
+ refReplacementMap.toSeq.mkString(", "))
351+
val exprs = (condition +: clauses).map(_.transform {
352+
// If any expression contain duplicate, pre-resolved references, we can't simply
353+
// replace the references in the same way as the target because we don't know
354+
// whether the user intended to refer to the source or the target columns. Instead,
355+
// we unresolve them (only the duplicate refs) and let the analysis resolve the ambiguity
356+
// and throw the usual error messages when needed.
357+
case a: AttributeReference if duplicateResolvedRefs.contains(a) =>
358+
UnresolvedAttribute(a.qualifier :+ a.name)
359+
})
360+
// Deduplicate the attribute IDs in the target and source plans, and all the MERGE
361+
// expressions (condition and MERGE clauses), so that we can avoid duplicated attribute ID
362+
// when building the MERGE command later.
363+
val fakePlan = AnalysisHelper.FakeLogicalPlan(exprs, Seq(sourcePlan, targetPlan))
364+
val newPlan = org.apache.spark.sql.catalyst.analysis.DeduplicateRelations(fakePlan)
365+
.asInstanceOf[AnalysisHelper.FakeLogicalPlan]
366+
sourcePlan = newPlan.children(0)
367+
targetPlan = newPlan.children(1)
368+
condition = newPlan.exprs.head
369+
clauses = newPlan.exprs.takeRight(clauses.size).asInstanceOf[Seq[DeltaMergeIntoClause]]
360370
}
361371

362372
// Note: The Scala API cannot generate MergeIntoTable just like the SQL parser because
@@ -367,24 +377,9 @@ class DeltaMergeBuilder private(
367377
// (possible in Scala API, but syntactically not possible in SQL). This issue is tracked
368378
// by https://issues.apache.org/jira/browse/SPARK-34962.
369379
val merge = DeltaMergeInto(
370-
targetPlan,
371-
sourcePlan,
372-
onCondition.expr,
373-
whenClauses,
374-
withSchemaEvolution = schemaEvolutionEnabled)
375-
val finalMerge = if (duplicateResolvedRefs.nonEmpty) {
376-
// If any expression contain duplicate, pre-resolved references, we can't simply
377-
// replace the references in the same way as the target because we don't know
378-
// whether the user intended to refer to the source or the target columns. Instead,
379-
// we unresolve them (only the duplicate refs) and let the analysis resolve the ambiguity
380-
// and throw the usual error messages when needed.
381-
merge.transformExpressions {
382-
case a: AttributeReference if duplicateResolvedRefs.contains(a) =>
383-
UnresolvedAttribute(a.qualifier :+ a.name)
384-
}
385-
} else merge
386-
logDebug("Generated merged plan:\n" + finalMerge)
387-
finalMerge
380+
targetPlan, sourcePlan, condition, clauses, withSchemaEvolution = schemaEvolutionEnabled)
381+
logDebug("Generated merged plan:\n" + merge)
382+
merge
388383
}
389384
}
390385

0 commit comments

Comments
 (0)