
Commit 23785d3

ericm-db authored and HeartSaVioR committed
[SPARK-51904][SS] Removing async metadata purging for StateSchemaV3 and ignoring non-batch files when listing OperatorMetadata files
### What changes were proposed in this pull request?

Currently, we don't want to purge StateSchemaV3 files, so we need to remove the relevant call from MicroBatchExecution. Additionally, we want to ignore any files in the metadata or state schema directory whose names don't parse as a Long batch id (which would otherwise cause a parse exception).

### Why are the changes needed?

Schema files cannot be purged because they are needed until full rewrite is implemented.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50700 from ericm-db/remove-async-purge.

Authored-by: Eric Marnadi <eric.marnadi@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent c2305ed commit 23785d3
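At the core of the listing change is a defensive parse: instead of calling `toLong` directly on every directory entry (which throws `NumberFormatException` for anything that isn't a batch file), names are parsed through `scala.util.Try` and non-numeric entries are dropped. Below is a minimal standalone sketch of that pattern, using a hypothetical `listBatchIds` helper over plain Hadoop `FileStatus` values rather than Spark's `CheckpointFileManager`:

```scala
import scala.util.Try

import org.apache.hadoop.fs.FileStatus

// Hypothetical helper mirroring the filtering now done in OperatorStateMetadataV2Reader:
// keep only entries whose names parse as a Long batch id, silently skipping everything else.
def listBatchIds(files: Seq[FileStatus]): Seq[Long] = {
  files
    .filter(f => !f.getPath.getName.startsWith(".")) // ignore hidden files
    .flatMap(f => Try(f.getPath.getName.toLong).toOption) // drop non-numeric names
    .sorted
}
```

With this pattern, a directory containing `0`, `1`, and a stray `dummy_path_name` entry simply yields `Seq(0L, 1L)` instead of failing the whole listing.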

File tree: 3 files changed (+234, −11 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala

Lines changed: 23 additions & 6 deletions
@@ -332,17 +332,24 @@ class OperatorStateMetadataV2Reader(
     if (!fm.exists(offsetLog)) {
       return Array.empty
     }
+    // Offset Log files are numeric so we want to skip any files that don't
+    // conform to this
     fm.list(offsetLog)
       .filter(f => !f.getPath.getName.startsWith(".")) // ignore hidden files
-      .map(_.getPath.getName.toLong).sorted
+      .flatMap(f => scala.util.Try(f.getPath.getName.toLong).toOption)
+      .sorted
   }
 
   // List the available batches in the operator metadata directory
   private def listOperatorMetadataBatches(): Array[Long] = {
     if (!fm.exists(metadataDirPath)) {
       return Array.empty
     }
-    fm.list(metadataDirPath).map(_.getPath.getName.toLong).sorted
+
+    // filter out non-numeric file names (as OperatorStateMetadataV2 file names are numeric)
+    fm.list(metadataDirPath)
+      .flatMap(f => scala.util.Try(f.getPath.getName.toLong).toOption)
+      .sorted
   }
 
   override def read(): Option[OperatorStateMetadata] = {
@@ -407,6 +414,8 @@ class OperatorStateMetadataV2FileManager(
     if (thresholdBatchId != 0) {
       val earliestBatchIdKept = deleteMetadataFiles(thresholdBatchId)
       // we need to delete everything from 0 to (earliestBatchIdKept - 1), inclusive
+      // TODO: [SPARK-50845]: Currently, deleteSchemaFiles is a no-op since earliestBatchIdKept
+      // is always 0, and the earliest schema file to 'keep' is -1.
       deleteSchemaFiles(earliestBatchIdKept - 1)
     }
   }
@@ -418,11 +427,19 @@ class OperatorStateMetadataV2FileManager(
     commitLog.listBatchesOnDisk.headOption.getOrElse(0L)
   }
 
+  // TODO: [SPARK-50845]: Currently, deleteSchemaFiles is a no-op since thresholdBatchId
+  // is always -1
   private def deleteSchemaFiles(thresholdBatchId: Long): Unit = {
+    if (thresholdBatchId <= 0) {
+      return
+    }
+    // StateSchemaV3 filenames are of the format {batchId}_{UUID}
+    // so we want to filter for files that do not have this format
     val schemaFiles = fm.list(stateSchemaPath).sorted.map(_.getPath)
     val filesBeforeThreshold = schemaFiles.filter { path =>
-      val batchIdInPath = path.getName.split("_").head.toLong
-      batchIdInPath <= thresholdBatchId
+      scala.util.Try(path.getName.split("_").head.toLong)
+        .toOption
+        .exists(_ <= thresholdBatchId)
     }
     filesBeforeThreshold.foreach { path =>
       fm.delete(path)
@@ -460,8 +477,8 @@ class OperatorStateMetadataV2FileManager(
     }
   }
 
-    // TODO: Implement state schema file purging logic once we have
-    // enabled full-rewrite.
+    // TODO: [SPARK-50845]: Return earliest schema file we need after implementing
+    // full-rewrite
     0
   }
 }
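For the state schema directory the same idea applies, except that StateSchemaV3 file names follow the `{batchId}_{UUID}` pattern and the new `thresholdBatchId <= 0` guard makes `deleteSchemaFiles` an effective no-op until full rewrite lands (SPARK-50845). A hedged sketch of that purge logic in isolation, with a hypothetical `schemaFilesToDelete` helper working on plain file names:

```scala
import scala.util.Try

// Hypothetical helper mirroring the guarded purge in OperatorStateMetadataV2FileManager.
// Schema file names are expected to look like "{batchId}_{UUID}"; anything else is ignored.
def schemaFilesToDelete(fileNames: Seq[String], thresholdBatchId: Long): Seq[String] = {
  if (thresholdBatchId <= 0) {
    // Matches the early return added in this commit: with the threshold currently
    // always -1, no schema file is ever deleted.
    Seq.empty
  } else {
    fileNames.filter { name =>
      Try(name.split("_").head.toLong).toOption.exists(_ <= thresholdBatchId)
    }
  }
}
```

For example, `schemaFilesToDelete(Seq("0_ab12", "3_cd34", "__dummy_file_path"), 2)` would return only `Seq("0_ab12")`, while any call with a non-positive threshold deletes nothing.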

sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateAvroSuite.scala

Lines changed: 192 additions & 2 deletions
@@ -17,16 +17,22 @@
 
 package org.apache.spark.sql.streaming
 
+import org.apache.hadoop.fs.Path
 import org.scalactic.source.Position
 import org.scalatest.Tag
+import org.scalatest.matchers.must.Matchers.be
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+import org.scalatest.time.{Seconds, Span}
 
 import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
-import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStoreInvalidValueSchemaEvolution, StateStoreValueSchemaEvolutionThresholdExceeded}
+import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MemoryStream, MicroBatchExecution}
+import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_OFFSETS
+import org.apache.spark.sql.execution.streaming.state.{OperatorStateMetadataV2, RocksDBStateStoreProvider, StateStoreInvalidValueSchemaEvolution, StateStoreValueSchemaEvolutionThresholdExceeded}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.StructType
 
 class TransformWithStateAvroSuite extends TransformWithStateSuite {
 
@@ -264,6 +270,190 @@ class TransformWithStateAvroSuite extends TransformWithStateSuite {
     }
   }
 
+  test("transformWithState - verify schema files are retained through multiple evolutions") {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1",
+      SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "avro") {
+      withTempDir { chkptDir =>
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+
+        // Start with initial basic state schema
+        val inputData = MemoryStream[String]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new DefaultValueInitialProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "a"),
+          CheckNewAnswer(("a", BasicState("a".hashCode, "a"))),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        val hadoopConf = spark.sessionState.newHadoopConf()
+        val fm = CheckpointFileManager.create(new Path(chkptDir.toString),
+          hadoopConf)
+        fm.mkdirs(new Path(new Path(chkptDir.toString, DIR_NAME_OFFSETS),
+          "dummy_path_name"))
+        fm.mkdirs(
+          new Path(OperatorStateMetadataV2.metadataDirPath(
+            new Path(new Path(new Path(chkptDir.toString), "state"), "0")
+          ),
+            "dummy_path_name")
+        )
+        val dummySchemaPath =
+          new Path(stateSchemaPath, "__dummy_file_path")
+        fm.mkdirs(dummySchemaPath)
+
+
+        // Capture initial schema files (after first schema evolution)
+        val initialSchemaFiles = getFiles(stateSchemaPath).length
+        assert(initialSchemaFiles > 0, "Expected schema files after initial run")
+
+        // Second run with evolved state (adding fields)
+        val result2 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new DefaultValueEvolvedProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "b"),
+          CheckNewAnswer(("b", EvolvedState("b".hashCode, "b", 100L, true, 99.9))),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Capture schema files after second evolution
+        val afterAddingFieldsSchemaFiles = getFiles(stateSchemaPath).length
+        assert(afterAddingFieldsSchemaFiles > initialSchemaFiles,
+          s"Expected more schema files after adding fields," +
+            s" but had $initialSchemaFiles before and $afterAddingFieldsSchemaFiles after")
+
+        // Third run with TwoLongs schema
+        val result3 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new RunningCountStatefulProcessorTwoLongs(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "c"),
+          CheckNewAnswer(("c", "1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Capture schema files after third evolution
+        val afterTwoLongsSchemaFiles = getFiles(stateSchemaPath).length
+        assert(afterTwoLongsSchemaFiles > afterAddingFieldsSchemaFiles,
+          "Expected more schema files after TwoLongs schema change")
+
+        // Fourth run with ReorderedLongs schema
+        val result4 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new RunningCountStatefulProcessorReorderedFields(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result4, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "d"),
+          CheckNewAnswer(("d", "1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Capture schema files after fourth evolution
+        val afterReorderedSchemaFiles = getFiles(stateSchemaPath).length
+        assert(afterReorderedSchemaFiles > afterTwoLongsSchemaFiles,
+          "Expected more schema files after ReorderedLongs schema change")
+
+        // Fifth run with RenamedFields schema
+        val result5 = inputData.toDS()
+          .groupByKey(x => x)
+          .transformWithState(new RenameEvolvedProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result5, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, "e"),
+          CheckNewAnswer(("e", "1")),
+          // Run multiple batches to trigger maintenance
+          AddData(inputData, "f"),
+          CheckNewAnswer(("f", "1")),
+          AddData(inputData, "g"),
+          CheckNewAnswer(("g", "1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        // Verify metadata files were purged with MIN_BATCHES_TO_RETAIN=1
+        val finalMetadataFiles = getFiles(metadataPath).length
+        // We expect the dummy folder and 2 metadata files
+        assert(finalMetadataFiles <= 3,
+          s"Expected metadata files to be purged to at most 3, but found $finalMetadataFiles")
+
+        // Verify schema files were NOT purged despite aggressive metadata purging
+        val schemaFiles = getFiles(stateSchemaPath).map(_.getPath.getName)
+        val finalSchemaFiles = schemaFiles.length
+        assert(finalSchemaFiles >= 5,
+          s"Expected at least 5 schema files to be retained" +
+            s" (one per schema evolution), but found $finalSchemaFiles")
+        assert(schemaFiles.contains(dummySchemaPath.getName))
+
+        // Verify we can read historical state for different batches
+        // This should work even though metadata may have been purged for earlier batches
+        val latestStateDf = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .load()
+
+        assert(latestStateDf.count() > 0, "Expected to read current state data")
+
+        // Check schema of latest state - should have RenamedFields schema structure
+        val latestValueField = latestStateDf.schema.fields.find(_.name == "value").get
+        val latestValueType = latestValueField.dataType.asInstanceOf[StructType]
+
+        // Should have value4 field from RenamedFields
+        assert(latestValueType.fields.exists(f => f.name == "value4"),
+          "Expected renamed schema with value4 field")
+      }
+    }
+  }
+
   test("transformWithState - adding field should succeed") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
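The new test drives five different processors against the same checkpoint and asserts that the schema file count only ever grows, and that those files survive aggressive metadata purging with `MIN_BATCHES_TO_RETAIN=1`. Outside the suite, a rough equivalent of its file counting could be done with plain Hadoop APIs; this is a sketch only (the suite itself goes through `CheckpointFileManager.list` via its `getFiles` helper, and the `_stateSchema/default` layout below is taken from `getStateSchemaPath`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative only: count StateSchemaV3 files for operator 0 under a checkpoint directory.
def countSchemaFiles(checkpointDir: String, hadoopConf: Configuration): Int = {
  val schemaDir = new Path(s"$checkpointDir/state/0/_stateSchema/default")
  val fs = schemaDir.getFileSystem(hadoopConf)
  if (fs.exists(schemaDir)) fs.listStatus(schemaDir).length else 0
}
```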

sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala

Lines changed: 19 additions & 3 deletions
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.DIR_NAME_OFFSETS
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.functions.timestamp_seconds
 import org.apache.spark.sql.internal.SQLConf
@@ -1830,6 +1831,21 @@ abstract class TransformWithStateSuite extends StateStoreMetricsTest
       CheckNewAnswer(("a", "1")),
       StopStream
     )
+
+    // Here we are writing non-metadata files to the operator metadata directory to ensure that
+    // they are ignored during restart.
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val fm = CheckpointFileManager.create(new Path(checkpointDir.toString),
+      hadoopConf)
+    fm.mkdirs(new Path(new Path(checkpointDir.toString, DIR_NAME_OFFSETS),
+      "dummy_path_name"))
+    fm.mkdirs(
+      new Path(OperatorStateMetadataV2.metadataDirPath(
+        new Path(new Path(new Path(checkpointDir.toString), "state"), "0")
+      ),
+        "dummy_path_name")
+    )
+
     val result2 = inputData.toDS()
       .groupByKey(x => x)
       .transformWithState(new RunningCountStatefulProcessorWithProcTimeTimer(),
@@ -1886,17 +1902,17 @@ abstract class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
-  private def getFiles(path: Path): Array[FileStatus] = {
+  private[sql] def getFiles(path: Path): Array[FileStatus] = {
     val hadoopConf = spark.sessionState.newHadoopConf()
     val fileManager = CheckpointFileManager.create(path, hadoopConf)
     fileManager.list(path)
   }
 
-  private def getStateSchemaPath(stateCheckpointPath: Path): Path = {
+  private[sql] def getStateSchemaPath(stateCheckpointPath: Path): Path = {
     new Path(stateCheckpointPath, "_stateSchema/default/")
   }
 
-  // TODO: [SPARK-50845] Re-enable tests after StateSchemaV3 threshold change
+  // TODO: [SPARK-50845] Re-enable tests after full-rewrite is enabled.
   ignore("transformWithState - verify that metadata and schema logs are purged") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
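The restart test now also drops a `dummy_path_name` directory into both the offsets log and the operator metadata directory before restarting; before this commit such an entry would have failed the listing with a `NumberFormatException`. A tiny REPL-style sketch contrasting the old and new parse behavior:

```scala
import scala.util.Try

// Old behavior: a direct toLong on a stray entry like "dummy_path_name" blows up the listing.
// "dummy_path_name".toLong            // throws java.lang.NumberFormatException
// New behavior: the Try-based filter simply skips it.
val skipped: Option[Long] = Try("dummy_path_name".toLong).toOption // None
val kept: Option[Long] = Try("42".toLong).toOption                 // Some(42)
```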
