Skip to content

Commit a309a05

Browse files
Dylan Wong authored and
anishshri-db committed
[SPARK-54390][SS] Fix JSON Deserialize in StreamingQueryListenerBus
### What changes were proposed in this pull request? The PR changes a few things: 1. Convert JSON objects and arrays to strings when deserializing using `ObjectToStringDeserializer`. (main fix) 2. Set the default values for `inputRowsPerSecond` and `processedRowsPerSecond` to be `Double.NaN`. This fixes another silent deserialization issue. (fix makes testing easier) 3. When getting `jsonValue` for `"observedMetrics"` in `StreamingQueryProgress` we return `JNothing` if parsing fails. (fix makes testing easier, but is also another issue that needs to be addressed in the future) ### Why are the changes needed? When using Spark Connect, JSON-based offsets will not be deserialized properly. ### Does this PR introduce _any_ user-facing change? Yes, this corrects the issues with the StreamingQueryListener with Spark Connect. ### How was this patch tested? Added a new test progress to the unit tests which contains object- and array-based source offsets. ### Was this patch authored or co-authored using generative AI tooling? No Closes #53102 from dylanwong250/SPARK-54390. Authored-by: Dylan Wong <[email protected]> Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent 8fe006b commit a309a05

File tree

3 files changed

+302
-14
lines changed

3 files changed

+302
-14
lines changed

sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
2525
import scala.math.BigDecimal.RoundingMode
2626
import scala.util.control.NonFatal
2727

28-
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
28+
import com.fasterxml.jackson.core.JsonParser
29+
import com.fasterxml.jackson.databind.{DeserializationContext, DeserializationFeature, JsonDeserializer, JsonNode, ObjectMapper}
2930
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
3031
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
3132
import org.json4s._
@@ -191,7 +192,19 @@ class StreamingQueryProgress private[spark] (
191192
("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
192193
("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
193194
("sink" -> sink.jsonValue) ~
194-
("observedMetrics" -> safeMapToJValue[Row](observedMetrics, (_, row) => row.jsonValue))
195+
("observedMetrics" -> {
196+
// TODO: SPARK-54391
197+
// In Spark Connect, the observedMetrics is serialized but is not deserialized properly when
198+
// being sent back to the client and the schema is null. So calling row.jsonValue will throw
199+
// an exception so we need to catch the exception and return JNothing.
200+
// This is because the Row.jsonValue method is a one way method and there is no reverse
201+
// method to convert the JSON back to a Row.
202+
try {
203+
safeMapToJValue[Row](observedMetrics, (_, row) => row.jsonValue)
204+
} catch {
205+
case NonFatal(e) => JNothing
206+
}
207+
})
195208
}
196209
}
197210

@@ -210,6 +223,19 @@ private[spark] object StreamingQueryProgress {
210223
mapper.readValue[StreamingQueryProgress](json)
211224
}
212225

226+
// SPARK-54390: Custom deserializer that converts JSON objects to strings for offset fields
227+
/**
 * Jackson deserializer that always produces a `String`, whatever JSON value it is handed.
 *
 * The current value is read as a tree. A textual node yields its text content; any other
 * node (for example a JSON object or array) yields the node's `toString` rendering.
 */
private class ObjectToStringDeserializer extends JsonDeserializer[String] {
  override def deserialize(parser: JsonParser, context: DeserializationContext): String = {
    val tree: JsonNode = parser.readValueAsTree()
    // Only non-textual nodes need to be rendered; plain strings are returned unquoted.
    if (!tree.isTextual) tree.toString else tree.asText()
  }
}
238+
213239
/**
214240
* Information about progress made for a source in the execution of a [[StreamingQuery]] during a
215241
* trigger. See [[StreamingQueryProgress]] for more information.
@@ -233,12 +259,19 @@ private[spark] object StreamingQueryProgress {
233259
@Evolving
234260
class SourceProgress protected[spark] (
235261
val description: String,
262+
// SPARK-54390: Use a custom deserializer to convert the JSON object to a string.
263+
@JsonDeserialize(using = classOf[ObjectToStringDeserializer])
236264
val startOffset: String,
265+
@JsonDeserialize(using = classOf[ObjectToStringDeserializer])
237266
val endOffset: String,
267+
@JsonDeserialize(using = classOf[ObjectToStringDeserializer])
238268
val latestOffset: String,
239269
val numInputRows: Long,
240-
val inputRowsPerSecond: Double,
241-
val processedRowsPerSecond: Double,
270+
// The NaN is used in deserialization to indicate the value was not set.
271+
// The NaN is then used to not output this field in the JSON.
272+
// In Spark Connect, we need to ensure that the default value is Double.NaN instead of 0.0.
273+
val inputRowsPerSecond: Double = Double.NaN,
274+
val processedRowsPerSecond: Double = Double.NaN,
242275
val metrics: ju.Map[String, String] = Map[String, String]().asJava)
243276
extends Serializable {
244277

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.util.UUID
2121

2222
import scala.collection.mutable
2323

24+
import org.json4s.jackson.JsonMethods.{compact, parse, render}
2425
import org.scalactic.{Equality, TolerantNumerics}
2526
import org.scalatest.BeforeAndAfter
2627
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -286,6 +287,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
286287
)
287288
}
288289

290+
// Parses `jsonString` (doubles read as BigDecimal), drops the fields whose name equals
// `fieldName`, and returns the remaining document rendered as compact JSON.
private def removeFieldFromJson(jsonString: String, fieldName: String): String = {
  val parsed = parse(jsonString, useBigDecimalForDouble = true)
  val pruned = parsed.removeField(field => field._1 == fieldName)
  compact(render(pruned))
}
295+
289296
test("QueryProgressEvent serialization") {
290297
def testSerialization(event: QueryProgressEvent): Unit = {
291298
import scala.jdk.CollectionConverters._
@@ -294,9 +301,24 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
294301
assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality
295302
assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
296303
assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
304+
305+
// Verify we can get the event back from the JSON string, this is important for Spark Connect
306+
// and the StreamingQueryListenerBus. This is the method that is used to deserialize the event
307+
// in StreamingQueryListenerBus.queryEventHandler
308+
val eventFromNewEvent = QueryProgressEvent.fromJson(newEvent.json)
309+
// TODO: Remove after SC-206585 is fixed
310+
// We remove the observedMetrics field because it is not serialized properly when being
311+
// removed from the listener bus, so this test is to verify that everything except the
312+
// observedMetrics field is equal in the JSON string
313+
val eventWithoutObservedMetrics = removeFieldFromJson(event.progress.json, "observedMetrics")
314+
assert(eventFromNewEvent.progress.json === eventWithoutObservedMetrics)
297315
}
298316
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
299317
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
318+
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress3))
319+
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress4))
320+
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress5))
321+
testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress6))
300322
}
301323

302324
test("QueryTerminatedEvent serialization") {

0 commit comments

Comments
 (0)