Skip to content

Commit 08bd390

Browse files
committed
[SPARK-53794][SS] Add option to limit deletions per maintenance operation associated with rocksdb state provider
### What changes were proposed in this pull request?
Add option to limit deletions per maintenance operation associated with the RocksDB state provider.

### Why are the changes needed?
We see some instances where the changelog deletion can take a really long time. This means that for that partition, we also cannot upload full snapshots, which affects recovery/replay scenarios. This problem is much more apparent on resource-constrained clusters. So, we add an option to allow for incremental cleanup per maintenance operation invocation.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests
```
[info] Run completed in 17 seconds, 591 milliseconds.
[info] Total number of tests run: 8
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52511 from anishshri-db/task/SPARK-53794.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent 1124b09 commit 08bd390

File tree

5 files changed

+163
-3
lines changed

5 files changed

+163
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2664,6 +2664,16 @@ object SQLConf {
26642664
.doubleConf
26652665
.createWithDefault(0.3)
26662666

2667+
val MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE =
2668+
buildConf("spark.sql.streaming.stateStore.maxVersionsToDeletePerMaintenance")
2669+
.internal()
2670+
.doc("The maximum number of versions to delete per maintenance operation. By default, " +
2671+
"this value is set to -1, which means no limit. Note that, currently this is only " +
2672+
"supported for the RocksDB state store provider.")
2673+
.version("4.1.0")
2674+
.intConf
2675+
.createWithDefault(-1)
2676+
26672677
val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
26682678
.internal()
26692679
.doc("The maximum number of batches which will be retained in memory to avoid " +
@@ -6693,6 +6703,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
66936703

66946704
def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
66956705

6706+
def maxVersionsToDeletePerMaintenance: Int = getConf(MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE)
6707+
66966708
def ratioExtraSpaceAllowedInCheckpoint: Double = getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT)
66976709

66986710
def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1396,6 +1396,7 @@ class RocksDB(
13961396
val cleanupTime = timeTakenMs {
13971397
fileManager.deleteOldVersions(
13981398
numVersionsToRetain = conf.minVersionsToRetain,
1399+
maxVersionsToDeletePerMaintenance = conf.maxVersionsToDeletePerMaintenance,
13991400
minVersionsToDelete = conf.minVersionsToDelete)
14001401
}
14011402
logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, cleanupTime)} ms")
@@ -1953,7 +1954,8 @@ case class RocksDBConf(
19531954
compressionCodec: String,
19541955
allowFAllocate: Boolean,
19551956
compression: String,
1956-
reportSnapshotUploadLag: Boolean)
1957+
reportSnapshotUploadLag: Boolean,
1958+
maxVersionsToDeletePerMaintenance: Int)
19571959

19581960
object RocksDBConf {
19591961
/** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -2144,7 +2146,8 @@ object RocksDBConf {
21442146
storeConf.compressionCodec,
21452147
getBooleanConf(ALLOW_FALLOCATE_CONF),
21462148
getStringConf(COMPRESSION_CONF),
2147-
storeConf.reportSnapshotUploadLag)
2149+
storeConf.reportSnapshotUploadLag,
2150+
storeConf.maxVersionsToDeletePerMaintenance)
21482151
}
21492152

21502153
def apply(): RocksDBConf = apply(new StateStoreConf())

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,8 +580,22 @@ class RocksDBFileManager(
580580
* - Partially written SST files
581581
* - SST files that were used in a version, but that version got overwritten with a different
582582
* set of SST files.
583+
*
584+
* @param numVersionsToRetain the number of RocksDB versions to keep in object store after the
585+
* deletion. Must be greater than 0, or -1 to retain all versions.
586+
* @param maxVersionsToDeletePerMaintenance the max number of RocksDB versions
587+
* to delete per maintenance operation.
588+
* Must be greater than 0, or -1 to delete all stale versions.
589+
* @param minVersionsToDelete the min number of stale versions required to trigger deletion.
590+
*                            If it is set to <= 0, then we will always perform list operations
591+
* to determine deletion candidates. If set to a positive value, then
592+
* we will skip deletion if the number of stale versions is less than
593+
* this value.
583594
*/
584-
def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = {
595+
def deleteOldVersions(
596+
numVersionsToRetain: Int,
597+
maxVersionsToDeletePerMaintenance: Int = -1,
598+
minVersionsToDelete: Long = 0): Unit = {
585599
// Check if enough stale version files present
586600
if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return
587601

@@ -603,14 +617,22 @@ class RocksDBFileManager(
603617

604618
// Find the versions to delete
605619
val maxSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.last._1
620+
val minSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.head._1
606621

607622
// In order to reconstruct numVersionsToRetain version, retain the latest snapshot
608623
// that satisfies (version <= maxSnapshotVersionPresent - numVersionsToRetain + 1).
624+
// When maxVersionsToDeletePerMaintenance is not -1 (i.e. a limit is set), also require
625+
// minVersionToRetain <= minSnapshotVersionPresent + maxVersionsToDeletePerMaintenance.
609626
// If none of the snapshots satisfy the condition, minVersionToRetain will be 0 and
610627
// no version gets deleted.
611628
val minVersionToRetain = sortedSnapshotVersionsAndUniqueIds
612629
.map(_._1)
613630
.filter(_ <= maxSnapshotVersionPresent - numVersionsToRetain + 1)
631+
.filter( v =>
632+
if (maxVersionsToDeletePerMaintenance != -1) {
633+
v <= minSnapshotVersionPresent + maxVersionsToDeletePerMaintenance
634+
} else true
635+
)
614636
.foldLeft(0L)(math.max)
615637

616638
// When snapshotVersionToDelete is non-empty, there are at least 2 snapshot versions.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ class StateStoreConf(
6262
/** Maximum count of versions a State Store implementation should retain in memory */
6363
val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory
6464

65+
/** Maximum number of versions to delete per maintenance operation */
66+
val maxVersionsToDeletePerMaintenance: Int = sqlConf.maxVersionsToDeletePerMaintenance
67+
6568
/**
6669
* Optional fully qualified name of the subclass of [[StateStoreProvider]]
6770
* managing state data. That is, the implementation of the State Store to use.

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,126 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
783783
}
784784
}
785785

786+
testWithStateStoreCheckpointIdsAndColumnFamilies(
787+
"RocksDB: purge version files with minVersionsToDelete > 0 " +
788+
"and maxVersionsToDeletePerMaintenance > 0",
789+
TestWithBothChangelogCheckpointingEnabledAndDisabled) {
790+
case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
791+
val remoteDir = Utils.createTempDir().toString
792+
new File(remoteDir).delete() // to make sure that the directory gets created
793+
val conf = dbConf.copy(
794+
minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 3,
795+
maxVersionsToDeletePerMaintenance = 1)
796+
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
797+
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
798+
// Commit 5 versions
799+
// stale versions: (1, 2)
800+
// keep versions: (3, 4, 5)
801+
for (version <- 0 to 4) {
802+
// Should upload latest snapshot but not delete any files
803+
// since number of stale versions < minVersionsToDelete
804+
db.load(version)
805+
db.commit()
806+
db.doMaintenance()
807+
}
808+
809+
// Commit 1 more version
810+
// stale versions: (1, 2, 3)
811+
// keep versions: (4, 5, 6)
812+
db.load(5)
813+
db.commit()
814+
815+
// Checkpoint directory before maintenance
816+
if (isChangelogCheckpointingEnabled) {
817+
assert(snapshotVersionsPresent(remoteDir) == (1 to 5))
818+
assert(changelogVersionsPresent(remoteDir) == (1 to 6))
819+
} else {
820+
assert(snapshotVersionsPresent(remoteDir) == (1 to 6))
821+
}
822+
823+
// Should delete stale versions for zip files and change log files
824+
// since number of stale versions >= minVersionsToDelete
825+
db.doMaintenance()
826+
827+
// Checkpoint directory after maintenance
828+
// Verify that only one version is deleted because maxVersionsToDeletePerMaintenance = 1
829+
assert(snapshotVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6))
830+
if (isChangelogCheckpointingEnabled) {
831+
assert(changelogVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6))
832+
}
833+
834+
// Commit 1 more version to ensure that minVersionsToDelete constraint is satisfied
835+
db.load(6)
836+
db.commit()
837+
db.doMaintenance()
838+
// Verify that only one version is deleted because maxVersionsToDeletePerMaintenance = 1
839+
assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7))
840+
if (isChangelogCheckpointingEnabled) {
841+
assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7))
842+
}
843+
}
844+
}
845+
846+
testWithStateStoreCheckpointIdsAndColumnFamilies(
847+
"RocksDB: purge version files with minVersionsToDelete < maxVersionsToDeletePerMaintenance",
848+
TestWithBothChangelogCheckpointingEnabledAndDisabled) {
849+
case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
850+
val remoteDir = Utils.createTempDir().toString
851+
new File(remoteDir).delete() // to make sure that the directory gets created
852+
val conf = dbConf.copy(
853+
minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 1,
854+
maxVersionsToDeletePerMaintenance = 2)
855+
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
856+
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
857+
// Commit 5 versions
858+
// stale versions: (1, 2)
859+
// keep versions: (3, 4, 5)
860+
for (version <- 0 to 4) {
861+
// Should upload latest snapshot. Unlike the previous test, minVersionsToDelete = 1 here,
862+
// so stale versions are already deleted inside this loop (see the assertions below)
863+
db.load(version)
864+
db.commit()
865+
db.doMaintenance()
866+
}
867+
868+
// Commit 1 more version
869+
// stale versions: (1, 2, 3)
870+
// keep versions: (4, 5, 6)
871+
db.load(5)
872+
db.commit()
873+
874+
// Checkpoint directory before maintenance
875+
// Verify that 2 oldest stale versions are deleted
876+
if (isChangelogCheckpointingEnabled) {
877+
assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5))
878+
assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6))
879+
} else {
880+
assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6))
881+
}
882+
883+
// Should delete stale versions for zip files and change log files
884+
// since number of stale versions >= minVersionsToDelete
885+
db.doMaintenance()
886+
887+
// Checkpoint directory after maintenance
888+
// Verify that only one version is deleted since that's the only stale version left
889+
assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6))
890+
if (isChangelogCheckpointingEnabled) {
891+
assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6))
892+
}
893+
894+
// Commit 1 more version to ensure that minVersionsToDelete constraint is satisfied
895+
db.load(6)
896+
db.commit()
897+
db.doMaintenance()
898+
// Verify that only one version is deleted since that's the only stale version left
899+
assert(snapshotVersionsPresent(remoteDir) == Seq(5, 6, 7))
900+
if (isChangelogCheckpointingEnabled) {
901+
assert(changelogVersionsPresent(remoteDir) == Seq(5, 6, 7))
902+
}
903+
}
904+
}
905+
786906
testWithStateStoreCheckpointIdsAndColumnFamilies(
787907
"RocksDB: minDeltasForSnapshot",
788908
TestWithChangelogCheckpointingEnabled) {

0 commit comments

Comments
 (0)