Skip to content

Commit d3f0604

Browse files
authored
[Spark] Check error classes in MERGE tests (delta-io#4325)
## What changes were proposed in this pull request? Update MERGE tests to use `checkError` instead of inspecting error messages. Also cleans up error `_LEGACY_ERROR_TEMP_DELTA_0011`, a catch all that serves no purpose and hides information, failures to resolve the MERGE plan will anyway result in a proper error message. ## How was this patch tested? Updated tests.
1 parent 8dccbab commit d3f0604

File tree

7 files changed

+201
-195
lines changed

7 files changed

+201
-195
lines changed

spark/src/main/resources/error/delta-error-classes.json

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,7 +1676,7 @@
16761676
},
16771677
"DELTA_MERGE_UNRESOLVED_EXPRESSION" : {
16781678
"message" : [
1679-
"Cannot resolve <sqlExpr> in <clause> given <cols>"
1679+
"Cannot resolve <sqlExpr> in <clause> given columns <cols>."
16801680
],
16811681
"sqlState" : "42601"
16821682
},
@@ -3187,11 +3187,6 @@
31873187
"<optionalPrefixMessage>Found unsupported expression <expression> while parsing target column name parts."
31883188
]
31893189
},
3190-
"_LEGACY_ERROR_TEMP_DELTA_0011" : {
3191-
"message" : [
3192-
"Failed to resolve plan."
3193-
]
3194-
},
31953190
"_LEGACY_ERROR_TEMP_DELTA_0012" : {
31963191
"message" : [
31973192
"Could not resolve expression: <expression>"

spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -306,15 +306,7 @@ class DeltaMergeBuilder private(
306306
val resolvedMergeInto =
307307
ResolveDeltaMergeInto.resolveReferencesAndSchema(mergePlan, sparkSession.sessionState.conf)(
308308
tryResolveReferencesForExpressions(sparkSession))
309-
if (!resolvedMergeInto.resolved) {
310-
throw new ExtendedAnalysisException(
311-
new DeltaAnalysisException(
312-
errorClass = "_LEGACY_ERROR_TEMP_DELTA_0011",
313-
messageParameters = Array.empty
314-
),
315-
resolvedMergeInto
316-
)
317-
}
309+
318310
val strippedMergeInto = resolvedMergeInto.copy(
319311
target = DeltaViewHelper.stripTempViewForMerge(resolvedMergeInto.target, SQLConf.get)
320312
)

spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ object ResolveDeltaMergeInto {
4343
for (a <- expr.flatMap(_.references).filterNot(_.resolved)) {
4444
// Note: This will throw error only on unresolved attribute issues,
4545
// not other resolution errors like mismatched data types.
46-
val cols = "columns " + plans.flatMap(_.output).map(_.sql).mkString(", ")
46+
val cols = plans.flatMap(_.output).map(_.sql).mkString(", ")
4747
throw new DeltaAnalysisException(
4848
errorClass = "DELTA_MERGE_UNRESOLVED_EXPRESSION",
4949
messageParameters = Array(a.sql, mergeClauseTypeStr, cols),

spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3248,19 +3248,6 @@ trait DeltaErrorsSuiteBase
32483248
s"name parts.")
32493249
)
32503250
}
3251-
{
3252-
val e = intercept[DeltaAnalysisException] {
3253-
throw new DeltaAnalysisException(
3254-
errorClass = "_LEGACY_ERROR_TEMP_DELTA_0011",
3255-
messageParameters = Array.empty)
3256-
}
3257-
checkErrorMessage(
3258-
e,
3259-
Some("_LEGACY_ERROR_TEMP_DELTA_0011"),
3260-
None,
3261-
Some("Failed to resolve plan.")
3262-
)
3263-
}
32643251
{
32653252
val exprs = Seq("1".expr, "2".expr)
32663253
val e = intercept[DeltaAnalysisException] {

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@
1717
package org.apache.spark.sql.delta
1818

1919
// scalastyle:off import.ordering.noEmptyLine
20-
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
21-
import org.apache.spark.sql.delta.commands.cdc.CDCReader
2220
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2321

2422
import org.apache.spark.sql.Row
2523

2624
trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBase {
2725
import testImplicits._
2826

29-
// All CDC suites run using MergeIntoSQLSuite only. The SQL API for NOT MATCHED BY SOURCE will
30-
// only be available with Spark 3.4. In the meantime, we explicitly run NOT MATCHED BY SOURCE
31-
// tests with CDF enabled and disabled against the Scala API. Use [[testExtendedMerge]
32-
// instead once we can run tests against the SQL API.
27+
/**
28+
* Variant of `testExtendedMerge` that runs a MERGE INTO command, checks the expected result and
29+
* additionally validate that the CDC produced is correct.
30+
*/
3331
protected def testExtendedMergeWithCDC(
3432
name: String,
3533
namePrefix: String = "not matched by source")(
@@ -65,74 +63,87 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBase {
6563
}
6664

6765
// Test analysis errors with NOT MATCHED BY SOURCE clauses.
68-
testErrorsInUnlimitedClauses(
66+
testMergeAnalysisException(
6967
"error on multiple not matched by source update clauses without condition")(
7068
mergeOn = "s.key = t.key",
7169
updateNotMatched(condition = "t.key == 3", set = "value = 2 * value"),
7270
updateNotMatched(set = "value = 3 * value"),
7371
updateNotMatched(set = "value = 4 * value"))(
74-
errorStrs = "when there are more than one not matched by source clauses in a merge " +
75-
"statement, only the last not matched by source clause can omit the condition" :: Nil)
72+
expectedErrorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION",
73+
expectedMessageParameters = Map.empty)
7674

77-
testErrorsInUnlimitedClauses(
75+
testMergeAnalysisException(
7876
"error on multiple not matched by source update/delete clauses without condition")(
7977
mergeOn = "s.key = t.key",
8078
updateNotMatched(condition = "t.key == 3", set = "value = 2 * value"),
8179
deleteNotMatched(),
8280
updateNotMatched(set = "value = 4 * value"))(
83-
errorStrs = "when there are more than one not matched by source clauses in a merge " +
84-
"statement, only the last not matched by source clause can omit the condition" :: Nil)
81+
expectedErrorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION",
82+
expectedMessageParameters = Map.empty)
8583

86-
testErrorsInUnlimitedClauses(
84+
testMergeAnalysisException(
8785
"error on non-empty condition following empty condition in not matched by source " +
8886
"update clauses")(
8987
mergeOn = "s.key = t.key",
9088
updateNotMatched(set = "value = 2 * value"),
9189
updateNotMatched(condition = "t.key < 3", set = "value = value"))(
92-
errorStrs = "when there are more than one not matched by source clauses in a merge " +
93-
"statement, only the last not matched by source clause can omit the condition" :: Nil)
90+
expectedErrorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION",
91+
expectedMessageParameters = Map.empty)
9492

95-
testErrorsInUnlimitedClauses(
93+
testMergeAnalysisException(
9694
"error on non-empty condition following empty condition in not matched by source " +
9795
"delete clauses")(
9896
mergeOn = "s.key = t.key",
9997
deleteNotMatched(),
10098
deleteNotMatched(condition = "t.key < 3"))(
101-
errorStrs = "when there are more than one not matched by source clauses in a merge " +
102-
"statement, only the last not matched by source clause can omit the condition" :: Nil)
99+
expectedErrorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION",
100+
expectedMessageParameters = Map.empty)
103101

104-
testAnalysisErrorsInExtendedMerge("update not matched condition - unknown reference")(
102+
testMergeAnalysisException("update not matched condition - unknown reference")(
105103
mergeOn = "s.key = t.key",
106104
updateNotMatched(condition = "unknownAttrib > 1", set = "tgtValue = tgtValue + 1"))(
107-
// Should show unknownAttrib as invalid ref and (key, tgtValue, srcValue) as valid column names.
108-
errorStrs = "UPDATE condition" :: "unknownAttrib" :: "key" :: "tgtValue" :: Nil)
105+
expectedErrorClass = "DELTA_MERGE_UNRESOLVED_EXPRESSION",
106+
expectedMessageParameters = Map(
107+
"sqlExpr" -> "unknownAttrib",
108+
"clause" -> "UPDATE condition",
109+
"cols" -> "t.key, t.tgtValue"))
109110

110-
testAnalysisErrorsInExtendedMerge("update not matched condition - aggregation function")(
111+
testMergeAnalysisException("update not matched condition - aggregation function")(
111112
mergeOn = "s.key = t.key",
112113
updateNotMatched(condition = "max(0) > 0", set = "tgtValue = tgtValue + 1"))(
113-
errorStrs = "UPDATE condition" :: "aggregate functions are not supported" :: Nil)
114+
expectedErrorClass = "DELTA_AGGREGATION_NOT_SUPPORTED",
115+
expectedMessageParameters = Map(
116+
"operation" -> "UPDATE condition of MERGE operation",
117+
"predicate" -> "(condition = (max(0) > 0))."))
114118

115-
testAnalysisErrorsInExtendedMerge("update not matched condition - subquery")(
119+
testMergeAnalysisException("update not matched condition - subquery")(
116120
mergeOn = "s.key = t.key",
117-
updateNotMatched(condition = "s.value in (select value from t)", set = "tgtValue = 1"))(
118-
errorStrs = Nil
119-
) // subqueries fail for unresolved reference to `t`
121+
updateNotMatched(condition = "tgtValue in (select value from t)", set = "tgtValue = 1"))(
122+
expectedErrorClass = "TABLE_OR_VIEW_NOT_FOUND",
123+
expectedMessageParameters = Map("relationName" -> "`t`"))
120124

121-
testAnalysisErrorsInExtendedMerge("delete not matched condition - unknown reference")(
125+
testMergeAnalysisException("delete not matched condition - unknown reference")(
122126
mergeOn = "s.key = t.key",
123127
deleteNotMatched(condition = "unknownAttrib > 1"))(
124-
// Should show unknownAttrib as invalid ref and (key, tgtValue, srcValue) as valid column names.
125-
errorStrs = "DELETE condition" :: "unknownAttrib" :: "key" :: "tgtValue" :: Nil)
128+
expectedErrorClass = "DELTA_MERGE_UNRESOLVED_EXPRESSION",
129+
expectedMessageParameters = Map(
130+
"sqlExpr" -> "unknownAttrib",
131+
"clause" -> "DELETE condition",
132+
"cols" -> "t.key, t.tgtValue"))
126133

127-
testAnalysisErrorsInExtendedMerge("delete not matched condition - aggregation function")(
134+
testMergeAnalysisException("delete not matched condition - aggregation function")(
128135
mergeOn = "s.key = t.key",
129136
deleteNotMatched(condition = "max(0) > 0"))(
130-
errorStrs = "DELETE condition" :: "aggregate functions are not supported" :: Nil)
137+
expectedErrorClass = "DELTA_AGGREGATION_NOT_SUPPORTED",
138+
expectedMessageParameters = Map(
139+
"operation" -> "DELETE condition of MERGE operation",
140+
"predicate" -> "(condition = (max(0) > 0))."))
131141

132-
testAnalysisErrorsInExtendedMerge("delete not matched condition - subquery")(
142+
testMergeAnalysisException("delete not matched condition - subquery")(
133143
mergeOn = "s.key = t.key",
134-
deleteNotMatched(condition = "s.srcValue in (select tgtValue from t)"))(
135-
errorStrs = Nil) // subqueries fail for unresolved reference to `t`
144+
deleteNotMatched(condition = "tgtValue in (select tgtValue from t)"))(
145+
expectedErrorClass = "TABLE_OR_VIEW_NOT_FOUND",
146+
expectedMessageParameters = Map("relationName" -> "`t`"))
136147

137148
// Test correctness with NOT MATCHED BY SOURCE clauses.
138149
testExtendedMergeWithCDC("all 3 types of match clauses without conditions")(

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoScalaSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase
6969
// scalastyle:on line.size.limit
7070
)
7171

72+
// Maps expected error classes to actual error classes. Used to handle error classes that are
73+
// different when running using SQL vs. Scala.
74+
override protected val mappedErrorClasses: Map[String, String] = Map(
75+
"NON_LAST_MATCHED_CLAUSE_OMIT_CONDITION" -> "DELTA_NON_LAST_MATCHED_CLAUSE_OMIT_CONDITION",
76+
"NON_LAST_NOT_MATCHED_BY_TARGET_CLAUSE_OMIT_CONDITION" ->
77+
"DELTA_NON_LAST_NOT_MATCHED_CLAUSE_OMIT_CONDITION",
78+
"NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION" ->
79+
"DELTA_NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION"
80+
)
7281

7382
test("basic scala API") {
7483
withTable("source") {

0 commit comments

Comments
 (0)