Skip to content

Commit 1692b55

Browse files
jackywang-db and sryza
authored and committed
[SPARK-53728][SDP] Print PipelineEvent Message with Error In Test
### What changes were proposed in this pull request? Pulling some changes from #51644. For debugging purposes, print any PipelineEvent that carries an error to the console. Most tests expect the pipeline to succeed, so this makes it easier to see the error when it happens. Otherwise, developers have to manually add println calls to print the error message contained in the event, which is cumbersome. ### Why are the changes needed? Easier to debug test failures; avoids the need to manually add println for test failures. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? No Closes #52463 from JiaqiWang18/SPARK-53728-print-event-with-error-in-test. Authored-by: Jacky Wang <[email protected]> Signed-off-by: Sandy Ryza <[email protected]>
1 parent 776ffd5 commit 1692b55

File tree

3 files changed

+39
-16
lines changed

3 files changed

+39
-16
lines changed

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,18 +158,6 @@ class PipelineEventSender(
158158
}
159159

160160
private def constructProtoEvent(event: PipelineEvent): proto.PipelineEvent = {
161-
val message = if (event.error.nonEmpty) {
162-
// Returns the message associated with a Throwable and all its causes
163-
def getExceptionMessages(throwable: Throwable): Seq[String] = {
164-
throwable.getMessage +:
165-
Option(throwable.getCause).map(getExceptionMessages).getOrElse(Nil)
166-
}
167-
val errorMessages = getExceptionMessages(event.error.get)
168-
s"""${event.message}
169-
|Error: ${errorMessages.mkString("\n")}""".stripMargin
170-
} else {
171-
event.message
172-
}
173161
val protoEventBuilder = proto.PipelineEvent
174162
.newBuilder()
175163
.setTimestamp(
@@ -182,7 +170,7 @@ class PipelineEventSender(
182170
.setSeconds(event.timestamp.getTime / 1000)
183171
.setNanos(event.timestamp.getNanos)
184172
.build())
185-
.setMessage(message)
173+
.setMessage(event.messageWithError)
186174
protoEventBuilder.build()
187175
}
188176
}

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineEvent.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,23 @@ case class PipelineEvent(
3939
message: String,
4040
details: EventDetails,
4141
error: Option[Throwable]
42-
)
42+
) {
43+
/** Combines the message and error (if any) into a single string */
44+
def messageWithError: String = {
45+
if (error.nonEmpty) {
46+
// Returns the message associated with a Throwable and all its causes
47+
def getExceptionMessages(throwable: Throwable): Seq[String] = {
48+
throwable.getMessage +:
49+
Option(throwable.getCause).map(getExceptionMessages).getOrElse(Nil)
50+
}
51+
val errorMessages = getExceptionMessages(error.get)
52+
s"""${message}
53+
|Error: ${errorMessages.mkString("\n")}""".stripMargin
54+
} else {
55+
message
56+
}
57+
}
58+
}
4359

4460
/**
4561
* Describes where the event originated from

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.pipelines.utils
1919

20+
import org.scalatest.Assertions.fail
21+
2022
import org.apache.spark.sql.catalyst.TableIdentifier
2123
import org.apache.spark.sql.classic.SparkSession
2224
import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState}
@@ -51,17 +53,34 @@ trait TestPipelineUpdateContextMixin {
5153
* @param fullRefreshTables Set of tables to be fully refreshed.
5254
* @param refreshTables Set of tables to be refreshed.
5355
* @param resetCheckpointFlows Set of flows to be reset.
56+
* @param failOnErrorEvent Whether to fail test when receiving event with error.
5457
*/
5558
case class TestPipelineUpdateContext(
5659
spark: SparkSession,
5760
unresolvedGraph: DataflowGraph,
5861
fullRefreshTables: TableFilter = NoTables,
5962
refreshTables: TableFilter = AllTables,
60-
resetCheckpointFlows: FlowFilter = AllFlows
63+
resetCheckpointFlows: FlowFilter = AllFlows,
64+
failOnErrorEvent: Boolean = false
6165
) extends PipelineUpdateContext {
6266
val eventBuffer = new PipelineRunEventBuffer()
6367

64-
override val eventCallback: PipelineEvent => Unit = eventBuffer.addEvent
68+
override val eventCallback: PipelineEvent => Unit = { event =>
69+
eventBuffer.addEvent(event)
70+
// For debugging purposes, print the event to the console.
71+
// Most tests expects pipeline to succeed, this makes it easier to see
72+
// the error when it happens.
73+
if (event.error.nonEmpty) {
74+
// scalastyle:off println
75+
println("\n=== Received Pipeline Event with Error ===")
76+
println(event.messageWithError)
77+
println("=================================")
78+
// scalastyle:on println
79+
if (failOnErrorEvent) {
80+
fail(s"Pipeline event with error received: ${event.messageWithError}")
81+
}
82+
}
83+
}
6584

6685
override def flowProgressEventLogger: FlowProgressEventLogger = {
6786
new FlowProgressEventLogger(eventCallback = eventCallback)

0 commit comments

Comments
 (0)