
Commit 18b6ec1

arunmahadevan authored and HyukjinKwon committed
[SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress
## What changes were proposed in this pull request?

Currently the Structured Streaming sources and sinks do not have a way to report custom metrics. Providing an option to report custom metrics and making them available via Streaming Query progress enables sources and sinks to report custom progress information (e.g. the lag metrics for the Kafka source). Similar metrics can be reported for sinks as well, but we would like to get initial feedback before proceeding further.

## How was this patch tested?

New and existing unit tests.

Closes apache#21721 from arunmahadevan/SPARK-24748.

Authored-by: Arun Mahadevan <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
1 parent 6afe6f3 commit 18b6ec1

9 files changed (+306, -14 lines)
CustomMetrics.java (new file)

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface for reporting custom metrics from streaming sources and sinks.
+ */
+@InterfaceStability.Evolving
+public interface CustomMetrics {
+  /**
+   * Returns a JSON serialized representation of custom metrics.
+   *
+   * @return JSON serialized representation of custom metrics
+   */
+  String json();
+}
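For illustration only (not part of this commit), a source could back this interface with json4s serialization, much like the memory sink does further down in this diff. The class name and the lag field below are hypothetical; this is a minimal sketch:

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.sql.sources.v2.CustomMetrics

// Hypothetical metrics object: reports per-partition lag for a source.
class LagMetrics(lagPerPartition: Map[String, Long]) extends CustomMetrics {
  private implicit val formats = Serialization.formats(NoTypeHints)

  // Produces e.g. {"lag":{"topic-0":42,"topic-1":7}}
  override def json(): String = Serialization.write(Map("lag" -> lagPerPartition))
}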
SupportsCustomReaderMetrics.java (new file)

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+/**
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
+ * interface to report custom metrics that get reported under the
+ * {@link org.apache.spark.sql.streaming.SourceProgress}.
+ */
+@InterfaceStability.Evolving
+public interface SupportsCustomReaderMetrics extends DataSourceReader {
+  /**
+   * Returns custom metrics specific to this data source.
+   */
+  CustomMetrics getCustomMetrics();
+
+  /**
+   * Invoked if the custom metrics returned by {@link #getCustomMetrics()} are invalid
+   * (e.g. data that cannot be parsed). Throwing an error here ensures that your custom
+   * metrics are well formed and that correct values are always reported. The default
+   * action on invalid metrics is to ignore them.
+   *
+   * @param ex the exception
+   */
+  default void onInvalidMetrics(Exception ex) {
+    // default is to ignore invalid custom metrics
+  }
+}
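As a rough sketch of how a reader might adopt this (the trait and member names below are hypothetical, and a real reader still implements its usual DataSourceReader/MicroBatchReader methods), the metrics-related part can live in a small reusable mix-in:

import org.apache.spark.sql.sources.v2.CustomMetrics
import org.apache.spark.sql.sources.v2.reader.streaming.SupportsCustomReaderMetrics

// Hypothetical helper: mix this into a reader to expose one gauge-style metric.
trait ReportsPendingRecords extends SupportsCustomReaderMetrics {
  // The concrete reader supplies an estimate of records it has not yet processed.
  protected def pendingRecords: Long

  override def getCustomMetrics: CustomMetrics = new CustomMetrics {
    // Serialized by hand here; a real implementation would likely use a JSON library.
    override def json(): String = s"""{"pendingRecords":$pendingRecords}"""
  }
}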
SupportsCustomWriterMetrics.java (new file)

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+
+/**
+ * A mix-in interface for {@link DataSourceWriter}. Data source writers can implement this
+ * interface to report custom metrics that get reported under the
+ * {@link org.apache.spark.sql.streaming.SinkProgress}.
+ */
+@InterfaceStability.Evolving
+public interface SupportsCustomWriterMetrics extends DataSourceWriter {
+  /**
+   * Returns custom metrics specific to this data source.
+   */
+  CustomMetrics getCustomMetrics();
+
+  /**
+   * Invoked if the custom metrics returned by {@link #getCustomMetrics()} are invalid
+   * (e.g. data that cannot be parsed). Throwing an error here ensures that your custom
+   * metrics are well formed and that correct values are always reported. The default
+   * action on invalid metrics is to ignore them.
+   *
+   * @param ex the exception
+   */
+  default void onInvalidMetrics(Exception ex) {
+    // default is to ignore invalid custom metrics
+  }
+}
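The writer-side interface mirrors the reader-side one; the MemorySinkV2 changes later in this diff are the concrete implementation added by this commit. As a purely hypothetical variation (not in this commit), a writer could override the onInvalidMetrics hook to fail fast instead of silently dropping malformed metrics, which the Javadoc above explicitly allows:

import org.apache.spark.sql.sources.v2.writer.streaming.SupportsCustomWriterMetrics

// Hypothetical mix-in: surface malformed metric JSON instead of ignoring it.
trait StrictWriterMetrics extends SupportsCustomWriterMetrics {
  override def onInvalidMetrics(ex: Exception): Unit = {
    // Failing fast makes bad metric JSON visible during development.
    throw new IllegalStateException("Custom writer metrics could not be parsed", ex)
  }
}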

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 58 additions & 5 deletions
@@ -22,14 +22,22 @@ import java.util.{Date, UUID}

 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods.parse

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter
+import org.apache.spark.sql.sources.v2.CustomMetrics
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.sources.v2.writer.streaming.SupportsCustomWriterMetrics
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock

@@ -156,18 +164,51 @@ trait ProgressReporter extends Logging {
     }
     logDebug(s"Execution stats: $executionStats")

+    // extracts and validates custom metrics from readers and writers
+    def extractMetrics(
+        getMetrics: () => Option[CustomMetrics],
+        onInvalidMetrics: (Exception) => Unit): Option[String] = {
+      try {
+        getMetrics().map(m => {
+          val json = m.json()
+          parse(json)
+          json
+        })
+      } catch {
+        case ex: Exception if NonFatal(ex) =>
+          onInvalidMetrics(ex)
+          None
+      }
+    }
+
     val sourceProgress = sources.distinct.map { source =>
+      val customReaderMetrics = source match {
+        case s: SupportsCustomReaderMetrics =>
+          extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
+
+        case _ => None
+      }
+
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
       new SourceProgress(
         description = source.toString,
         startOffset = currentTriggerStartOffsets.get(source).orNull,
         endOffset = currentTriggerEndOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
-        processedRowsPerSecond = numRecords / processingTimeSec
+        processedRowsPerSecond = numRecords / processingTimeSec,
+        customReaderMetrics.orNull
       )
     }
-    val sinkProgress = new SinkProgress(sink.toString)
+
+    val customWriterMetrics = dataSourceWriter match {
+      case Some(s: SupportsCustomWriterMetrics) =>
+        extractMetrics(() => Option(s.getCustomMetrics), s.onInvalidMetrics)
+
+      case _ => None
+    }
+
+    val sinkProgress = new SinkProgress(sink.toString, customWriterMetrics.orNull)

     val newProgress = new StreamingQueryProgress(
       id = id,

@@ -196,6 +237,18 @@ trait ProgressReporter extends Logging {
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }

+  /** Extract writer from the executed query plan. */
+  private def dataSourceWriter: Option[DataSourceWriter] = {
+    if (lastExecution == null) return None
+    lastExecution.executedPlan.collect {
+      case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
+        p.asInstanceOf[WriteToDataSourceV2Exec].writer
+    }.headOption match {
+      case Some(w: MicroBatchWriter) => Some(w.writer)
+      case _ => None
+    }
+  }
+
   /** Extract statistics about stateful operators from the executed query plan. */
   private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
     if (lastExecution == null) return Nil
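For clarity, the validation pattern in extractMetrics above can be exercised as a standalone sketch (only json4s is needed): the metrics JSON is parsed before being attached to the progress, and a parse failure is routed to the reader's or writer's onInvalidMetrics callback while the metrics themselves are dropped. The object and method names here are illustrative only:

import scala.util.control.NonFatal

import org.json4s.jackson.JsonMethods.parse

object MetricsValidationSketch {
  // Mirrors ProgressReporter.extractMetrics: return the JSON only if it parses.
  def validate(json: String, onInvalidMetrics: Exception => Unit): Option[String] = {
    try {
      parse(json) // throws if the string is not valid JSON
      Some(json)
    } catch {
      case ex: Exception if NonFatal(ex) =>
        onInvalidMetrics(ex)
        None
    }
  }

  def main(args: Array[String]): Unit = {
    println(validate("""{"numRows": 10}""", _ => ()))                        // Some({"numRows": 10})
    println(validate("not json", ex => println(s"dropped: ${ex.getMessage}"))) // None, after the callback fires
  }
}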

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
  * the non-streaming interface, forwarding the batch ID determined at construction to a wrapped
  * streaming writer.
  */
-class MicroBatchWriter(batchId: Long, writer: StreamWriter) extends DataSourceWriter {
+class MicroBatchWriter(batchId: Long, val writer: StreamWriter) extends DataSourceWriter {
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     writer.commit(batchId, messages)
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala

Lines changed: 28 additions & 4 deletions
@@ -23,6 +23,9 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal

+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow

@@ -32,9 +35,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions, DataSourceV2, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType

@@ -114,14 +117,25 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB
     batches.clear()
   }

+  def numRows: Int = synchronized {
+    batches.foldLeft(0)(_ + _.data.length)
+  }
+
   override def toString(): String = "MemorySinkV2"
 }

 case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
   extends WriterCommitMessage {}

+class MemoryV2CustomMetrics(sink: MemorySinkV2) extends CustomMetrics {
+  private implicit val formats = Serialization.formats(NoTypeHints)
+  override def json(): String = Serialization.write(Map("numRows" -> sink.numRows))
+}
+
 class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode, schema: StructType)
-  extends DataSourceWriter with Logging {
+  extends DataSourceWriter with SupportsCustomWriterMetrics with Logging {
+
+  private val memoryV2CustomMetrics = new MemoryV2CustomMetrics(sink)

   override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode, schema)

@@ -135,10 +149,16 @@ class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode, sc
   override def abort(messages: Array[WriterCommitMessage]): Unit = {
     // Don't accept any of the new input.
   }
+
+  override def getCustomMetrics: CustomMetrics = {
+    memoryV2CustomMetrics
+  }
 }

 class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
-  extends StreamWriter {
+  extends StreamWriter with SupportsCustomWriterMetrics {
+
+  private val customMemoryV2Metrics = new MemoryV2CustomMetrics(sink)

   override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode, schema)

@@ -152,6 +172,10 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode, schema:
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
     // Don't accept any of the new input.
   }
+
+  override def getCustomMetrics: CustomMetrics = {
+    customMemoryV2Metrics
+  }
 }

 case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
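A quick look at what the memory sink now reports; this is a sketch using the classes added above, and the printed value assumes an empty sink. The resulting JSON string is what the progress reporter attaches as the sink's customMetrics:

import org.apache.spark.sql.execution.streaming.sources.{MemorySinkV2, MemoryV2CustomMetrics}

object MemorySinkMetricsSketch {
  def main(args: Array[String]): Unit = {
    val sink = new MemorySinkV2
    val metrics = new MemoryV2CustomMetrics(sink)
    // For a sink with no committed batches this prints {"numRows":0}
    println(metrics.json())
  }
}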

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 42 additions & 4 deletions
@@ -163,7 +163,27 @@ class SourceProgress protected[sql](
     val endOffset: String,
     val numInputRows: Long,
     val inputRowsPerSecond: Double,
-    val processedRowsPerSecond: Double) extends Serializable {
+    val processedRowsPerSecond: Double,
+    val customMetrics: String) extends Serializable {
+
+  /** SourceProgress without custom metrics. */
+  protected[sql] def this(
+      description: String,
+      startOffset: String,
+      endOffset: String,
+      numInputRows: Long,
+      inputRowsPerSecond: Double,
+      processedRowsPerSecond: Double) {
+
+    this(
+      description,
+      startOffset,
+      endOffset,
+      numInputRows,
+      inputRowsPerSecond,
+      processedRowsPerSecond,
+      null)
+  }

   /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))

@@ -178,12 +198,18 @@ class SourceProgress protected[sql](
       if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
     }

-    ("description" -> JString(description)) ~
+    val jsonVal = ("description" -> JString(description)) ~
       ("startOffset" -> tryParse(startOffset)) ~
       ("endOffset" -> tryParse(endOffset)) ~
       ("numInputRows" -> JInt(numInputRows)) ~
       ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
       ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
+
+    if (customMetrics != null) {
+      jsonVal ~ ("customMetrics" -> parse(customMetrics))
+    } else {
+      jsonVal
+    }
   }

   private def tryParse(json: String) = try {

@@ -202,7 +228,13 @@ class SourceProgress protected[sql](
  */
 @InterfaceStability.Evolving
 class SinkProgress protected[sql](
-    val description: String) extends Serializable {
+    val description: String,
+    val customMetrics: String) extends Serializable {
+
+  /** SinkProgress without custom metrics. */
+  protected[sql] def this(description: String) {
+    this(description, null)
+  }

   /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))

@@ -213,6 +245,12 @@ class SinkProgress protected[sql](
   override def toString: String = prettyJson

   private[sql] def jsonValue: JValue = {
-    ("description" -> JString(description))
+    val jsonVal = ("description" -> JString(description))
+
+    if (customMetrics != null) {
+      jsonVal ~ ("customMetrics" -> parse(customMetrics))
+    } else {
+      jsonVal
+    }
   }
 }
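End to end, the new fields are reachable from an active query's progress. A minimal usage sketch (the object and method names are illustrative; customMetrics stays null for sources and sinks that do not implement the new interfaces):

import org.apache.spark.sql.streaming.StreamingQuery

object CustomMetricsUsageSketch {
  def logCustomMetrics(query: StreamingQuery): Unit = {
    val progress = query.lastProgress // null until the first progress is available
    if (progress != null) {
      progress.sources.foreach { s =>
        Option(s.customMetrics).foreach(m => println(s"source ${s.description}: $m"))
      }
      Option(progress.sink.customMetrics).foreach(m => println(s"sink: $m"))
      // The same metrics also appear under "customMetrics" in progress.json / prettyJson.
    }
  }
}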

0 commit comments
