
Conversation

@Aitozi Aitozi (Contributor) commented Feb 22, 2025

…edBySource

Purpose

Linked issue: close #5136

Tests

API and Format

Documentation

@Aitozi Aitozi force-pushed the merge-into-eliminate-union branch from d96867c to a4fdf0b Compare February 22, 2025 11:13
@Aitozi Aitozi (Contributor, Author) commented Feb 22, 2025

before:

== Parsed Logical Plan ==
Project [a#210, b#211, c#212]
+- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, a), IntegerType, false) AS a#210, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, b), IntegerType, false) AS b#211, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, c), StringType, false), true, false, true) AS c#212, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, _row_kind_), ByteType, false) AS _row_kind_#213]
   +- MapPartitions org.apache.paimon.spark.commands.MergeIntoPaimonTable$$Lambda$4050/0x0000000801837840@6d86d1ad, obj#209: org.apache.spark.sql.Row
      +- DeserializeToObject createexternalrow(a#10, b#11, c#12.toString, _source_row_#149, a#62, b#63, c#64.toString, __paimon_file_path#80.toString, __paimon_row_index#81L, if (isnull(__paimon_partition#82)) null else createexternalrow(), __paimon_bucket#83, _file_touched_col_#95, _target_row_#136, StructField(a,IntegerType,true), StructField(b,IntegerType,true), StructField(c,StringType,true), StructField(_source_row_,BooleanType,true), StructField(a,IntegerType,true), StructField(b,IntegerType,true), StructField(c,StringType,true), StructField(__paimon_file_path,StringType,true), StructField(__paimon_row_index,LongType,true), StructField(__paimon_partition,StructType(),true), StructField(__paimon_bucket,IntegerType,true), ... 2 more fields), obj#208: org.apache.spark.sql.Row
         +- Join FullOuter, (a#62 = a#10)
            :- Project [a#10, b#11, c#12, true AS _source_row_#149]
            :  +- SubqueryAlias source
            :     +- View (`source`, [a#10,b#11,c#12])
            :        +- Project [_1#3 AS a#10, _2#4 AS b#11, _3#5 AS c#12]
            :           +- LocalRelation [_1#3, _2#4, _3#5]
            +- Project [a#62, b#63, c#64, __paimon_file_path#80, __paimon_row_index#81L, __paimon_partition#82, __paimon_bucket#83, _file_touched_col_#95, true AS _target_row_#136]
               +- Union false, false
                  :- Project [a#62, b#63, c#64, __paimon_file_path#80, __paimon_row_index#81L, __paimon_partition#82, __paimon_bucket#83, true AS _file_touched_col_#95]
                  :  +- RelationV2[a#62, b#63, c#64, __paimon_file_path#80, __paimon_row_index#81L, __paimon_partition#82, __paimon_bucket#83] target
                  +- Project [a#62 AS a#120, b#63 AS b#121, c#64 AS c#122, __paimon_file_path#84 AS __paimon_file_path#123, __paimon_row_index#85L AS __paimon_row_index#124L, __paimon_partition#86 AS __paimon_partition#125, __paimon_bucket#87 AS __paimon_bucket#126, _file_touched_col_#111 AS _file_touched_col_#127]
                     +- Project [a#62, b#63, c#64, __paimon_file_path#84, __paimon_row_index#85L, __paimon_partition#86, __paimon_bucket#87, false AS _file_touched_col_#111]
                        +- RelationV2[a#62, b#63, c#64, __paimon_file_path#84, __paimon_row_index#85L, __paimon_partition#86, __paimon_bucket#87] target

after:

Paimon MergeInto: only insert
== Parsed Logical Plan ==
Project [a#197, b#198, c#199]
+- SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, a), IntegerType, false) AS a#197, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, b), IntegerType, false) AS b#198, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, c), StringType, false), true, false, true) AS c#199, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, _row_kind_), ByteType, false) AS _row_kind_#200]
   +- MapPartitions org.apache.paimon.spark.commands.MergeIntoPaimonTable$$Lambda$4123/0x000000080184b840@4420f4d4, obj#196: org.apache.spark.sql.Row
      +- DeserializeToObject createexternalrow(a#10, b#11, c#12.toString, _source_row_#136, a#62, b#63, c#64.toString, __paimon_file_path#99.toString, __paimon_row_index#100L, if (isnull(__paimon_partition#101)) null else createexternalrow(), __paimon_bucket#102, _file_touched_col_#114, _target_row_#123, StructField(a,IntegerType,true), StructField(b,IntegerType,true), StructField(c,StringType,true), StructField(_source_row_,BooleanType,true), StructField(a,IntegerType,true), StructField(b,IntegerType,true), StructField(c,StringType,true), StructField(__paimon_file_path,StringType,true), StructField(__paimon_row_index,LongType,true), StructField(__paimon_partition,StructType(),true), StructField(__paimon_bucket,IntegerType,true), ... 2 more fields), obj#195: org.apache.spark.sql.Row
         +- Join FullOuter, (a#62 = a#10)
            :- Project [a#10, b#11, c#12, true AS _source_row_#136]
            :  +- SubqueryAlias source
            :     +- View (`source`, [a#10,b#11,c#12])
            :        +- Project [_1#3 AS a#10, _2#4 AS b#11, _3#5 AS c#12]
            :           +- LocalRelation [_1#3, _2#4, _3#5]
            +- Project [a#62, b#63, c#64, __paimon_file_path#99, __paimon_row_index#100L, __paimon_partition#101, __paimon_bucket#102, _file_touched_col_#114, true AS _target_row_#123]
               +- Project [a#62, b#63, c#64, __paimon_file_path#99, __paimon_row_index#100L, __paimon_partition#101, __paimon_bucket#102, true AS _file_touched_col_#114]
                  +- RelationV2[a#62, b#63, c#64, __paimon_file_path#99, __paimon_row_index#100L, __paimon_partition#101, __paimon_bucket#102] target

@YannByron YannByron self-assigned this Feb 27, 2025
    }
  }
- if (hasUpdate(matchedActions)) {
+ if (hasUpdate(matchedActions) || notMatchedActions.nonEmpty) {
Contributor commented:
Can you explain this line in detail?

@Aitozi Aitozi (Contributor, Author) commented Feb 27, 2025:

When there is a matched action or a not-matched-by-target action, we should evaluate on the file splits after the inner join. Otherwise, all the source rows end up not matched by target.
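
To make that concrete, here is a minimal, self-contained sketch of what the changed guard expresses. The names (MergeAction, needEvaluateOnJoinedSplits, etc.) are illustrative assumptions for this thread, not the actual MergeIntoPaimonTable code:

object TouchedFilesConditionSketch {

  // Illustrative stand-ins for the merge clauses; not Paimon's real types.
  sealed trait MergeAction
  final case class UpdateAction(assignments: Map[String, String]) extends MergeAction
  case object DeleteAction extends MergeAction
  final case class InsertAction(values: Map[String, String]) extends MergeAction

  private def hasUpdate(actions: Seq[MergeAction]): Boolean =
    actions.exists(_.isInstanceOf[UpdateAction])

  // Whether the candidate file splits have to be evaluated on the inner join of
  // source and target before deciding which files are "touched".
  def needEvaluateOnJoinedSplits(
      matchedActions: Seq[MergeAction],
      notMatchedActions: Seq[MergeAction]): Boolean =
    hasUpdate(matchedActions) || notMatchedActions.nonEmpty

  def main(args: Array[String]): Unit = {
    // Only-insert merge (the case discussed here): the guard is now true, so the
    // splits are still evaluated on the joined rows.
    println(needEvaluateOnJoinedSplits(Nil, Seq(InsertAction(Map("a" -> "source.a"))))) // true
    // No update matched action and no not-matched action: the guard stays false.
    println(needEvaluateOnJoinedSplits(Seq(DeleteAction), Nil)) // false
  }
}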

@Aitozi Aitozi (Contributor, Author) commented:

The following test verifies this:

  test(s"Paimon MergeInto: only insert") {
    withTable("source", "target") {

      Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source")

      createTable("target", "a INT, b INT, c STRING", Seq("a"))
      spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")

      spark.sql(s"""
                   |MERGE INTO target
                   |USING source
                   |ON target.a = source.a
                   |WHEN NOT MATCHED
                   |THEN INSERT (a, b, c) values (a, b, c)
                   |""".stripMargin)

      checkAnswer(
        spark.sql("SELECT * FROM target ORDER BY a, b"),
        Row(1, 10, "c1") :: Row(2, 20, "c2") :: Row(3, 300, "c33") :: Nil)
    }
  }

@Aitozi Aitozi (Contributor, Author) commented:

Hi @YannByron, I have another thought on this question and have opened another PR.

I think we should not use this condition to move these untouched files into the touched file set, since that increases the rewrite input/output size.

Instead, if the not-matched actions are non-empty, these files should stay in the input as untouched files, and we only need to consume the untouched files when a not-matched-by-target action is present.
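
A rough, purely illustrative sketch of that idea (rewriteInput, DataFile and the flag name are made up for this comment; this is not the real implementation):

object RewriteInputSketch {

  final case class DataFile(path: String)

  // Decide which data files are fed into the rewrite. The flag mirrors the
  // wording of the comment above ("not matched by target action is present").
  def rewriteInput(
      touchedFiles: Seq[DataFile],
      untouchedFiles: Seq[DataFile],
      notMatchedByTargetActionPresent: Boolean): Seq[DataFile] =
    if (notMatchedByTargetActionPresent) {
      // Only in this case do the untouched files need to be consumed as well.
      touchedFiles ++ untouchedFiles
    } else {
      // Keeping untouched files out of the rewrite keeps its input/output size small.
      touchedFiles
    }

  def main(args: Array[String]): Unit = {
    val touched = Seq(DataFile("data-0.parquet"))
    val untouched = Seq(DataFile("data-1.parquet"))
    println(rewriteInput(touched, untouched, notMatchedByTargetActionPresent = false).map(_.path)) // List(data-0.parquet)
    println(rewriteInput(touched, untouched, notMatchedByTargetActionPresent = true).map(_.path))  // List(data-0.parquet, data-1.parquet)
  }
}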

@YannByron YannByron merged commit 83410b8 into apache:master Mar 2, 2025
12 checks passed


Development

Successfully merging this pull request may close these issues.

[Feature] Improve the spark merge into performance for append table
