
Commit 3b2dd85

zecookiez authored and HeartSaVioR committed
[SPARK-51252][SS] Add instance metrics for last uploaded snapshot version in HDFS State Stores
### What changes were proposed in this pull request?

SPARK-51252

Similar to SPARK-51097, this PR sets up instance-specific metrics (`SnapshotLastUploaded.partition_<partition id>_<state store name>`, to be precise) on the executor side and publishes them through StreamingQueryProgress. The main difference is that this PR adds the metrics to HDFS state stores so they get observability similar to RocksDB state stores.

There are two non-test differences compared to the RocksDB PR:
- Tracking of `lastUploadedSnapshotVersion` is added to HDFS state stores; it is updated in the state store object (instead of inside RocksDB), and at the end of `writeUpdates` instead of `uploadSnapshot`.
- Metric creation is done in the state store provider class instead of the object.

The fixes for the Spark UI do not affect this PR, since that was an issue with the underlying instance metrics collection process. I re-verified that these metrics do not show up in the Spark UI (screenshot below).

As for the test changes:
- The RocksDB tests are modified slightly to process some additional data in the queries, because the version difference checks in the HDFS and RocksDB state stores use `>` and `>=` respectively. The general structure of the tests remains the same.

### Why are the changes needed?

We would like to introduce a similar level of observability into other types of state stores. As for the metrics themselves, they help identify performance degradation issues behind maintenance tasks and more, as described in SPARK-51097.

### Does this PR introduce _any_ user-facing change?

Some new metrics will be displayed in StreamingQueryProgress:

```
Streaming query made progress: {
  ...
  "stateOperators" : [ {
    ...
    "customMetrics" : {
      ...
      "SnapshotLastUploaded.partition_0_default" : 2,
      "SnapshotLastUploaded.partition_12_default" : 10,
      "SnapshotLastUploaded.partition_8_default" : 10,
      ...
    }
  } ],
  "sources" : ...,
  "sink" : ...
}
```

Similar to its RocksDB variant, the number of metrics reported is tuned using `STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT`, which defaults to reporting 5 instance metrics.

### How was this patch tested?

Four new tests are added in a new suite, StateStoreInstanceMetricSuite, making eight tests in total across both state stores. Future instance metrics can be added to this suite for testing and verification.

The first two tests execute a dedup streaming query and verify that metrics are properly filtered and updated through the StreamingQueryProgress logs, using different StateStore providers that skip maintenance tasks for specific partitions. The other two tests execute a join streaming query, which contains four state stores per partition instead of one, and verify that metrics are properly collected and filtered there as well.

Verified the metrics do not show up in the Spark UI:

![image](https://github.com/user-attachments/assets/c3c722fe-3bca-44a3-8f8d-85ed865def05)

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50030 from zecookiez/SPARK-51252-hdfs.

Lead-authored-by: Zeyu Chen <zycm03@gmail.com>
Co-authored-by: Zeyu Chen <ZYCM03@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
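On the driver, these values can be read back from the last StreamingQueryProgress like any other custom metric. Below is a minimal sketch, not part of this change; the helper name `snapshotUploadMetrics` is illustrative, and a value of -1 indicates that a store instance has not uploaded any snapshot yet.

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.streaming.StreamingQuery

// Illustrative helper (not part of this PR): collect the SnapshotLastUploaded.*
// instance metrics from the most recent StreamingQueryProgress of a running query.
// A value of -1 means that store instance has not uploaded a snapshot yet.
def snapshotUploadMetrics(query: StreamingQuery): Map[String, Long] = {
  Option(query.lastProgress).toSeq
    .flatMap(_.stateOperators.toSeq)
    .flatMap(_.customMetrics.asScala)
    .collect {
      case (name, version) if name.startsWith("SnapshotLastUploaded.partition_") =>
        name -> version.longValue()
    }
    .toMap
}
```

Since only the smallest (most lagging) versions up to `STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT` are published per operator, an absent key simply means that instance was not among the furthest behind.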
1 parent 3155c4b commit 3b2dd85

File tree: 4 files changed, +413 -293 lines changed

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala

Lines changed: 2 additions & 1 deletion
@@ -103,7 +103,8 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L
       lastProgress.stateOperators.head.customMetrics.keySet().asScala == Set(
         "loadedMapCacheHitCount",
         "loadedMapCacheMissCount",
-        "stateOnCurrentVersionSizeBytes"))
+        "stateOnCurrentVersionSizeBytes",
+        "SnapshotLastUploaded.partition_0_default"))
     assert(lastProgress.sources.nonEmpty)
     assert(lastProgress.sink.description == "MemorySink")
     assert(lastProgress.observedMetrics.isEmpty)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 24 additions & 2 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.util
 import java.util.Locale
-import java.util.concurrent.atomic.LongAdder
+import java.util.concurrent.atomic.{AtomicLong, LongAdder}
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -219,7 +219,17 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
         supportedCustomMetrics.find(_.name == name).map(_ -> value)
       } + (metricStateOnCurrentVersionSizeBytes -> SizeEstimator.estimate(mapToUpdate))
 
-      StateStoreMetrics(mapToUpdate.size(), metricsFromProvider("memoryUsedBytes"), customMetrics)
+      val instanceMetrics = Map(
+        instanceMetricSnapshotLastUpload.withNewId(
+          stateStoreId.partitionId, stateStoreId.storeName) -> lastUploadedSnapshotVersion.get()
+      )
+
+      StateStoreMetrics(
+        mapToUpdate.size(),
+        metricsFromProvider("memoryUsedBytes"),
+        customMetrics,
+        instanceMetrics
+      )
     }
 
     override def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo = {
@@ -386,6 +396,9 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       Nil
   }
 
+  override def supportedInstanceMetrics: Seq[StateStoreInstanceMetric] =
+    Seq(instanceMetricSnapshotLastUpload)
+
   private def toMessageWithContext: MessageWithContext = {
     log"HDFSStateStoreProvider[id = (op=${MDC(LogKeys.OP_ID, stateStoreId.operatorId)}," +
       log"part=${MDC(LogKeys.PARTITION_ID, stateStoreId.partitionId)})," +
@@ -419,6 +432,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   private val loadedMapCacheHitCount: LongAdder = new LongAdder
   private val loadedMapCacheMissCount: LongAdder = new LongAdder
 
+  // This is updated when the maintenance task writes the snapshot file and read by the task
+  // thread. -1 represents no version has ever been uploaded.
+  private val lastUploadedSnapshotVersion: AtomicLong = new AtomicLong(-1L)
+
   private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
     StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
       "estimated size of state only on current version")
@@ -431,6 +448,9 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     StateStoreCustomSumMetric("loadedMapCacheMissCount",
       "count of cache miss on states cache in provider")
 
+  private lazy val instanceMetricSnapshotLastUpload: StateStoreInstanceMetric =
+    StateStoreSnapshotLastUploadInstanceMetric()
+
   private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)
 
   private def commitUpdates(
@@ -677,6 +697,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     logInfo(log"Written snapshot file for version ${MDC(LogKeys.FILE_VERSION, version)} of " +
       log"${MDC(LogKeys.STATE_STORE_PROVIDER, this)} at ${MDC(LogKeys.FILE_NAME, targetFile)} " +
       log"for ${MDC(LogKeys.OP_TYPE, opType)}")
+    // Compare and update with the version that was just uploaded.
+    lastUploadedSnapshotVersion.updateAndGet(v => Math.max(version, v))
   }
 
   /**
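The provider-side change boils down to a small lock-free handshake: the maintenance thread records the highest snapshot version it has written, and the task thread reads that value when it builds `StateStoreMetrics`. The following standalone sketch illustrates the same pattern outside the provider; the class and method names here are illustrative, only the field name and the `updateAndGet` call mirror the diff above.

```scala
import java.util.concurrent.atomic.AtomicLong

// Sketch of the version-tracking handshake used in the provider. The maintenance
// thread calls recordUpload after writing a snapshot file; the task thread calls
// current() when reporting instance metrics. -1 means no snapshot uploaded yet.
final class SnapshotVersionTracker {
  private val lastUploadedSnapshotVersion = new AtomicLong(-1L)

  // Monotonic update: keep the maximum even if snapshot uploads finish out of order.
  def recordUpload(version: Long): Unit = {
    lastUploadedSnapshotVersion.updateAndGet(v => Math.max(version, v))
  }

  def current(): Long = lastUploadedSnapshotVersion.get()
}
```

Because the only write is a monotonic `updateAndGet`, the maintenance thread and query task threads never need to coordinate through a lock.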

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala

Lines changed: 2 additions & 290 deletions
@@ -19,36 +19,21 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
 
-import scala.concurrent.duration.DurationInt
-import scala.jdk.CollectionConverters.{MapHasAsScala, SetHasAsScala}
+import scala.jdk.CollectionConverters.SetHasAsScala
 
 import org.scalatest.time.{Minute, Span}
 
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
-import org.apache.spark.sql.functions.{count, expr}
+import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.OutputMode.Update
 import org.apache.spark.util.Utils
 
-// SkipMaintenanceOnCertainPartitionsProvider is a test-only provider that skips running
-// maintenance for partitions 0 and 1 (these are arbitrary choices). This is used to test
-// snapshot upload lag can be observed through StreamingQueryProgress metrics.
-class SkipMaintenanceOnCertainPartitionsProvider extends RocksDBStateStoreProvider {
-  override def doMaintenance(): Unit = {
-    if (stateStoreId.partitionId == 0 || stateStoreId.partitionId == 1) {
-      return
-    }
-    super.doMaintenance()
-  }
-}
-
 class RocksDBStateStoreIntegrationSuite extends StreamTest
   with AlsoTestWithRocksDBFeatures {
   import testImplicits._
 
-  private val SNAPSHOT_LAG_METRIC_PREFIX = "SnapshotLastUploaded.partition_"
-
   testWithColumnFamilies("RocksDBStateStore",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
     withTempDir { dir =>
@@ -286,277 +271,4 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
       assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
       assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
     }
-
-  private def snapshotLagMetricName(
-      partitionId: Long,
-      storeName: String = StateStoreId.DEFAULT_STORE_NAME): String = {
-    s"$SNAPSHOT_LAG_METRIC_PREFIX${partitionId}_$storeName"
-  }
-
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51097: Verify snapshot lag metrics are updated correctly with RocksDBStateStoreProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
-    ) {
-      withTempDir { checkpointDir =>
-        val inputData = MemoryStream[String]
-        val result = inputData.toDS().dropDuplicates()
-
-        testStream(result, outputMode = OutputMode.Update)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(inputData, "a"),
-          ProcessAllAvailable(),
-          AddData(inputData, "b"),
-          ProcessAllAvailable(),
-          CheckNewAnswer("a", "b"),
-          Execute { q =>
-            // Make sure only smallest K active metrics are published
-            eventually(timeout(10.seconds)) {
-              val instanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                instanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              assert(instanceMetrics.forall(_._2 == 1))
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
-
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51097: Verify snapshot lag metrics are updated correctly with " +
-      "SkipMaintenanceOnCertainPartitionsProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
-        classOf[SkipMaintenanceOnCertainPartitionsProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
-    ) {
-      withTempDir { checkpointDir =>
-        val inputData = MemoryStream[String]
-        val result = inputData.toDS().dropDuplicates()
-
-        testStream(result, outputMode = OutputMode.Update)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(inputData, "a"),
-          ProcessAllAvailable(),
-          AddData(inputData, "b"),
-          ProcessAllAvailable(),
-          CheckNewAnswer("a", "b"),
-          Execute { q =>
-            // Partitions getting skipped (id 0 and 1) do not have an uploaded version, leaving
-            // those instance metrics as -1.
-            eventually(timeout(10.seconds)) {
-              assert(
-                q.lastProgress
-                  .stateOperators(0)
-                  .customMetrics
-                  .get(snapshotLagMetricName(0)) === -1
-              )
-              assert(
-                q.lastProgress
-                  .stateOperators(0)
-                  .customMetrics
-                  .get(snapshotLagMetricName(1)) === -1
-              )
-              // Make sure only smallest K active metrics are published
-              val instanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                instanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              // Two metrics published are -1, the remainder should all be 1 as they
-              // uploaded properly.
-              assert(
-                instanceMetrics.count(_._2 == 1) == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) - 2
-              )
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
-
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51097: Verify snapshot lag metrics are updated correctly for join queries with " +
-      "RocksDBStateStoreProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
-        classOf[RocksDBStateStoreProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
-    ) {
-      withTempDir { checkpointDir =>
-        val input1 = MemoryStream[Int]
-        val input2 = MemoryStream[Int]
-
-        val df1 = input1.toDF().select($"value" as "leftKey", ($"value" * 2) as "leftValue")
-        val df2 = input2
-          .toDF()
-          .select($"value" as "rightKey", ($"value" * 3) as "rightValue")
-        val joined = df1.join(df2, expr("leftKey = rightKey"))
-
-        testStream(joined)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(input1, 1, 5),
-          ProcessAllAvailable(),
-          AddData(input2, 1, 5, 10),
-          ProcessAllAvailable(),
-          CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
-          Execute { q =>
-            eventually(timeout(10.seconds)) {
-              // Make sure only smallest K active metrics are published.
-              // There are 5 * 4 = 20 metrics in total because of join, but only 10 are published.
-              val instanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                instanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              // All state store instances should have uploaded a version
-              assert(instanceMetrics.forall(_._2 == 1))
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
-
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51097: Verify snapshot lag metrics are updated correctly for join queries with " +
-      "SkipMaintenanceOnCertainPartitionsProvider"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
-        classOf[SkipMaintenanceOnCertainPartitionsProvider].getName,
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
-      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
-    ) {
-      withTempDir { checkpointDir =>
-        val input1 = MemoryStream[Int]
-        val input2 = MemoryStream[Int]
-
-        val df1 = input1.toDF().select($"value" as "leftKey", ($"value" * 2) as "leftValue")
-        val df2 = input2
-          .toDF()
-          .select($"value" as "rightKey", ($"value" * 3) as "rightValue")
-        val joined = df1.join(df2, expr("leftKey = rightKey"))
-
-        testStream(joined)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(input1, 1, 5),
-          ProcessAllAvailable(),
-          AddData(input2, 1, 5, 10),
-          ProcessAllAvailable(),
-          CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
-          Execute { q =>
-            eventually(timeout(10.seconds)) {
-              // Make sure only smallest K active metrics are published.
-              // There are 5 * 4 = 20 metrics in total because of join, but only 10 are published.
-              val allInstanceMetrics = q.lastProgress
-                .stateOperators(0)
-                .customMetrics
-                .asScala
-                .view
-                .filterKeys(_.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              val badInstanceMetrics = allInstanceMetrics.filterKeys(
-                k =>
-                  k.startsWith(snapshotLagMetricName(0, "")) ||
-                    k.startsWith(snapshotLagMetricName(1, ""))
-              )
-              // Determined by STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT
-              assert(
-                allInstanceMetrics.size == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
-              )
-              // Two ids are blocked, each with four state stores
-              assert(badInstanceMetrics.count(_._2 == -1) == 2 * 4)
-              // The rest should have uploaded a version
-              assert(
-                allInstanceMetrics.count(_._2 == 1) == q.sparkSession.conf
-                  .get(SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT) - 2 * 4
-              )
-            }
-          },
-          StopStream
-        )
-      }
-    }
-  }
-
-  testWithChangelogCheckpointingEnabled(
-    "SPARK-51097: Verify RocksDB instance metrics are not collected in execution plan"
-  ) {
-    withSQLConf(
-      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName
-    ) {
-      withTempDir { checkpointDir =>
-        val input1 = MemoryStream[Int]
-        val input2 = MemoryStream[Int]
-
-        val df1 = input1.toDF().select($"value" as "leftKey", ($"value" * 2) as "leftValue")
-        val df2 = input2
-          .toDF()
-          .select($"value" as "rightKey", ($"value" * 3) as "rightValue")
-        val joined = df1.join(df2, expr("leftKey = rightKey"))
-
-        testStream(joined)(
-          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
-          AddData(input1, 1, 5),
-          ProcessAllAvailable(),
-          AddData(input2, 1, 5, 10),
-          ProcessAllAvailable(),
-          CheckNewAnswer((1, 2, 1, 3), (5, 10, 5, 15)),
-          AssertOnQuery { q =>
-            // Go through all elements in the execution plan and verify none of the metrics
-            // are generated from RocksDB's snapshot lag instance metrics.
-            q.lastExecution.executedPlan
-              .collect {
-                case node => node.metrics
-              }
-              .forall { nodeMetrics =>
-                nodeMetrics.forall(metric => !metric._1.startsWith(SNAPSHOT_LAG_METRIC_PREFIX))
-              }
-          },
-          StopStream
-        )
-      }
-    }
-  }
 }
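The removed tests above, along with the `SkipMaintenanceOnCertainPartitionsProvider` helper, move into the new StateStoreInstanceMetricSuite together with HDFS-backed variants. As a rough, hypothetical sketch (not the actual suite code), an HDFS-flavored skip-maintenance helper would follow the same shape as the RocksDB one deleted above:

```scala
// Hypothetical sketch, not the actual StateStoreInstanceMetricSuite code: a test-only
// HDFS-backed provider that skips maintenance for partitions 0 and 1, so those
// partitions never upload a snapshot and keep the -1 value in their instance metric.
class SkipMaintenanceOnCertainPartitionsHDFSProvider extends HDFSBackedStateStoreProvider {
  override def doMaintenance(): Unit = {
    if (stateStoreId.partitionId == 0 || stateStoreId.partitionId == 1) {
      return
    }
    super.doMaintenance()
  }
}
```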
