Commit 8703b95

[spark] Support MergeInto on Paimon append-only table in spark V2 write (#7200)
1 parent c07394d commit 8703b95
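
For context, below is a minimal end-to-end sketch of what this commit enables. Only the `spark.paimon.write.use-v2-write` flag is taken from this commit's test changes; everything else (catalog name, warehouse path, database and table names, local master) is hypothetical and assumes the paimon-spark runtime jar is on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object MergeIntoAppendOnlyExample extends App {
  // Hypothetical local session; catalog name and warehouse path are illustrative.
  val spark = SparkSession
    .builder()
    .master("local[*]")
    .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
    .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon")
    // The switch exercised by the new V2 test suites in this commit.
    .config("spark.paimon.write.use-v2-write", "true")
    .getOrCreate()

  spark.sql("CREATE DATABASE IF NOT EXISTS paimon.db")
  // Append-only Paimon table: no primary key is declared.
  spark.sql("CREATE TABLE IF NOT EXISTS paimon.db.target (a INT, b INT, c STRING)")
  spark.sql("CREATE TABLE IF NOT EXISTS paimon.db.source (a INT, b INT, c STRING)")
  spark.sql("INSERT INTO paimon.db.target VALUES (1, 10, 'c1'), (2, 20, 'c2')")
  spark.sql("INSERT INTO paimon.db.source VALUES (1, 100, 'c11'), (3, 300, 'c33')")

  // With this commit, the MERGE can stay on Spark's DataSourceV2 write path
  // instead of falling back to Paimon's V1 MergeIntoPaimonTable command.
  spark.sql("""MERGE INTO paimon.db.target t
              |USING paimon.db.source s
              |ON t.a = s.a
              |WHEN MATCHED THEN UPDATE SET *
              |WHEN NOT MATCHED THEN INSERT *""".stripMargin)

  spark.sql("SELECT * FROM paimon.db.target ORDER BY a").show()
}
```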

9 files changed: +162 −110 lines changed

paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala

Lines changed: 15 additions & 8 deletions
@@ -18,20 +18,27 @@

 package org.apache.paimon.spark.catalyst.analysis

-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}

 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {

-  override def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions and notMatchedActions.
+   */
+  override def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput))
+    )
+  }
+
+  override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
     Seq.empty
   }
 }

paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala

Lines changed: 16 additions & 8 deletions
@@ -18,20 +18,28 @@

 package org.apache.paimon.spark.catalyst.analysis

-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}

 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {

-  override def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions and notMatchedActions.
+   */
+  override def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput))
+    )
+  }
+
+  override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
     Seq.empty
   }
+
 }
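
Note: the Spark 3.2 and 3.3 overrides above align only matchedActions and notMatchedActions and return Seq.empty from resolveNotMatchedBySourceActions, since the WHEN NOT MATCHED BY SOURCE clause (and the corresponding notMatchedBySourceActions field on MergeIntoTable) only exists from Spark 3.4 onward; the common module's rule further below aligns all three action lists.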

paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala

Lines changed: 40 additions & 0 deletions
@@ -61,3 +61,43 @@ class MergeIntoAppendNonBucketedTableTest
     super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
   }
 }
+
+class V2MergeIntoPrimaryKeyBucketedTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoPrimaryKeyTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonPrimaryKeyBucketedTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
+class V2MergeIntoPrimaryKeyNonBucketTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoPrimaryKeyTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonPrimaryKeyNonBucketTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
+class V2MergeIntoAppendBucketedTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonAppendBucketedTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
+class V2MergeIntoAppendNonBucketedTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonAppendNonBucketTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
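
These four new suites rerun the shared MERGE INTO tests with spark.paimon.write.use-v2-write=true, covering primary-key and append-only tables, each with and without buckets, so the new V2 write path is exercised against the same assertions as the existing V1 suites above them.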

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala

Lines changed: 0 additions & 15 deletions
@@ -89,21 +89,6 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
     }
   }

-  /**
-   * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
-   * Returns a new MergeIntoTable with aligned matchedActions, notMatchedActions, and
-   * notMatchedBySourceActions.
-   */
-  protected def alignMergeIntoTable(
-      m: MergeIntoTable,
-      targetOutput: Seq[Attribute]): MergeIntoTable = {
-    m.copy(
-      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
-      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput)),
-      notMatchedBySourceActions = m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
-    )
-  }
-
   private def recursiveAlignUpdates(
       targetAttrs: Seq[NamedExpression],
       updates: Seq[AttrUpdate],
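
The shared alignMergeIntoTable helper is removed here and redeclared as an abstract method on PaimonMergeIntoBase (see below), so each Spark version's PaimonMergeInto rule aligns exactly the action lists its Spark release supports.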

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala

Lines changed: 18 additions & 7 deletions
@@ -19,17 +19,28 @@
 package org.apache.paimon.spark.catalyst.analysis

 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}

 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {

-  override def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
-    merge.notMatchedBySourceActions.map(
-      checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions, notMatchedActions, and
+   * notMatchedBySourceActions.
+   */
+  override def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedBySourceActions = m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
+    )
+  }
+
+  override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
+    merge.notMatchedBySourceActions
   }
 }

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala

Lines changed: 31 additions & 57 deletions
@@ -23,7 +23,7 @@ import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable}

 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule

@@ -62,64 +62,34 @@ trait PaimonMergeIntoBase
         updateActions,
         primaryKeys)
     }
-    val alignedMatchedActions =
-      merge.matchedActions.map(
-        checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
-    val alignedNotMatchedActions =
-      merge.notMatchedActions.map(
-        checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
-    val alignedNotMatchedBySourceActions =
-      resolveNotMatchedBySourceActions(merge, targetOutput, dataEvolutionEnabled)
-
-    if (dataEvolutionEnabled) {
-      MergeIntoPaimonDataEvolutionTable(
-        v2Table,
-        merge.targetTable,
-        merge.sourceTable,
-        merge.mergeCondition,
-        alignedMatchedActions,
-        alignedNotMatchedActions,
-        alignedNotMatchedBySourceActions
-      )
-    } else {
-      MergeIntoPaimonTable(
-        v2Table,
-        merge.targetTable,
-        merge.sourceTable,
-        merge.mergeCondition,
-        alignedMatchedActions,
-        alignedNotMatchedActions,
-        alignedNotMatchedBySourceActions
-      )
-    }
-  }
-  }
-
-  def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction]

-  protected def checkAndAlignActionAssignment(
-      action: MergeAction,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): MergeAction = {
-    action match {
-      case d @ DeleteAction(_) => d
-      case u @ UpdateAction(_, assignments) =>
-        u.copy(assignments = alignAssignments(targetOutput, assignments))
+    val alignedMergeIntoTable = alignMergeIntoTable(merge, targetOutput)

-      case i @ InsertAction(_, assignments) =>
-        i.copy(assignments = alignAssignments(targetOutput, assignments, isInsert = true))
-
-      case _: UpdateStarAction =>
-        throw new RuntimeException(s"UpdateStarAction should not be here.")
-
-      case _: InsertStarAction =>
-        throw new RuntimeException(s"InsertStarAction should not be here.")
-
-      case _ =>
-        throw new RuntimeException(s"Can't recognize this action: $action")
+    if (!shouldFallbackToV1MergeInto(alignedMergeIntoTable)) {
+      alignedMergeIntoTable
+    } else {
+      if (dataEvolutionEnabled) {
+        MergeIntoPaimonDataEvolutionTable(
+          v2Table,
+          merge.targetTable,
+          merge.sourceTable,
+          merge.mergeCondition,
+          alignedMergeIntoTable.matchedActions,
+          alignedMergeIntoTable.notMatchedActions,
+          resolveNotMatchedBySourceActions(alignedMergeIntoTable)
+        )
+      } else {
+        MergeIntoPaimonTable(
+          v2Table,
+          merge.targetTable,
+          merge.sourceTable,
+          merge.mergeCondition,
+          alignedMergeIntoTable.matchedActions,
+          alignedMergeIntoTable.notMatchedActions,
+          resolveNotMatchedBySourceActions(alignedMergeIntoTable)
+        )
+      }
+    }
     }
   }

@@ -156,4 +126,8 @@ trait PaimonMergeIntoBase
         throw new RuntimeException("Can't update the primary key column in update clause.")
     }
   }
+
+  def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction]
+
+  def alignMergeIntoTable(m: MergeIntoTable, targetOutput: Seq[Attribute]): MergeIntoTable
 }
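
Taken together, the restructured rule now (1) aligns the whole MergeIntoTable up front via the version-specific alignMergeIntoTable, (2) returns the aligned Spark plan unchanged when shouldFallbackToV1MergeInto is false, so Spark's built-in DataSourceV2 MERGE planning executes it, and (3) otherwise rewrites it into MergeIntoPaimonTable or MergeIntoPaimonDataEvolutionTable as before. The per-action checkAndAlignActionAssignment helper is dropped in favor of the alignMergeAction calls inside the subclasses' alignMergeIntoTable implementations.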

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala

Lines changed: 10 additions & 1 deletion
@@ -24,7 +24,7 @@ import org.apache.paimon.table.{FileStoreTable, Table}

 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, MergeIntoTable, UpdateTable}

 trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {

@@ -102,4 +102,13 @@ trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
     !updateTable.rewritable ||
     !updateTable.aligned
   }
+
+  /** Determines if DataSourceV2 merge into is not supported for the given table. */
+  protected def shouldFallbackToV1MergeInto(m: MergeIntoTable): Boolean = {
+    val relation = PaimonRelation.getPaimonRelation(m.targetTable)
+    val table = relation.table.asInstanceOf[SparkTable]
+    shouldFallbackToV1(table) ||
+    !m.rewritable ||
+    !m.aligned
+  }
 }
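
This mirrors the UpdateTable fallback check just above it in the same trait: besides the table-level shouldFallbackToV1 test, the plan must be rewritable (the target supports DataSourceV2 row-level operations) and aligned (the assignments already match the target's output schema) for the V2 path to be taken; otherwise the rule falls back to Paimon's V1 MERGE commands.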

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala

Lines changed: 26 additions & 12 deletions
@@ -42,6 +42,9 @@ import scala.collection.mutable
 /**
  * Note: The [[pushedPartitionFilters]] and [[pushedDataFilters]] are intentionally set to empty
  * because file-level filtering is handled through Spark's runtime V2 filtering mechanism.
+ *
+ * When Spark's runtime filter is not applied (e.g., when NOT MATCHED BY SOURCE is present in
+ * MergeInto), this scan will read all data from the table.
  */
 case class PaimonCopyOnWriteScan(
     table: FileStoreTable,
@@ -51,10 +54,29 @@ case class PaimonCopyOnWriteScan(
   extends BaseScan
   with SupportsRuntimeV2Filtering {

-  override def inputSplits: Array[Split] = dataSplits.asInstanceOf[Array[Split]]
+  // Track whether filter() has been called
+  @volatile private var filterApplied: Boolean = false
+
+  private val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
+
+  override def inputSplits: Array[Split] = {
+    loadSplits()
+    dataSplits.asInstanceOf[Array[Split]]
+  }

 var dataSplits: Array[DataSplit] = Array()

+  private def loadSplits(): Unit = {
+    val snapshotReader = table.newSnapshotReader()
+    if (table.coreOptions().manifestDeleteFileDropStats()) {
+      snapshotReader.dropStats()
+    }
+    if (filterApplied) {
+      snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
+    }
+    dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
+  }
+
   def scannedFiles: Seq[SparkDataFileMeta] = {
     dataSplits
       .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, dataSplit.totalBuckets()))
@@ -66,9 +88,9 @@ case class PaimonCopyOnWriteScan(
   }

   override def filter(predicates: Array[SparkPredicate]): Unit = {
-    val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
-    val runtimefilters: Array[Filter] = predicates.flatMap(PaimonUtils.filterV2ToV1)
-    for (filter <- runtimefilters) {
+    filterApplied = true
+    val runtimeFilters: Array[Filter] = predicates.flatMap(PaimonUtils.filterV2ToV1)
+    for (filter <- runtimeFilters) {
       filter match {
         case in: In if in.attribute.equalsIgnoreCase(FILE_PATH_COLUMN) =>
           for (value <- in.values) {
@@ -78,14 +100,6 @@ case class PaimonCopyOnWriteScan(
         case _ => logWarning("Unsupported runtime filter")
       }
     }
-
-    val snapshotReader = table.newSnapshotReader()
-    if (table.coreOptions().manifestDeleteFileDropStats()) {
-      snapshotReader.dropStats()
-    }
-
-    snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
-    dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
   }

 }
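
The behavioral point of this change: split planning moves from filter() into inputSplits. Previously dataSplits was only populated when Spark invoked runtime filtering, so a MERGE whose plan skips runtime filtering (notably one with a NOT MATCHED BY SOURCE clause, where every target file may be affected) would see an empty scan. Now loadSplits() runs at read time and applies the file-name filter only if filter() was actually called, otherwise reading all data, as the updated class comment states.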

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala

Lines changed: 6 additions & 2 deletions
@@ -528,7 +528,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
     createTable("target", "a INT, b INT, c STRING", Seq("a"))
     spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")

-    val error = intercept[RuntimeException] {
+    val error = intercept[Exception] {
       spark.sql(s"""
                    |MERGE INTO target
                    |USING source
@@ -539,7 +539,11 @@
                    |THEN INSERT (a, b, c) values (a, b, c)
                    |""".stripMargin)
     }.getMessage
-    assert(error.contains("match more than one source rows"))
+    // V1 path: "match more than one source rows"
+    // V2 path: "MERGE_CARDINALITY_VIOLATION"
+    assert(
+      error.contains("match more than one source rows") ||
+        error.contains("MERGE_CARDINALITY_VIOLATION"))
   }
 }

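The broadened intercept[Exception] accommodates both execution paths: Paimon's V1 command reports the duplicate-match error as a RuntimeException containing "match more than one source rows", while Spark's V2 merge execution surfaces its MERGE_CARDINALITY_VIOLATION error class when one target row matches multiple source rows.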