Skip to content

Commit 8618388

Browse files
authored
[PySpark] Add schema evolution config to PySpark DeltaMergeBuilder (delta-io#2778)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [X] PySpark

## Description

This PR continues from delta-io#2737 to add a `withSchemaEvolution()` method for `DeltaMergeBuilder` in PySpark.

## How was this patch tested?

New unit tests.

## Does this PR introduce _any_ user-facing changes?

Yes, this PR allows the user to turn on schema evolution for MERGE in PySpark by calling the `table.merge(...).withSchemaEvolution()` method.
1 parent 9f040d4 commit 8618388

File tree

2 files changed

+27
-0
lines changed

2 files changed

+27
-0
lines changed

python/delta/tables.py

Lines changed: 13 additions & 0 deletions

```diff
@@ -1013,6 +1013,19 @@ def whenNotMatchedBySourceDelete(
         new_jbuilder = self.__getNotMatchedBySourceBuilder(condition).delete()
         return DeltaMergeBuilder(self._spark, new_jbuilder)

+    @since(3.2)  # type: ignore[arg-type]
+    def withSchemaEvolution(self) -> "DeltaMergeBuilder":
+        """
+        Enable schema evolution for the merge operation. This allows the target table schema to
+        be automatically updated based on the schema of the source DataFrame.
+
+        See :py:class:`~delta.tables.DeltaMergeBuilder` for complete usage details.
+
+        :return: this builder
+        """
+        new_jbuilder = self._jbuilder.withSchemaEvolution()
+        return DeltaMergeBuilder(self._spark, new_jbuilder)
+
     @since(0.4)  # type: ignore[arg-type]
     def execute(self) -> None:
         """
```

python/delta/tests/test_deltatable.py

Lines changed: 14 additions & 0 deletions

```diff
@@ -335,6 +335,20 @@ def reset_table() -> None:
             .execute()
         self.__checkAnswer(dt.toDF(), ([('a', -1), ('b', 2), ('c', 3), ('d', 4), ('e', -5)]))

+        # Schema evolution
+        reset_table()
+        dt.alias("t") \
+            .merge(source.toDF("key", "extra").alias("s"), expr("t.key = s.key")) \
+            .whenMatchedUpdate(set={"extra": "-1"}) \
+            .whenNotMatchedInsertAll() \
+            .withSchemaEvolution() \
+            .execute()
+        self.__checkAnswer(
+            DeltaTable.forPath(self.spark, self.tempFile).toDF(),  # reload the table
+            ([('a', 1, -1), ('b', 2, -1), ('c', 3, None), ('d', 4, None), ('e', None, -5),
+              ('f', None, -6)]),
+            ["key", "value", "extra"])
+
         # ============== Test bad args ==============
         # ---- bad args in merge()
         with self.assertRaisesRegex(TypeError, "must be DataFrame"):
```

0 commit comments

Comments (0)