Skip to content

Commit d47c09a

Browse files
authored
[Spark][Version Checksum] Clean up stale checksum files during cleanup (delta-io#4509)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Fixes Metadata Cleanup so that checksum files are also considered for removal. Resolves delta-io#4475 ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Updated DeltaRetentionSuite ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. 
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent c7a71d7 commit d47c09a

File tree

3 files changed

+21
-7
lines changed

3 files changed

+21
-7
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ trait MetadataCleanup extends DeltaLogging {
149149
}
150150

151151
/** Helper function for getting the version of a checkpoint or a commit. */
152-
def getDeltaFileOrCheckpointVersion(filePath: Path): Long = {
153-
require(isCheckpointFile(filePath) || isDeltaFile(filePath))
152+
def getDeltaFileChecksumOrCheckpointVersion(filePath: Path): Long = {
153+
require(isCheckpointFile(filePath) || isDeltaFile(filePath) || isChecksumFile(filePath))
154154
getFileVersion(filePath)
155155
}
156156

@@ -165,10 +165,10 @@ trait MetadataCleanup extends DeltaLogging {
165165
if (latestCheckpoint.isEmpty) return Iterator.empty
166166
val threshold = latestCheckpoint.get.version - 1L
167167
val files = store.listFrom(listingPrefix(logPath, 0), newDeltaHadoopConf())
168-
.filter(f => isCheckpointFile(f) || isDeltaFile(f))
168+
.filter(f => isCheckpointFile(f) || isDeltaFile(f) || isChecksumFile(f))
169169

170170
new BufferingLogDeletionIterator(
171-
files, fileCutOffTime, threshold, getDeltaFileOrCheckpointVersion)
171+
files, fileCutOffTime, threshold, getDeltaFileChecksumOrCheckpointVersion)
172172
}
173173

174174
protected def checkpointExistsAtCleanupBoundary(deltaLog: DeltaLog, version: Long): Boolean = {

spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ class DeltaRetentionSuite extends QueryTest
4444
protected override def sparkConf: SparkConf = super.sparkConf
4545

4646
override protected def getLogFiles(dir: File): Seq[File] =
47-
getDeltaFiles(dir) ++ getUnbackfilledDeltaFiles(dir) ++ getCheckpointFiles(dir)
47+
getDeltaFiles(dir) ++ getUnbackfilledDeltaFiles(dir) ++ getCheckpointFiles(dir) ++
48+
getCrcFiles(dir)
4849

4950
test("delete expired logs") {
5051
withTempDir { tempDir =>
@@ -80,7 +81,7 @@ class DeltaRetentionSuite extends QueryTest
8081

8182
log.checkpoint()
8283

83-
val expectedFiles = Seq("04.json", "04.checkpoint.parquet")
84+
val expectedFiles = Seq("04.json", "04.checkpoint.parquet", "04.crc")
8485
// after checkpointing, the files should be cleared
8586
log.cleanUpExpiredLogs(log.snapshot)
8687
val afterCleanup = getLogFiles(logPath)
@@ -262,7 +263,7 @@ class DeltaRetentionSuite extends QueryTest
262263
}
263264
}
264265

265-
test("the checkpoint file for version 0 should be cleaned") {
266+
test("the checkpoint and checksum for version 0 should be cleaned") {
266267
withTempDir { tempDir =>
267268
val clock = new ManualClock(getStartTimeForRetentionTest)
268269
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
@@ -289,6 +290,9 @@ class DeltaRetentionSuite extends QueryTest
289290
initialFiles.foreach { file =>
290291
assert(!afterCleanup.contains(file))
291292
}
293+
compareVersions(getCrcVersions(logPath), "checksum", Set(1))
294+
compareVersions(getFileVersions(getDeltaFiles(logPath)), "commit", Set(1))
295+
compareVersions(getFileVersions(getCheckpointFiles(logPath)), "checkpoint", Set(1))
292296
}
293297
}
294298

spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@ trait DeltaRetentionSuiteBase extends QueryTest
8989
files.map(f => f.getName()).map(s => s.substring(0, s.indexOf(".")).toLong).toSet
9090
}
9191

92+
protected def getCrcFiles(dir: File): Seq[File] =
93+
dir.listFiles().filter(f => FileNames.isChecksumFile(new Path(f.getCanonicalPath)))
94+
95+
protected def getCrcVersions(dir: File): Set[Long] =
96+
getFileVersions(getCrcFiles(dir))
97+
98+
protected def getDeltaAndCrcFiles(dir: File): Seq[File] =
99+
getDeltaFiles(dir) ++ getCrcFiles(dir)
100+
101+
92102
protected def getDeltaVersions(dir: File): Set[Long] = {
93103
val backfilledDeltaVersions = getFileVersions(getDeltaFiles(dir))
94104
val unbackfilledDeltaVersions = getUnbackfilledDeltaVersions(dir)

0 commit comments

Comments (0)