Skip to content

Commit 010d36f

Browse files
imarkowitzHyukjinKwon
authored andcommitted
[SPARK-53507][CONNECT] Add breaking change info to errors
### What changes were proposed in this pull request? Adds breaking change metadata to error messages Each breaking change includes a migration message explaining how the user should update their code. It also can include a spark config value which can be used to mitigate the breaking change. The migration message is concatenated to the error message. In Scala, we also include the breaking change info in the structured error message, when the STANDARD error format is used. We also include breaking change info in pyspark errors. ### Why are the changes needed? By tagging breaking changes with metadata and a spark config flag, we can build tools to automatically retry spark jobs with the breaking change disabled. ### Does this PR introduce _any_ user-facing change? This PR only adds a framework for creating breaking change errors, but does not define any breaking change errors yet. It adds new methods, for example `getBreakingChangeInfo` on `SparkThrowable`. For existing errors, this function will return `None`. ### How was this patch tested? Tests are added in `SparkThrowableSuite`, `test_connect_errors_conversion.py`, `test_errors.py`, and `FetchErrorDetailsHandlerSuite`. ### Was this patch authored or co-authored using generative AI tooling? No Closes #52256 from imarkowitz/ian/breaking-changes. Authored-by: imarkowitz <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent f490471 commit 010d36f

File tree

14 files changed

+813
-18
lines changed

14 files changed

+813
-18
lines changed

common/utils/src/main/java/org/apache/spark/SparkThrowable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ default boolean isInternalError() {
6060
return SparkThrowableHelper.isInternalError(this.getCondition());
6161
}
6262

63+
// If null, the error message is not for a breaking change
64+
default BreakingChangeInfo getBreakingChangeInfo() {
65+
return SparkThrowableHelper.getBreakingChangeInfo(
66+
this.getCondition()).getOrElse(() -> null);
67+
}
68+
6369
default Map<String, String> getMessageParameters() {
6470
return new HashMap<>();
6571
}

common/utils/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
7575
matches.map(m => m.stripSuffix(">").stripPrefix("<"))
7676
}
7777

78+
def getBreakingChangeInfo(errorClass: String): Option[BreakingChangeInfo] = {
79+
val errorClasses = errorClass.split('.')
80+
errorClasses match {
81+
case Array(mainClass) =>
82+
errorInfoMap.get(mainClass).flatMap(_.breakingChangeInfo)
83+
case Array(mainClass, subClass) =>
84+
errorInfoMap.get(mainClass).flatMap{
85+
errorInfo =>
86+
errorInfo.subClass.flatMap(_.get(subClass))
87+
.flatMap(_.breakingChangeInfo)
88+
.orElse(errorInfo.breakingChangeInfo)
89+
}
90+
case _ => None
91+
}
92+
}
93+
7894
def getMessageTemplate(errorClass: String): String = {
7995
val errorClasses = errorClass.split("\\.")
8096
assert(errorClasses.length == 1 || errorClasses.length == 2)
@@ -128,7 +144,7 @@ private object ErrorClassesJsonReader {
128144
val map = mapper.readValue(url, new TypeReference[Map[String, ErrorInfo]]() {})
129145
val errorClassWithDots = map.collectFirst {
130146
case (errorClass, _) if errorClass.contains('.') => errorClass
131-
case (_, ErrorInfo(_, Some(map), _)) if map.keys.exists(_.contains('.')) =>
147+
case (_, ErrorInfo(_, Some(map), _, _)) if map.keys.exists(_.contains('.')) =>
132148
map.keys.collectFirst { case s if s.contains('.') => s }.get
133149
}
134150
if (errorClassWithDots.isEmpty) {
@@ -147,28 +163,59 @@ private object ErrorClassesJsonReader {
147163
* @param subClass SubClass associated with this class.
148164
* @param message Message format with optional placeholders (e.g. &lt;parm&gt;).
149165
* The error message is constructed by concatenating the lines with newlines.
166+
* @param breakingChangeInfo Additional metadata if the error is due to a breaking change.
150167
*/
151168
private case class ErrorInfo(
152169
message: Seq[String],
153170
subClass: Option[Map[String, ErrorSubInfo]],
154-
sqlState: Option[String]) {
171+
sqlState: Option[String],
172+
breakingChangeInfo: Option[BreakingChangeInfo] = None) {
155173
// For compatibility with multi-line error messages
156174
@JsonIgnore
157-
val messageTemplate: String = message.mkString("\n")
175+
val messageTemplate: String = message.mkString("\n") +
176+
breakingChangeInfo.map(_.migrationMessage.mkString(" ", "\n", "")).getOrElse("")
158177
}
159178

160179
/**
161180
* Information associated with an error subclass.
162181
*
163182
* @param message Message format with optional placeholders (e.g. &lt;parm&gt;).
164183
* The error message is constructed by concatenating the lines with newlines.
184+
* @param breakingChangeInfo Additional metadata if the error is due to a breaking change.
165185
*/
166-
private case class ErrorSubInfo(message: Seq[String]) {
186+
private case class ErrorSubInfo(
187+
message: Seq[String],
188+
breakingChangeInfo: Option[BreakingChangeInfo] = None) {
167189
// For compatibility with multi-line error messages
168190
@JsonIgnore
169-
val messageTemplate: String = message.mkString("\n")
191+
val messageTemplate: String = message.mkString("\n") +
192+
breakingChangeInfo.map(_.migrationMessage.mkString(" ", "\n", "")).getOrElse("")
170193
}
171194

195+
/**
196+
* Additional information if the error was caused by a breaking change.
197+
*
198+
* @param migrationMessage A message explaining how the user can migrate their job to work
199+
* with the breaking change.
200+
* @param mitigationConfig A spark config flag that can be used to mitigate the
201+
* breaking change.
202+
* @param needsAudit If true, the breaking change should be inspected manually.
203+
* If false, the spark job should be retried by setting the
204+
* mitigationConfig.
205+
*/
206+
case class BreakingChangeInfo(
207+
migrationMessage: Seq[String],
208+
mitigationConfig: Option[MitigationConfig] = None,
209+
needsAudit: Boolean = true
210+
)
211+
212+
/**
213+
* A spark config flag that can be used to mitigate a breaking change.
214+
* @param key The spark config key.
215+
* @param value The spark config value that mitigates the breaking change.
216+
*/
217+
case class MitigationConfig(key: String, value: String)
218+
172219
/**
173220
* Information associated with an error state / SQLSTATE.
174221
*

common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ private[spark] object SparkThrowableHelper {
7373
errorReader.getMessageParameters(errorClass)
7474
}
7575

76+
def getBreakingChangeInfo(errorClass: String): Option[BreakingChangeInfo] = {
77+
if (errorClass == null) {
78+
None
79+
} else {
80+
errorReader.getBreakingChangeInfo(errorClass)
81+
}
82+
}
83+
7684
def isInternalError(errorClass: String): Boolean = {
7785
errorClass != null && errorClass.startsWith("INTERNAL_ERROR")
7886
}
@@ -99,6 +107,19 @@ private[spark] object SparkThrowableHelper {
99107
g.writeStringField("errorClass", errorClass)
100108
if (format == STANDARD) {
101109
g.writeStringField("messageTemplate", errorReader.getMessageTemplate(errorClass))
110+
errorReader.getBreakingChangeInfo(errorClass).foreach { breakingChangeInfo =>
111+
g.writeObjectFieldStart("breakingChangeInfo")
112+
g.writeStringField("migrationMessage",
113+
breakingChangeInfo.migrationMessage.mkString("\n"))
114+
breakingChangeInfo.mitigationConfig.foreach { mitigationConfig =>
115+
g.writeObjectFieldStart("mitigationConfig")
116+
g.writeStringField("key", mitigationConfig.key)
117+
g.writeStringField("value", mitigationConfig.value)
118+
g.writeEndObject()
119+
}
120+
g.writeBooleanField("needsAudit", breakingChangeInfo.needsAudit)
121+
g.writeEndObject()
122+
}
102123
}
103124
val sqlState = e.getSqlState
104125
if (sqlState != null) g.writeStringField("sqlState", sqlState)

core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,90 @@ class SparkThrowableSuite extends SparkFunSuite {
504504
}
505505
}
506506

507+
test("breaking changes info") {
508+
assert(SparkThrowableHelper.getBreakingChangeInfo(null).isEmpty)
509+
510+
val nonBreakingChangeError = new SparkException(
511+
errorClass = "CANNOT_PARSE_DECIMAL",
512+
messageParameters = Map.empty[String, String],
513+
cause = null)
514+
assert(nonBreakingChangeError.getBreakingChangeInfo == null)
515+
516+
withTempDir { dir =>
517+
val json = new File(dir, "errors.json")
518+
Files.writeString(
519+
json.toPath,
520+
"""
521+
|{
522+
| "TEST_ERROR": {
523+
| "message": [
524+
| "Error message 1 with <param1>."
525+
| ],
526+
| "breakingChangeInfo": {
527+
| "migrationMessage": [
528+
| "Migration message with <param2>."
529+
| ],
530+
| "mitigationConfig": {
531+
| "key": "config.key1",
532+
| "value": "config.value1"
533+
| },
534+
| "needsAudit": false
535+
| }
536+
| },
537+
| "TEST_ERROR_WITH_SUBCLASS": {
538+
| "message": [
539+
| "Error message 2 with <param1>."
540+
| ],
541+
| "subClass": {
542+
| "SUBCLASS": {
543+
| "message": [
544+
| "Subclass message with <param2>."
545+
| ],
546+
| "breakingChangeInfo": {
547+
| "migrationMessage": [
548+
| "Subclass migration message with <param3>."
549+
| ],
550+
| "mitigationConfig": {
551+
| "key": "config.key2",
552+
| "value": "config.value2"
553+
| },
554+
| "needsAudit": true
555+
| }
556+
| }
557+
| }
558+
| }
559+
|}
560+
|""".stripMargin,
561+
StandardCharsets.UTF_8)
562+
563+
val error1Params = Map("param1" -> "value1", "param2" -> "value2")
564+
val error2Params = Map("param1" -> "value1", "param2" -> "value2", "param3" -> "value3")
565+
566+
val reader =
567+
new ErrorClassesJsonReader(Seq(errorJsonFilePath.toUri.toURL, json.toURI.toURL))
568+
val errorMessage = reader.getErrorMessage("TEST_ERROR", error1Params)
569+
assert(errorMessage == "Error message 1 with value1. Migration message with value2.")
570+
val breakingChangeInfo = reader.getBreakingChangeInfo("TEST_ERROR")
571+
assert(
572+
breakingChangeInfo.contains(
573+
BreakingChangeInfo(
574+
Seq("Migration message with <param2>."),
575+
Some(MitigationConfig("config.key1", "config.value1")),
576+
needsAudit = false)))
577+
val errorMessage2 =
578+
reader.getErrorMessage("TEST_ERROR_WITH_SUBCLASS.SUBCLASS", error2Params)
579+
assert(
580+
errorMessage2 == "Error message 2 with value1. Subclass message with value2." +
581+
" Subclass migration message with value3.")
582+
val breakingChangeInfo2 = reader.getBreakingChangeInfo("TEST_ERROR_WITH_SUBCLASS.SUBCLASS")
583+
assert(
584+
breakingChangeInfo2.contains(
585+
BreakingChangeInfo(
586+
Seq("Subclass migration message with <param3>."),
587+
Some(MitigationConfig("config.key2", "config.value2")))))
588+
}
589+
}
590+
507591
test("detect unused message parameters") {
508592
checkError(
509593
exception = intercept[SparkException] {

python/pyspark/errors/exceptions/base.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import warnings
1818
from abc import ABC, abstractmethod
1919
from enum import Enum
20-
from typing import Dict, Optional, TypeVar, cast, Iterable, TYPE_CHECKING, List
20+
from typing import Any, Dict, Optional, TypeVar, cast, Iterable, TYPE_CHECKING, List
2121

2222
from pyspark.errors.exceptions.tblib import Traceback
2323
from pyspark.errors.utils import ErrorClassesReader
@@ -138,6 +138,23 @@ def getMessage(self) -> str:
138138
"""
139139
return f"[{self.getCondition()}] {self._message}"
140140

141+
def getBreakingChangeInfo(self) -> Optional[Dict[str, Any]]:
142+
"""
143+
Returns the breaking change info for an error, or None.
144+
145+
Breaking change info is a dict with two fields:
146+
147+
migration_message: list of str
148+
A message explaining how the user can migrate their job to work
149+
with the breaking change.
150+
151+
mitigation_config:
152+
A dict with key: str and value: str fields.
153+
A spark config flag that can be used to mitigate the
154+
breaking change.
155+
"""
156+
return self._error_reader.get_breaking_change_info(self._errorClass)
157+
141158
def getQueryContext(self) -> List["QueryContext"]:
142159
"""
143160
Returns :class:`QueryContext`.

python/pyspark/errors/exceptions/connect.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import grpc
1818
import json
1919
from grpc import StatusCode
20-
from typing import Dict, List, Optional, TYPE_CHECKING
20+
from typing import Any, Dict, List, Optional, TYPE_CHECKING
2121

2222
from pyspark.errors.exceptions.base import (
2323
AnalysisException as BaseAnalysisException,
@@ -95,6 +95,7 @@ def _convert_exception(
9595
display_server_stacktrace = display_server_stacktrace if stacktrace else False
9696

9797
contexts = None
98+
breaking_change_info = None
9899
if resp and resp.HasField("root_error_idx"):
99100
root_error = resp.errors[resp.root_error_idx]
100101
if hasattr(root_error, "spark_throwable"):
@@ -105,6 +106,20 @@ def _convert_exception(
105106
else DataFrameQueryContext(c)
106107
for c in root_error.spark_throwable.query_contexts
107108
]
109+
# Extract breaking change info if present
110+
if hasattr(
111+
root_error.spark_throwable, "breaking_change_info"
112+
) and root_error.spark_throwable.HasField("breaking_change_info"):
113+
bci = root_error.spark_throwable.breaking_change_info
114+
breaking_change_info = {
115+
"migration_message": list(bci.migration_message),
116+
"needs_audit": bci.needs_audit if bci.HasField("needs_audit") else True,
117+
}
118+
if bci.HasField("mitigation_config"):
119+
breaking_change_info["mitigation_config"] = {
120+
"key": bci.mitigation_config.key,
121+
"value": bci.mitigation_config.value,
122+
}
108123

109124
if "org.apache.spark.api.python.PythonException" in classes:
110125
return PythonException(
@@ -134,6 +149,7 @@ def _convert_exception(
134149
display_server_stacktrace=display_server_stacktrace,
135150
contexts=contexts,
136151
grpc_status_code=grpc_status_code,
152+
breaking_change_info=breaking_change_info,
137153
)
138154

139155
# Return UnknownException if there is no matched exception class
@@ -147,6 +163,7 @@ def _convert_exception(
147163
display_server_stacktrace=display_server_stacktrace,
148164
contexts=contexts,
149165
grpc_status_code=grpc_status_code,
166+
breaking_change_info=breaking_change_info,
150167
)
151168

152169

@@ -193,6 +210,7 @@ def __init__(
193210
display_server_stacktrace: bool = False,
194211
contexts: Optional[List[BaseQueryContext]] = None,
195212
grpc_status_code: grpc.StatusCode = StatusCode.UNKNOWN,
213+
breaking_change_info: Optional[Dict[str, Any]] = None,
196214
) -> None:
197215
if contexts is None:
198216
contexts = []
@@ -221,6 +239,7 @@ def __init__(
221239
self._display_stacktrace: bool = display_server_stacktrace
222240
self._contexts: List[BaseQueryContext] = contexts
223241
self._grpc_status_code = grpc_status_code
242+
self._breaking_change_info: Optional[Dict[str, Any]] = breaking_change_info
224243
self._log_exception()
225244

226245
def getSqlState(self) -> Optional[str]:
@@ -241,6 +260,15 @@ def getMessage(self) -> str:
241260
def getGrpcStatusCode(self) -> grpc.StatusCode:
242261
return self._grpc_status_code
243262

263+
def getBreakingChangeInfo(self) -> Optional[Dict[str, Any]]:
264+
"""
265+
Returns the breaking change info for an error, or None.
266+
267+
For Spark Connect exceptions, this returns the breaking change info
268+
received from the server, rather than looking it up from local error files.
269+
"""
270+
return self._breaking_change_info
271+
244272
def __str__(self) -> str:
245273
return self.getMessage()
246274

@@ -263,6 +291,7 @@ def __init__(
263291
display_server_stacktrace: bool = False,
264292
contexts: Optional[List[BaseQueryContext]] = None,
265293
grpc_status_code: grpc.StatusCode = StatusCode.UNKNOWN,
294+
breaking_change_info: Optional[Dict[str, Any]] = None,
266295
) -> None:
267296
super().__init__(
268297
message=message,
@@ -274,6 +303,7 @@ def __init__(
274303
display_server_stacktrace=display_server_stacktrace,
275304
contexts=contexts,
276305
grpc_status_code=grpc_status_code,
306+
breaking_change_info=breaking_change_info,
277307
)
278308

279309

0 commit comments

Comments
 (0)