Skip to content

Commit 429402b

Browse files
WweiLHeartSaVioR
authored andcommitted
[SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager
### What changes were proposed in this pull request? This PR enables RocksDB to read <zip, changelog> file watermarked with unique ids (e.g. `version_uniqueId.zip`, `version_uniqueId.changelog`). Below is a explanation on the changes and rationale. Now for each changelog file, we put a "version: uniqueId" to its first line, from it's current version to the previous snapshot version. For each snapshot (zip) file, there is no change other than their name (`version_uniqueId.zip`), because snapshot files are single source of truth. In addition to `LastCommitBasedCheckpointId, lastCommittedCheckpointId, loadedCheckpointId` added in #47895 (review), also add a in-memory map `versionToUniqueIdLineage` that maps version to unique Id. This is useful when you reuse the same rocksDB instance in the executor, so you don't need to load the lineage from the changelog file again. ## RocksDB: #### Load - When `loadedVersion != version`, try to load a changelog file with `version_checkpointUniqueId.changelog`. There could be multiple cases: 1. Version corresponds to a zip file: 1. `version_uniqueId.zip`, this means (either changelog was not enabled, or it was enabled but version happens to be a checkpoint file), and previously query run ckpt v2 2. `version.zip`, this means (either changelog was not enabled, or it was enabled but version happens to be a checkpoint file), and previously query run ckpt v1 2. Version corresponds to a changelog file: 1. `version_uniqueId.changelog`, this means changelog was enabled, and previously query run ckpt v2 2. `version.changelog`, this means changelog was enabled, and previously query run ckpt v1 - For case i.a, we construct a new empty lineage `(version, sessionCheckpointId)`. `version_uniqueId.changelog`. - For case ii.a, we read the lineage file stored in - For case i.b and ii.b, there is no need to load the lineage as they were not presented before, we just load the corresponding file without `uniqueId`, but newer files will be constructed with uniqueId. checkpoint version v1 to checkpoint version v2. Next the code finds the latest snapshot version through file listing. When there are multiple snapshot files with the same version but different unique Id (main problem this project was trying to solve), the correct one will be loaded based on the checkpoint id. Then changelog is replayed with the awareness of lineage. The lineage is stored in memory for next load(). Last, load the changelog writer for version + 1, and write the lineage (version + 1, sessionCheckpointId) to the first line of the file. While it seems that the lineage is written early, it is safe because the change log writer is not committed yet. - When `loadedVersion == version`, the same rocks db instance is reused and the lineage is stored in memory to `versionToUniqueIdLineage`. #### Commit - Also save `sessionCheckpointId` to `latestSnapshot` - Add `(newVersion, sessionCheckpointId)` to `versionToUniqueIdLineage` #### Abort Also clear up `versionToUniqueIdLineage` ## RocksDBFileManager: - A bunch of add-ups to make until code uniqueId aware. Now all places that return version returns a <version, Option[uniqueId]> pair, in v1 format, the option is None. - ### deleteOldVersions: If there are multiple `version_uniqueId1.zip`(changelog) and `versioion.uniqueId2.zip`, all are deleted. ## Changelog Reader / Writer We purpose to save the lineage to the first line of the changelog files. For changelog reader, there is an abstract function `readLineage` created. In `RocksDBCheckpointManager.getChangelogReader` function, the `readLineage` will be called right after the initialization of the changelog reader to update the file pointer to after the lineage. Subsequent `getNext` function won't be affecter because of this. For changelog writer, there is an abstract function `writeLineage` that writes the lineage. This function will be called before any actual changelog data is written in `RocksDB.load()`. ### Why are the changes needed? Improve fault tolerance to RocksDB State Store. ### Does this PR introduce _any_ user-facing change? Not yet, after the V2 format project is finished, customer can use the new config to enable it with better rocksDB state store fault tolerance ### How was this patch tested? Modified existing unit tests. For unit tests and backward compatibility tests please refer to: #48356 ### Was this patch authored or co-authored using generative AI tooling? No Closes #48355 from WweiL/integration-for-review. Lead-authored-by: WweiL <[email protected]> Co-authored-by: Wei Liu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent df08177 commit 429402b

File tree

16 files changed

+1235
-651
lines changed

16 files changed

+1235
-651
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,11 @@
233233
"An error occurred during loading state."
234234
],
235235
"subClass" : {
236+
"CANNOT_FIND_BASE_SNAPSHOT_CHECKPOINT" : {
237+
"message" : [
238+
"Cannot find a base snapshot checkpoint with lineage: <lineage>."
239+
]
240+
},
236241
"CANNOT_READ_CHECKPOINT" : {
237242
"message" : [
238243
"Cannot read RocksDB checkpoint metadata. Expected <expectedVersion>, but found <actualVersion>."

common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,8 @@ private[spark] object LogKeys {
353353
case object LABEL_COLUMN extends LogKey
354354
case object LARGEST_CLUSTER_INDEX extends LogKey
355355
case object LAST_ACCESS_TIME extends LogKey
356+
case object LAST_COMMITTED_CHECKPOINT_ID extends LogKey
357+
case object LAST_COMMIT_BASED_CHECKPOINT_ID extends LogKey
356358
case object LAST_VALID_TIME extends LogKey
357359
case object LATEST_BATCH_ID extends LogKey
358360
case object LATEST_COMMITTED_BATCH_ID extends LogKey
@@ -361,8 +363,10 @@ private[spark] object LogKeys {
361363
case object LEFT_EXPR extends LogKey
362364
case object LEFT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey
363365
case object LINE extends LogKey
366+
case object LINEAGE extends LogKey
364367
case object LINE_NUM extends LogKey
365368
case object LISTENER extends LogKey
369+
case object LOADED_CHECKPOINT_ID extends LogKey
366370
case object LOADED_VERSION extends LogKey
367371
case object LOAD_FACTOR extends LogKey
368372
case object LOAD_TIME extends LogKey

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2602,6 +2602,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
26022602
cause = null)
26032603
}
26042604

2605+
def cannotFindBaseSnapshotCheckpoint(lineage: String): Throwable = {
2606+
new SparkException (
2607+
errorClass =
2608+
"CANNOT_LOAD_STATE_STORE.CANNOT_FIND_BASE_SNAPSHOT_CHECKPOINT",
2609+
messageParameters = Map("lineage" -> lineage),
2610+
cause = null)
2611+
}
2612+
26052613
def unexpectedFileSize(
26062614
dfsFile: Path,
26072615
localFile: File,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ class MicroBatchExecution(
513513
execCtx.startOffsets ++= execCtx.endOffsets
514514
watermarkTracker.setWatermark(
515515
math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
516+
currentStateStoreCkptId ++= commitMetadata.stateUniqueIds
516517
} else if (latestCommittedBatchId == latestBatchId - 1) {
517518
execCtx.endOffsets.foreach {
518519
case (source: Source, end: Offset) =>
@@ -965,7 +966,8 @@ class MicroBatchExecution(
965966
updateStateStoreCkptId(execCtx, latestExecPlan)
966967
}
967968
execCtx.reportTimeTaken("commitOffsets") {
968-
if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
969+
if (!commitLog.add(execCtx.batchId,
970+
CommitMetadata(watermarkTracker.currentWatermark, currentStateStoreCkptId.toMap))) {
969971
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
970972
}
971973
}

0 commit comments

Comments
 (0)