Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/content/spark/sql-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ INSERT INTO my_table VALUES (1, 'p1'), (2, 'p2');

-- Static overwrite (Overwrite the whole table)
INSERT OVERWRITE my_table VALUES (3, 'p1');
-- or
INSERT OVERWRITE my_table PARTITION (pt) VALUES (3, 'p1');

SELECT * FROM my_table;
/*
Expand All @@ -95,6 +97,19 @@ SELECT * FROM my_table;
+---+---+
*/

-- Static overwrite with specified partitions (Only overwrite pt='p1')
INSERT OVERWRITE my_table PARTITION (pt='p1') VALUES (3);

SELECT * FROM my_table;
/*
+---+---+
| id| pt|
+---+---+
| 2| p2|
| 3| p1|
+---+---+
*/

-- Dynamic overwrite (Only overwrite pt='p1')
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE my_table VALUES (3, 'p1');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,4 +575,37 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
}
}
}

test("Paimon Insert: dynamic insert overwrite partition") {
withTable("my_table") {
sql("CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt)")

for (mode <- Seq("static", "dynamic")) {
withSparkSQLConf("spark.sql.sources.partitionOverwriteMode" -> mode) {
sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
// INSERT OVERWRITE table
sql("INSERT OVERWRITE my_table VALUES (3, 'p1')")
if (mode == "dynamic") {
checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Seq(Row(2, "p2"), Row(3, "p1")))
} else {
checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Row(3, "p1"))
}

sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
// INSERT OVERWRITE table PARTITION (pt)
sql("INSERT OVERWRITE my_table PARTITION (pt) VALUES (3, 'p1')")
if (mode == "dynamic") {
checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Seq(Row(2, "p2"), Row(3, "p1")))
} else {
checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Row(3, "p1"))
}

sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
// INSERT OVERWRITE table PARTITION (pt='p1')
sql("INSERT OVERWRITE my_table PARTITION (pt='p1') VALUES (3)")
checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Seq(Row(2, "p2"), Row(3, "p1")))
}
}
}
}
}
Loading