Skip to content

Commit 4d2c5cf

Browse files
authored
[Spark][Version Checksum] Incrementally compute allFiles in the checksum (delta-io#3899)
1 parent 700bdaf commit 4d2c5cf

File tree

10 files changed

+908
-22
lines changed

10 files changed

+908
-22
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,17 @@ object Checkpoints
619619
if (spark.conf.get(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED)) {
620620
snapshot.validateChecksum(Map("context" -> "writeCheckpoint"))
621621
}
622+
// Verify allFiles in checksum during checkpoint if we are not doing so already on every
623+
// commit.
624+
val allFilesInCRCEnabled = Snapshot.allFilesInCrcWritePathEnabled(spark, snapshot)
625+
val shouldVerifyAllFilesInCRCEveryCommit =
626+
Snapshot.allFilesInCrcVerificationEnabled(spark, snapshot)
627+
if (allFilesInCRCEnabled && !shouldVerifyAllFilesInCRCEveryCommit) {
628+
snapshot.checksumOpt.foreach { checksum =>
629+
snapshot.validateFileListAgainstCRC(
630+
checksum, contextOpt = Some("triggeredFromCheckpoint"))
631+
}
632+
}
622633

623634
val hadoopConf = deltaLog.newDeltaHadoopConf()
624635

spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala

Lines changed: 205 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
1818

1919
import java.io.FileNotFoundException
2020
import java.nio.charset.StandardCharsets.UTF_8
21+
import java.util.TimeZone
2122

2223
// scalastyle:off import.ordering.noEmptyLine
2324
import scala.collection.immutable.ListMap
@@ -135,6 +136,7 @@ trait RecordChecksum extends DeltaLogging {
135136
* `versionToCompute - 1` or a snapshot. Note that the snapshot may
136137
* belong to any version and this method will only use the snapshot if
137138
* it corresponds to `versionToCompute - 1`.
139+
* @param includeAddFilesInCrc True if the new checksum should include a [[AddFile]]s.
138140
* @return Either the new checksum or an error code string if the checksum could not be computed.
139141
*/
140142
// scalastyle:off argcount
@@ -147,7 +149,8 @@ trait RecordChecksum extends DeltaLogging {
147149
protocol: Protocol,
148150
operationName: String,
149151
txnIdOpt: Option[String],
150-
previousVersionState: Either[Snapshot, VersionChecksum]
152+
previousVersionState: Either[Snapshot, VersionChecksum],
153+
includeAddFilesInCrc: Boolean
151154
): Either[String, VersionChecksum] = {
152155
// scalastyle:on argcount
153156
if (!deltaLog.incrementalCommitEnabled) {
@@ -207,7 +210,8 @@ trait RecordChecksum extends DeltaLogging {
207210
oldVersionChecksum,
208211
oldSnapshot,
209212
actions,
210-
ignoreAddFilesInOperation
213+
ignoreAddFilesInOperation,
214+
includeAddFilesInCrc
211215
)
212216
}
213217

@@ -234,7 +238,8 @@ trait RecordChecksum extends DeltaLogging {
234238
oldVersionChecksum: VersionChecksum,
235239
oldSnapshot: Option[Snapshot],
236240
actions: Seq[Action],
237-
ignoreAddFiles: Boolean
241+
ignoreAddFiles: Boolean,
242+
includeAllFilesInCRC: Boolean
238243
) : Either[String, VersionChecksum] = {
239244
// scalastyle:on argcount
240245
oldSnapshot.foreach(s => require(s.version == (attemptVersion - 1)))
@@ -281,6 +286,75 @@ trait RecordChecksum extends DeltaLogging {
281286
val domainMetadata = incrementallyComputeDomainMetadatas(
282287
oldSnapshot, oldVersionChecksum, attemptVersion, actions)
283288

289+
val computeAddFiles = if (includeAllFilesInCRC) {
290+
incrementallyComputeAddFiles(
291+
oldSnapshot = oldSnapshot,
292+
oldVersionChecksum = oldVersionChecksum,
293+
attemptVersion = attemptVersion,
294+
numFilesAfterCommit = numFiles,
295+
actionsToCommit = actions)
296+
} else if (numFiles == 0) {
297+
// If the table becomes empty after the commit, addFiles should be empty.
298+
Option(Nil)
299+
} else {
300+
None
301+
}
302+
303+
val allFiles = computeAddFiles.filter { files =>
304+
val computedNumFiles = files.size
305+
val computedTableSizeBytes = files.map(_.size).sum
306+
// Validate checksum of Incrementally computed files against the computed checksum from
307+
// incremental commits.
308+
if (computedNumFiles != numFiles || computedTableSizeBytes != tableSizeBytes) {
309+
val filePathsFromPreviousVersion = oldVersionChecksum.allFiles
310+
.orElse {
311+
recordFrameProfile("Delta", "VersionChecksum.computeNewChecksum.allFiles") {
312+
oldSnapshot.map(_.allFiles.collect().toSeq)
313+
}
314+
}
315+
.getOrElse(Seq.empty)
316+
.map(_.path)
317+
val addFilePathsInThisCommit = actions.collect { case af: AddFile => af.path }
318+
val removeFilePathsInThisCommit = actions.collect { case rf: RemoveFile => rf.path }
319+
logWarning(log"Incrementally computed files does not match the incremental checksum " +
320+
log"for commit attempt: ${MDC(DeltaLogKeys.VERSION, attemptVersion)}. " +
321+
log"addFilePathsInThisCommit: [${MDC(DeltaLogKeys.PATHS,
322+
addFilePathsInThisCommit.mkString(","))}], " +
323+
log"removeFilePathsInThisCommit: [${MDC(DeltaLogKeys.PATHS2,
324+
removeFilePathsInThisCommit.mkString(","))}], " +
325+
log"filePathsFromPreviousVersion: [${MDC(DeltaLogKeys.PATHS3,
326+
filePathsFromPreviousVersion.mkString(","))}], " +
327+
log"computedFiles: [${MDC(DeltaLogKeys.PATHS4,
328+
files.map(_.path).mkString(","))}]")
329+
val eventData = Map(
330+
"attemptVersion" -> attemptVersion,
331+
"expectedNumFiles" -> numFiles,
332+
"expectedTableSizeBytes" -> tableSizeBytes,
333+
"computedNumFiles" -> computedNumFiles,
334+
"computedTableSizeBytes" -> computedTableSizeBytes,
335+
"numAddFilePathsInThisCommit" -> addFilePathsInThisCommit.size,
336+
"numRemoveFilePathsInThisCommit" -> removeFilePathsInThisCommit.size,
337+
"numFilesInPreviousVersion" -> filePathsFromPreviousVersion.size,
338+
"operationName" -> operationName,
339+
"addFilePathsInThisCommit" -> JsonUtils.toJson(addFilePathsInThisCommit.take(10)),
340+
"removeFilePathsInThisCommit" -> JsonUtils.toJson(removeFilePathsInThisCommit.take(10)),
341+
"filePathsFromPreviousVersion" -> JsonUtils.toJson(filePathsFromPreviousVersion.take(10)),
342+
"computedFiles" -> JsonUtils.toJson(files.take(10))
343+
)
344+
recordDeltaEvent(
345+
deltaLog,
346+
opType = "delta.allFilesInCrc.checksumMismatch.aggregated",
347+
data = eventData)
348+
if (Utils.isTesting) {
349+
throw new IllegalStateException("Incrementally Computed State failed checksum check" +
350+
s" for commit $attemptVersion [$eventData]")
351+
}
352+
false
353+
} else {
354+
true
355+
}
356+
}
357+
284358
Right(VersionChecksum(
285359
txnId = txnIdOpt,
286360
tableSizeBytes = tableSizeBytes,
@@ -292,8 +366,8 @@ trait RecordChecksum extends DeltaLogging {
292366
protocol = protocol,
293367
setTransactions = setTransactions,
294368
domainMetadata = domainMetadata,
295-
histogramOpt = None,
296-
allFiles = None
369+
allFiles = allFiles,
370+
histogramOpt = None
297371
))
298372
}
299373

@@ -390,6 +464,62 @@ trait RecordChecksum extends DeltaLogging {
390464
// "domain metadatas not stored" as [[Some]] vs. [[None]].
391465
Some(logReplay.getDomainMetadatas.toSeq).filter(_.size <= threshold)
392466
}
467+
468+
/**
469+
* Incrementally compute [[Snapshot.allFiles]] for the commit `attemptVersion`.
470+
*
471+
* @param oldSnapshot - snapshot corresponding to `attemptVersion` - 1
472+
* @param oldVersionChecksum - [[VersionChecksum]] corresponding to `attemptVersion` - 1
473+
* @param attemptVersion - version which we want to commit
474+
* @param numFilesAfterCommit - number of files in the table after the attemptVersion commit.
475+
* @param actionsToCommit - actions for commit `attemptVersion`
476+
* @return Optional sequence of AddFiles which represents the incrementally computed state for
477+
* commit `attemptVersion`
478+
*/
479+
private def incrementallyComputeAddFiles(
480+
oldSnapshot: Option[Snapshot],
481+
oldVersionChecksum: VersionChecksum,
482+
attemptVersion: Long,
483+
numFilesAfterCommit: Long,
484+
actionsToCommit: Seq[Action]): Option[Seq[AddFile]] = {
485+
486+
// We must enumerate both the pre- and post-commit file lists; give up if they are too big.
487+
val incrementalAllFilesThreshold =
488+
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES)
489+
val numFilesBeforeCommit = oldVersionChecksum.numFiles
490+
if (Math.max(numFilesAfterCommit, numFilesBeforeCommit) > incrementalAllFilesThreshold) {
491+
return None
492+
}
493+
494+
// We try to get files for `attemptVersion - 1` from the old CRC first. If the old CRC doesn't
495+
// have those files, then we will try to get that info from the oldSnapshot (corresponding to
496+
// attemptVersion - 1). Note that oldSnapshot might not be present if another concurrent commits
497+
// have happened in between. In this case we return and not store incrementally computed state
498+
// to crc.
499+
val oldAllFiles = oldVersionChecksum.allFiles
500+
.orElse {
501+
recordFrameProfile("Delta", "VersionChecksum.incrementallyComputeAddFiles") {
502+
oldSnapshot.map(_.allFiles.collect().toSeq)
503+
}
504+
}
505+
.getOrElse { return None }
506+
507+
val canonicalPath = new DeltaLog.CanonicalPathFunction(() => deltaLog.newDeltaHadoopConf())
508+
def normalizePath(action: Action): Action = action match {
509+
case af: AddFile => af.copy(path = canonicalPath(af.path))
510+
case rf: RemoveFile => rf.copy(path = canonicalPath(rf.path))
511+
case others => others
512+
}
513+
514+
// We only work with AddFile, so RemoveFile and SetTransaction retention don't matter.
515+
val logReplay = new InMemoryLogReplay(
516+
minFileRetentionTimestamp = 0,
517+
minSetTransactionRetentionTimestamp = None)
518+
519+
logReplay.append(attemptVersion - 1, oldAllFiles.map(normalizePath).toIterator)
520+
logReplay.append(attemptVersion, actionsToCommit.map(normalizePath).toIterator)
521+
Some(logReplay.allFiles)
522+
}
393523
}
394524

395525
object RecordChecksum {
@@ -523,6 +653,76 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot =>
523653
false
524654
}
525655

656+
/**
657+
* Validate [[Snapshot.allFiles]] against given checksum.allFiles.
658+
* Returns true if validation succeeds, else return false.
659+
* In Unit Tests, this method throws [[IllegalStateException]] so that issues can be caught during
660+
* development.
661+
*/
662+
def validateFileListAgainstCRC(checksum: VersionChecksum, contextOpt: Option[String]): Boolean = {
663+
val fileSortKey = (f: AddFile) => (f.path, f.modificationTime, f.size)
664+
val filesFromCrc = checksum.allFiles.map(_.sortBy(fileSortKey)).getOrElse { return true }
665+
val filesFromStateReconstruction = recordFrameProfile("Delta", "snapshot.allFiles") {
666+
allFilesViaStateReconstruction.collect().toSeq.sortBy(fileSortKey)
667+
}
668+
if (filesFromCrc == filesFromStateReconstruction) return true
669+
670+
val filesFromCrcWithoutStats = filesFromCrc.map(_.copy(stats = ""))
671+
val filesFromStateReconstructionWithoutStats =
672+
filesFromStateReconstruction.map(_.copy(stats = ""))
673+
val mismatchWithStatsOnly =
674+
filesFromCrcWithoutStats == filesFromStateReconstructionWithoutStats
675+
676+
if (mismatchWithStatsOnly) {
677+
// Normalize stats in CRC as per the table schema
678+
val filesFromStateReconstructionMap =
679+
filesFromStateReconstruction.map(af => (af.path, af)).toMap
680+
val parser = DeltaFileProviderUtils.createJsonStatsParser(statsSchema)
681+
var normalizedStatsDiffer = false
682+
filesFromCrc.foreach { addFile =>
683+
val statsFromSR = filesFromStateReconstructionMap(addFile.path).stats
684+
val statsFromSRParsed = parser(statsFromSR)
685+
val statsFromCrcParsed = parser(addFile.stats)
686+
if (statsFromSRParsed != statsFromCrcParsed) {
687+
normalizedStatsDiffer = true
688+
}
689+
}
690+
if (!normalizedStatsDiffer) return true
691+
}
692+
// If incremental all-files-in-crc validation fails, then there is a possibility that the
693+
// issue is not just with incremental all-files-in-crc computation but with overall incremental
694+
// commits. So run the incremental commit crc validation and find out whether that is also
695+
// failing.
696+
val contextForIncrementalCommitCheck = contextOpt.map(c => s"$c.").getOrElse("") +
697+
"delta.allFilesInCrc.checksumMismatch.validateFileListAgainstCRC"
698+
var errorForIncrementalCommitCrcValidation = ""
699+
val incrementalCommitCrcValidationPassed = try {
700+
validateChecksum(Map("context" -> contextForIncrementalCommitCheck))
701+
} catch {
702+
case NonFatal(e) =>
703+
errorForIncrementalCommitCrcValidation += e.getMessage
704+
false
705+
}
706+
val eventData = Map(
707+
"version" -> version,
708+
"mismatchWithStatsOnly" -> mismatchWithStatsOnly,
709+
"filesCountFromCrc" -> filesFromCrc.size,
710+
"filesCountFromStateReconstruction" -> filesFromStateReconstruction.size,
711+
"filesFromCrc" -> JsonUtils.toJson(filesFromCrc),
712+
"incrementalCommitCrcValidationPassed" -> incrementalCommitCrcValidationPassed,
713+
"errorForIncrementalCommitCrcValidation" -> errorForIncrementalCommitCrcValidation,
714+
"context" -> contextOpt.getOrElse("")
715+
)
716+
val message = s"Incremental state reconstruction validation failed for version " +
717+
s"$version [${eventData.mkString(",")}]"
718+
logInfo(message)
719+
recordDeltaEvent(
720+
this.deltaLog,
721+
opType = "delta.allFilesInCrc.checksumMismatch.differentAllFiles",
722+
data = eventData)
723+
if (Utils.isTesting) throw new IllegalStateException(message)
724+
false
725+
}
526726
/**
527727
* Validates the given `checksum` against [[Snapshot.computedState]].
528728
* Returns an tuple of Maps:

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2535,6 +2535,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
25352535
protected def incrementallyDeriveChecksum(
25362536
attemptVersion: Long,
25372537
currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum] = {
2538+
// Don't include [[AddFile]]s in CRC if this commit is modifying the schema of table in some
2539+
// way. This is to make sure we don't carry any DROPPED column from previous CRC to this CRC
2540+
// forever and can start fresh from next commit.
2541+
// If the oldSnapshot itself is missing, we don't incrementally compute the checksum.
2542+
val allFilesInCrcWritePathEnabled =
2543+
Snapshot.allFilesInCrcWritePathEnabled(spark, snapshot) &&
2544+
(snapshot.version == -1 || snapshot.metadata.schema == metadata.schema)
25382545

25392546
incrementallyDeriveChecksum(
25402547
spark,
@@ -2545,7 +2552,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
25452552
protocol = currentTransactionInfo.protocol,
25462553
operationName = currentTransactionInfo.op.name,
25472554
txnIdOpt = Some(currentTransactionInfo.txnId),
2548-
previousVersionState = scala.Left(snapshot)
2555+
previousVersionState = scala.Left(snapshot),
2556+
includeAddFilesInCrc = allFilesInCrcWritePathEnabled
25492557
).toOption
25502558
}
25512559

0 commit comments

Comments
 (0)