Skip to content

Commit 6afe6f3

Browse files
HeartSaVioRHyukjinKwon
authored and committed
[SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard metrics
## What changes were proposed in this pull request? This patch adds state and watermark metrics to the Dropwizard metrics, so that the watermark and the state row count/size can be tracked as time series. ## How was this patch tested? Manually tested with the CSV metric sink. Closes apache#21622 from HeartSaVioR/SPARK-24637. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
1 parent 1076e4f commit 6afe6f3

File tree

2 files changed

+23
-0
lines changed

2 files changed

+23
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
package org.apache.spark.sql.execution.streaming
1919

20+
import java.text.SimpleDateFormat
21+
2022
import com.codahale.metrics.{Gauge, MetricRegistry}
2123

2224
import org.apache.spark.internal.Logging
2325
import org.apache.spark.metrics.source.{Source => CodahaleSource}
26+
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2427
import org.apache.spark.sql.streaming.StreamingQueryProgress
2528

2629
/**
@@ -39,6 +42,23 @@ class MetricsReporter(
3942
// Gauge registrations: each call passes a metric name, a function that extracts the value
// from a StreamingQueryProgress, and a third argument (presumably the value reported when
// no progress is available yet -- confirm against registerGauge).
registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
4043
// Reads the "triggerExecution" entry of the progress' durationMs map as a long.
registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)
4144

45+
// Formatter used by convertStringDateToMillis to parse timestamp strings; its time zone is
// set to UTC on the next statement.
// NOTE(review): java.text.SimpleDateFormat is documented as not thread-safe -- confirm that
// every use of this shared instance is serialized.
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
46+
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
47+
48+
// Reports the "watermark" entry of progress.eventTime converted to epoch milliseconds
// (convertStringDateToMillis yields 0L when the entry is null).
registerGauge("eventTime-watermark",
49+
progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L)
50+
51+
// Sum of numRowsTotal over all entries of progress.stateOperators.
registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L)
52+
// Sum of memoryUsedBytes over all entries of progress.stateOperators.
registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L)
53+
54+
/**
 * Converts a timestamp string in the pattern understood by `timestampFormat`
 * (`yyyy-MM-dd'T'HH:mm:ss.SSS'Z'`, interpreted as UTC) into epoch milliseconds.
 *
 * @param isoUtcDateStr the timestamp text, or null when no value is available
 * @return milliseconds since the epoch, or 0L when `isoUtcDateStr` is null
 * @throws java.text.ParseException if the string cannot be parsed by `timestampFormat`
 */
private def convertStringDateToMillis(isoUtcDateStr: String): Long = {
  if (isoUtcDateStr != null) {
    // SimpleDateFormat keeps mutable state while parsing and is not thread-safe. Gauge values
    // may be polled from more than one metrics thread, so serialize access to the shared
    // formatter instead of risking corrupted parse results.
    timestampFormat.synchronized {
      timestampFormat.parse(isoUtcDateStr).getTime
    }
  } else {
    0L
  }
}
61+
4262
private def registerGauge[T](
4363
name: String,
4464
f: StreamingQueryProgress => T,

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
467467
assert(gauges.get("latency").getValue.asInstanceOf[Long] == 0)
468468
assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0)
469469
assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 0.0)
470+
assert(gauges.get("eventTime-watermark").getValue.asInstanceOf[Long] == 0)
471+
assert(gauges.get("states-rowsTotal").getValue.asInstanceOf[Long] == 0)
472+
assert(gauges.get("states-usedBytes").getValue.asInstanceOf[Long] == 0)
470473
sq.stop()
471474
}
472475
}

0 commit comments

Comments
 (0)