Commit c089ca6

Merge remote-tracking branch 'delta-io/master' into spark-4.0-upgrade-merge
2 parents: 2978f8b + 8618388

9 files changed, +106 -59 lines changed

python/delta/tables.py

Lines changed: 13 additions & 0 deletions

@@ -1013,6 +1013,19 @@ def whenNotMatchedBySourceDelete(
         new_jbuilder = self.__getNotMatchedBySourceBuilder(condition).delete()
         return DeltaMergeBuilder(self._spark, new_jbuilder)
 
+    @since(3.2)  # type: ignore[arg-type]
+    def withSchemaEvolution(self) -> "DeltaMergeBuilder":
+        """
+        Enable schema evolution for the merge operation. This allows the target table schema to
+        be automatically updated based on the schema of the source DataFrame.
+
+        See :py:class:`~delta.tables.DeltaMergeBuilder` for complete usage details.
+
+        :return: this builder
+        """
+        new_jbuilder = self._jbuilder.withSchemaEvolution()
+        return DeltaMergeBuilder(self._spark, new_jbuilder)
+
     @since(0.4)  # type: ignore[arg-type]
     def execute(self) -> None:
         """

python/delta/tests/test_deltatable.py

Lines changed: 14 additions & 0 deletions

@@ -335,6 +335,20 @@ def reset_table() -> None:
             .execute()
         self.__checkAnswer(dt.toDF(), ([('a', -1), ('b', 2), ('c', 3), ('d', 4), ('e', -5)]))
 
+        # Schema evolution
+        reset_table()
+        dt.alias("t") \
+            .merge(source.toDF("key", "extra").alias("s"), expr("t.key = s.key")) \
+            .whenMatchedUpdate(set={"extra": "-1"}) \
+            .whenNotMatchedInsertAll() \
+            .withSchemaEvolution() \
+            .execute()
+        self.__checkAnswer(
+            DeltaTable.forPath(self.spark, self.tempFile).toDF(),  # reload the table
+            ([('a', 1, -1), ('b', 2, -1), ('c', 3, None), ('d', 4, None), ('e', None, -5),
+              ('f', None, -6)]),
+            ["key", "value", "extra"])
+
         # ============== Test bad args ==============
         # ---- bad args in merge()
         with self.assertRaisesRegex(TypeError, "must be DataFrame"):
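
Note that the expected answer reloads the table with DeltaTable.forPath instead of reusing dt; as the `# reload the table` comment indicates, a fresh handle is used so the check sees the schema evolved by the merge. Columns absent on one side come back as None: target-only rows get None in the new extra column, and source-inserted rows get None for value.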

spark/src/main/scala/org/apache/spark/sql/delta/constraints/DeltaInvariantCheckerExec.scala

Lines changed: 11 additions & 1 deletion

@@ -27,10 +27,12 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.delta.util.AnalysisHelper
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy, UnaryExecNode}
 import org.apache.spark.sql.types.StructType

@@ -71,9 +73,17 @@ case class DeltaInvariantCheckerExec(
     if (constraints.isEmpty) return child.execute()
     val invariantChecks =
       DeltaInvariantCheckerExec.buildInvariantChecks(child.output, constraints, session)
-    val boundRefs = invariantChecks.map(_.withBoundReferences(child.output))
+
+    // Resolve current_date()/current_time() expressions.
+    // We resolve currentTime for all invariants together to make sure we use the same timestamp.
+    val invariantsFakePlan = AnalysisHelper.FakeLogicalPlan(invariantChecks, Nil)
+    val newInvariantsPlan = optimizer.ComputeCurrentTime(invariantsFakePlan)
+    val localOutput = child.output
 
     child.execute().mapPartitionsInternal { rows =>
+      val boundRefs = newInvariantsPlan.expressions
+        .asInstanceOf[Seq[CheckDeltaInvariant]]
+        .map(_.withBoundReferences(localOutput))
       val assertions = UnsafeProjection.create(boundRefs)
       rows.map { row =>
         assertions(row)
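
Two details of this change are worth calling out. The new comment explains why ComputeCurrentTime runs once over all invariant expressions, wrapped in a FakeLogicalPlan: every constraint should observe the same timestamp. And child.output is copied into the local val localOutput before the mapPartitionsInternal closure, the usual Spark pattern for keeping the closure from capturing and serializing the whole operator; the bound references are then rebuilt per partition from the time-resolved expressions.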

spark/src/test/scala/org/apache/spark/sql/delta/DeltaDataFrameWriterV2Suite.scala

Lines changed: 4 additions & 3 deletions

@@ -676,9 +676,10 @@ class DeltaDataFrameWriterV2Suite
     spark.sql(s"CREATE TABLE $table(id bigint, p int) USING delta PARTITIONED BY (p)")
     def verifyNotImplicitCasting(f: => Unit): Unit = {
       val e = intercept[DeltaAnalysisException](f)
-      assert(e.getErrorClass == "DELTA_FAILED_TO_MERGE_FIELDS")
-      assert(Utils.exceptionString(e)
-        .contains("Failed to merge incompatible data types LongType and IntegerType"))
+      checkError(
+        exception = e.getCause.asInstanceOf[DeltaAnalysisException],
+        errorClass = "DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+        parameters = Map("currentDataType" -> "LongType", "updateDataType" -> "IntegerType"))
     }
     verifyNotImplicitCasting {
       Seq(1 -> 1).toDF("id", "p").write.mode("append").format("delta").saveAsTable(table)
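
Here and in the suites that follow, substring checks against Utils.exceptionString are replaced with Spark's checkError test helper, which asserts the error class and its message parameters directly. Note that the assertion also moves to e.getCause: the DELTA_MERGE_INCOMPATIBLE_DATATYPE error is attached to the cause of the top-level DELTA_FAILED_TO_MERGE_FIELDS exception, as the DeltaErrorsSuite change below makes explicit by checking both.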

spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala

Lines changed: 8 additions & 8 deletions

@@ -58,7 +58,6 @@ import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils.exceptionString
 
 trait DeltaErrorsSuiteBase
   extends QueryTest

@@ -979,13 +978,14 @@ trait DeltaErrorsSuiteBase
       val s2 = StructType(Seq(StructField("c0", StringType)))
       SchemaMergingUtils.mergeSchemas(s1, s2)
     }
-    assert(e.getErrorClass == "DELTA_FAILED_TO_MERGE_FIELDS")
-    assert(
-      exceptionString(e)
-        .contains("Failed to merge incompatible data types IntegerType and StringType")
-      && exceptionString(e)
-        .contains("Failed to merge fields 'c0' and 'c0'")
-    )
+    checkError(
+      exception = e,
+      errorClass = "DELTA_FAILED_TO_MERGE_FIELDS",
+      parameters = Map("currentField" -> "c0", "updateField" -> "c0"))
+    checkError(
+      exception = e.getCause.asInstanceOf[DeltaAnalysisException],
+      errorClass = "DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+      parameters = Map("currentDataType" -> "IntegerType", "updateDataType" -> "StringType"))
   }
   {
     val e = intercept[DeltaAnalysisException] {

spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala

Lines changed: 8 additions & 5 deletions

@@ -400,8 +400,10 @@ class DeltaSinkSuite
           .mode("append")
           .save(outputDir.getCanonicalPath)
       }
-      assert(e.getErrorClass == "DELTA_FAILED_TO_MERGE_FIELDS")
-      assert(Utils.exceptionString(e).contains("incompatible"))
+      checkError(
+        exception = e,
+        errorClass = "DELTA_FAILED_TO_MERGE_FIELDS",
+        parameters = Map("currentField" -> "id", "updateField" -> "id"))
     } finally {
       query.stop()
     }

@@ -430,9 +432,10 @@ class DeltaSinkSuite
       q.processAllAvailable()
     }
     assert(wrapperException.cause.isInstanceOf[AnalysisException])
-    assert(wrapperException.cause.asInstanceOf[AnalysisException]
-      .getErrorClass == "DELTA_FAILED_TO_MERGE_FIELDS")
-    assert(Utils.exceptionString(wrapperException.cause).contains("incompatible"))
+    checkError(
+      exception = wrapperException.cause.asInstanceOf[AnalysisException],
+      errorClass = "DELTA_FAILED_TO_MERGE_FIELDS",
+      parameters = Map("currentField" -> "id", "updateField" -> "id"))
   }
 }

spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala

Lines changed: 4 additions & 2 deletions

@@ -1305,8 +1305,10 @@ class DeltaSuite extends QueryTest
           .mode("append")
           .save(tempDir.toString)
       }
-      assert(e.getErrorClass == "DELTA_FAILED_TO_MERGE_FIELDS")
-      assert(Utils.exceptionString(e).contains("incompatible"))
+      checkError(
+        exception = e,
+        errorClass = "DELTA_FAILED_TO_MERGE_FIELDS",
+        parameters = Map("currentField" -> "value", "updateField" -> "value"))
     }
   }

spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala

Lines changed: 7 additions & 5 deletions

@@ -277,18 +277,20 @@ class CheckConstraintsSuite extends QueryTest
     }
   }
 
-  testQuietly("constraint with analyzer-evaluated expressions") {
+  for (expression <- Seq("year(current_date())", "unix_timestamp()"))
+  testQuietly(s"constraint with analyzer-evaluated expressions. Expression: $expression") {
     withTestTable { table =>
-      // We use current_timestamp() as the most convenient analyzer-evaluated expression - of course
-      // in a realistic use case it'd probably not be right to add a constraint on a
+      // We use current_timestamp()/current_date() as the most convenient
+      // analyzer-evaluated expressions - of course in a realistic use case
+      // it'd probably not be right to add a constraint on a
       // nondeterministic expression.
       sql(s"ALTER TABLE $table ADD CONSTRAINT maxWithAnalyzerEval " +
-        s"CHECK (num < unix_timestamp())")
+        s"CHECK (num < $expression)")
       val e = intercept[InvariantViolationException] {
         sql(s"INSERT INTO $table VALUES (${Int.MaxValue}, 'data')")
       }
       errorContains(e.getMessage,
-        "maxwithanalyzereval (num < unix_timestamp()) violated by row")
+        s"maxwithanalyzereval (num < $expression) violated by row")
     }
   }

spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala

Lines changed: 37 additions & 35 deletions

@@ -41,7 +41,6 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 class SchemaUtilsSuite extends QueryTest
   with SharedSparkSession

@@ -70,29 +69,26 @@ class SchemaUtilsSuite extends QueryTest
       s"Error message '${e.getMessage}' didn't contain the patterns: $shouldContainPatterns")
   }
 
-  private def expectErrorClassAndCause(errorClass: String)(shouldContain: String*)
+  private def expectAnalysisErrorClass(errorClass: String, params: Map[String, String])
       (f: => Unit): Unit = {
     val e = intercept[AnalysisException] {
       f
     }
-    val msg = Utils.exceptionString(e).toLowerCase(Locale.ROOT)
-    assert(e.getErrorClass == errorClass)
-    assert(
-      shouldContain.map(_.toLowerCase(Locale.ROOT)).forall(msg.contains),
-      s"Error cause didn't contain: $shouldContain"
-    )
-  }
 
-  private def expectErrorClassAndCausePattern(errorClass: String)
-      (shouldContainPatterns: String*)(f: => Unit): Unit = {
-    val e = intercept[AnalysisException] {
-      f
+    @tailrec
+    def getError(ex: Throwable): Option[DeltaAnalysisException] = ex match {
+      case e: DeltaAnalysisException if e.getErrorClass() == errorClass => Some(e)
+      case e: AnalysisException => getError(e.getCause)
+      case _ => None
     }
-    assert(e.getErrorClass == errorClass)
-    val patterns =
-      shouldContainPatterns.map(regex => Pattern.compile(regex, Pattern.CASE_INSENSITIVE))
-    assert(patterns.forall(_.matcher(Utils.exceptionString(e)).find()),
-      s"Error cause didn't contain the patterns: $shouldContainPatterns")
+
+    val err = getError(e)
+    assert(err.isDefined, "exception with the error class not found")
+    checkError(
+      exception = err.get,
+      errorClass = errorClass,
+      parameters = params,
+      matchPVals = true)
   }
 
 /////////////////////////////

@@ -2185,10 +2181,12 @@ class SchemaUtilsSuite extends QueryTest
         .add("b", DecimalType(18, 10))))
       .add("map", MapType(StringType, StringType))
 
-    expectErrorClassAndCause("DELTA_FAILED_TO_MERGE_FIELDS")("StringType", "IntegerType") {
+    expectAnalysisErrorClass("DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+      Map("currentDataType" -> "StringType", "updateDataType" -> "IntegerType")) {
      mergeSchemas(base, new StructType().add("top", IntegerType))
    }
-    expectErrorClassAndCause("DELTA_FAILED_TO_MERGE_FIELDS")("IntegerType", "DateType") {
+    expectAnalysisErrorClass("DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+      Map("currentDataType" -> "IntegerType", "updateDataType" -> "DateType")) {
      mergeSchemas(base, new StructType()
        .add("struct", new StructType().add("a", DateType)))
    }

@@ -2197,37 +2195,39 @@
     // `StructType(StructField(a,IntegerType,true))`.
     // - In Scala 2.13, it extends `scala.collection.immutable.Seq` which returns
     // `Seq(StructField(a,IntegerType,true))`.
-    expectErrorClassAndCausePattern("DELTA_FAILED_TO_MERGE_FIELDS")(
-      "'struct'", "StructType|Seq\\(", "MapType") {
+    expectAnalysisErrorClass("DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+      Map("currentDataType" -> "(StructType|Seq)\\(.*", "updateDataType" -> "MapType\\(.*")) {
      mergeSchemas(base, new StructType()
        .add("struct", MapType(StringType, IntegerType)))
    }
-    expectErrorClassAndCause("DELTA_FAILED_TO_MERGE_FIELDS")(
-      "'array'", "DecimalType", "DoubleType") {
+    expectAnalysisErrorClass("DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+      Map("currentDataType" -> "DecimalType\\(.*", "updateDataType" -> "DoubleType")) {
      mergeSchemas(base, new StructType()
        .add("array", ArrayType(new StructType().add("b", DoubleType))))
    }
-    expectErrorClassAndCause("DELTA_FAILED_TO_MERGE_FIELDS")("'array'", "scale") {
+    expectAnalysisErrorClass("DELTA_MERGE_INCOMPATIBLE_DECIMAL_TYPE",
+      Map("decimalRanges" -> "scale.*")) {
      mergeSchemas(base, new StructType()
        .add("array", ArrayType(new StructType().add("b", DecimalType(18, 12)))))
    }
-    expectErrorClassAndCause("DELTA_FAILED_TO_MERGE_FIELDS")("'array'", "precision") {
+    expectAnalysisErrorClass("DELTA_MERGE_INCOMPATIBLE_DECIMAL_TYPE",
+      Map("decimalRanges" -> "precision.*")) {
      mergeSchemas(base, new StructType()
        .add("array", ArrayType(new StructType().add("b", DecimalType(16, 10)))))
    }
     // See the above comment about `StructType`
-    expectErrorClassAndCausePattern("DELTA_FAILED_TO_MERGE_FIELDS")(
-      "'map'", "MapType", "StructType|Seq\\(") {
+    expectAnalysisErrorClass("DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+      Map("currentDataType" -> "MapType\\(.*", "updateDataType" -> "(StructType|Seq)\\(.*")) {
      mergeSchemas(base, new StructType()
        .add("map", new StructType().add("b", StringType)))
    }
-    expectErrorClassAndCause("DELTA_FAILED_TO_MERGE_FIELDS")(
-      "'map'", "StringType", "IntegerType") {
+    expectAnalysisErrorClass("DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+      Map("currentDataType" -> "StringType", "updateDataType" -> "IntegerType")) {
      mergeSchemas(base, new StructType()
        .add("map", MapType(StringType, IntegerType)))
    }
-    expectErrorClassAndCause("DELTA_FAILED_TO_MERGE_FIELDS")(
-      "'map'", "StringType", "IntegerType") {
+    expectAnalysisErrorClass("DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+      Map("currentDataType" -> "StringType", "updateDataType" -> "IntegerType")) {
      mergeSchemas(base, new StructType()
        .add("map", MapType(IntegerType, StringType)))
    }

@@ -2299,9 +2299,11 @@ class SchemaUtilsSuite extends QueryTest
     val e = intercept[DeltaAnalysisException] {
       mergeSchemas(longType, sourceType)
     }
-    assert(e.getErrorClass == "DELTA_FAILED_TO_MERGE_FIELDS")
-    assert(Utils.exceptionString(e).contains(
-      s"Failed to merge incompatible data types LongType and ${sourceType.head.dataType}"))
+    checkError(
+      exception = e.getCause.asInstanceOf[AnalysisException],
+      errorClass = "DELTA_MERGE_INCOMPATIBLE_DATATYPE",
+      parameters = Map("currentDataType" -> "LongType",
+        "updateDataType" -> sourceType.head.dataType.toString))
   }
 }
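
One detail of the rewritten helper: matchPVals = true makes checkError treat the expected parameter values as regular expressions, which is what allows patterns such as (StructType|Seq)\(.* and MapType\(.* in the expectations above, covering the Scala 2.12/2.13 difference described in the comment.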
