Skip to content

Commit 971e832

Browse files
huaxingao authored and HyukjinKwon committed
[SPARK-28411][PYTHON][SQL] InsertInto with overwrite is not honored
## What changes were proposed in this pull request? In the following python code ``` df.write.mode("overwrite").insertInto("table") ``` ```insertInto``` ignores ```mode("overwrite")``` and appends by default. ## How was this patch tested? Add Unit test. Closes apache#25175 from huaxingao/spark-28411. Authored-by: Huaxin Gao <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
1 parent 70073b1 commit 971e832

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -745,15 +745,17 @@ def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
745745
self._jwrite.save(path)
746746

747747
@since(1.4)
def insertInto(self, tableName, overwrite=None):
    """Inserts the content of the :class:`DataFrame` to the specified table.

    It requires that the schema of the :class:`DataFrame` is the same as the
    schema of the table.

    Optionally overwriting any existing data.

    :param tableName: name of the table to insert into.
    :param overwrite: If ``True``, overwrite existing data; if ``False``,
        append. When ``None`` (the default), the save mode previously set
        via :func:`mode` is left untouched, so
        ``df.write.mode("overwrite").insertInto(...)`` is honored instead of
        being silently reset to append.
    """
    # Only force the mode when the caller passed overwrite explicitly;
    # otherwise respect whatever mode() configured on this writer.
    if overwrite is not None:
        self.mode("overwrite" if overwrite else "append")
    self._jwrite.insertInto(tableName)
757759

758760
@since(1.4)
759761
def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options):

python/pyspark/sql/tests/test_readwriter.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,27 @@ def count_bucketed_cols(names, table="pyspark_bucket"):
141141
.mode("overwrite").saveAsTable("pyspark_bucket"))
142142
self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect()))
143143

144+
def test_insert_into(self):
    # insertInto must honor both its explicit ``overwrite`` argument and
    # a save mode configured earlier via ``mode()``.
    frame = self.spark.createDataFrame([("a", 1), ("b", 2)], ["C1", "C2"])

    def row_count():
        # Current number of rows in the target table.
        return self.spark.sql("select * from test_table").count()

    with self.table("test_table"):
        # Seed the table with the two rows of ``frame``.
        frame.write.saveAsTable("test_table")
        self.assertEqual(2, row_count())

        # Default insertInto appends.
        frame.write.insertInto("test_table")
        self.assertEqual(4, row_count())

        # mode("overwrite") is honored (the bug this guards against).
        frame.write.mode("overwrite").insertInto("test_table")
        self.assertEqual(2, row_count())

        # Explicit overwrite=True replaces existing rows.
        frame.write.insertInto("test_table", True)
        self.assertEqual(2, row_count())

        # Explicit overwrite=False appends.
        frame.write.insertInto("test_table", False)
        self.assertEqual(4, row_count())

        # An explicit argument wins over a previously configured mode.
        frame.write.mode("overwrite").insertInto("test_table", False)
        self.assertEqual(6, row_count())
164+
144165

145166
if __name__ == "__main__":
146167
import unittest

0 commit comments

Comments
 (0)