diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java index 96edb8d75abb..53ecb5b0341a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java @@ -735,6 +735,7 @@ public void testUpdateNonBlobColumnOnRawBlobTableWithSplitFiles() throws Excepti put(DATA_EVOLUTION_ENABLED.key(), "true"); put("blob-field", "picture"); put(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "1 b"); + put("sink.parallelism", "1"); } })); insertInto( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 6920b4401553..96f8c0c5cc9f 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -86,9 +86,8 @@ case class MergeIntoPaimonDataEvolutionTable( action match { case updateAction: UpdateAction => for (assignment <- updateAction.assignments) { - if (!assignment.key.equals(assignment.value)) { - val key = assignment.key.asInstanceOf[AttributeReference] - columns ++= Seq(key) + if (isModifiedAssignment(assignment)) { + columns += assignmentKeyAttribute(assignment) } } } @@ -281,9 +280,7 @@ case class MergeIntoPaimonDataEvolutionTable( UpdateAction.apply( update.condition, update.assignments.filter( - a => - updateColumnsSorted.contains( - a.key.asInstanceOf[AttributeReference])) ++ assignments)) + a => updateColumnsSorted.contains(assignmentKeyAttribute(a))) ++ assignments)) for (action <- realUpdateActions) { allFields ++= action.references.flatMap(r => extractFields(r)).seq @@ -618,6 +615,27 @@ object MergeIntoPaimonDataEvolutionTable { final private val ROW_ID_NAME = "_ROW_ID" final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID"; + private[commands] def isModifiedAssignment(assignment: Assignment): Boolean = { + !sameAttributeReference(assignment.key, assignment.value) + } + + private[commands] def assignmentKeyAttribute(assignment: Assignment): AttributeReference = { + assignment.key match { + case key: AttributeReference => key + case other => + throw new UnsupportedOperationException( + s"Unsupported update assignment key: $other. Only top-level attributes are supported.") + } + } + + private def sameAttributeReference(left: Expression, right: Expression): Boolean = { + (left, right) match { + case (leftAttr: AttributeReference, rightAttr: AttributeReference) => + leftAttr.sameRef(rightAttr) + case _ => false + } + } + private def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = { if (indexed.isEmpty) { throw new IllegalArgumentException("The input sorted sequence is empty.") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTableTest.scala new file mode 100644 index 000000000000..eb6373cfce22 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTableTest.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.commands + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField, Literal} +import org.apache.spark.sql.catalyst.plans.logical.Assignment +import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType} + +class MergeIntoPaimonDataEvolutionTableTest extends SparkFunSuite { + + test("update column detection ignores target self-assignment with different qualifiers") { + val targetPicture = AttributeReference("picture", BinaryType)() + val qualifiedTargetPicture = targetPicture.withQualifier(Seq("t")) + + assert(!targetPicture.equals(qualifiedTargetPicture)) + assert(targetPicture.sameRef(qualifiedTargetPicture)) + assert( + !MergeIntoPaimonDataEvolutionTable.isModifiedAssignment( + Assignment(targetPicture, qualifiedTargetPicture))) + } + + test("update column detection includes source assignment with same column name") { + val targetFileType = AttributeReference("file_type", StringType)() + val sourceFileType = AttributeReference("file_type", StringType)().withQualifier(Seq("s")) + + assert(!targetFileType.sameRef(sourceFileType)) + assert( + MergeIntoPaimonDataEvolutionTable.isModifiedAssignment( + Assignment(targetFileType, sourceFileType))) + } + + test("update column detection rejects non top-level assignment key") { + val targetStruct = + AttributeReference("metadata", StructType(Seq(StructField("name", StringType))))() + val nestedKey = GetStructField(targetStruct, 0, Some("name")) + + intercept[UnsupportedOperationException] { + MergeIntoPaimonDataEvolutionTable.assignmentKeyAttribute( + Assignment(nestedKey, Literal("new-name"))) + } + } +}