Skip to content

Commit 9f45281

Browse files
authored
[Spark][Version Checksum] Incrementally compute the checksum (delta-io#3828)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> delta-io#3799 added the capability to write a Checksum file after every commit. However, writing a checksum currently requires a full state reconstruction --- which is expensive. This PR adds the capability to compute most of the fields incrementally (apply the current delta on top of the last checksum to get the checksum of the current version). This works as long as the the actual operation performed matches exactly with the specified operation type in the commit. Note that this feature is gated behind a flag that is `true` by default. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Added tests in ChecksumSuite. ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 1eff5df commit 9f45281

File tree

7 files changed

+323
-18
lines changed

7 files changed

+323
-18
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala

Lines changed: 186 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ trait RecordChecksum extends DeltaLogging {
8282
}
8383

8484
val version = snapshot.version
85-
val checksum = snapshot.computeChecksum.copy(txnId = Some(txnId))
85+
val checksumWithoutTxnId = snapshot.checksumOpt.getOrElse(snapshot.computeChecksum)
86+
val checksum = checksumWithoutTxnId.copy(txnId = Some(txnId))
8687
val eventData = mutable.Map[String, Any]("operationSucceeded" -> false)
8788
eventData("numAddFileActions") = checksum.allFiles.map(_.size).getOrElse(-1)
8889
eventData("numSetTransactionActions") = checksum.setTransactions.map(_.size).getOrElse(-1)
@@ -115,6 +116,190 @@ trait RecordChecksum extends DeltaLogging {
115116
opType = "delta.checksum.write",
116117
data = eventData)
117118
}
119+
120+
/**
121+
* Incrementally derive checksum for the just-committed or about-to-be committed snapshot.
122+
* @param spark The SparkSession
123+
* @param deltaLog The DeltaLog
124+
* @param versionToCompute The version for which we want to compute the checksum
125+
* @param actions The actions corresponding to the version `versionToCompute`
126+
* @param metadata The metadata corresponding to the version `versionToCompute`
127+
* @param protocol The protocol corresponding to the version `versionToCompute`
128+
* @param operationName The operation name corresponding to the version `versionToCompute`
129+
* @param txnIdOpt The transaction identifier for the version `versionToCompute`
130+
* @param previousVersionState Contains either the versionChecksum corresponding to
131+
* `versionToCompute - 1` or a snapshot. Note that the snapshot may
132+
* belong to any version and this method will only use the snapshot if
133+
* it corresponds to `versionToCompute - 1`.
134+
* @return Either the new checksum or an error code string if the checksum could not be computed.
135+
*/
136+
// scalastyle:off argcount
137+
def incrementallyDeriveChecksum(
138+
spark: SparkSession,
139+
deltaLog: DeltaLog,
140+
versionToCompute: Long,
141+
actions: Seq[Action],
142+
metadata: Metadata,
143+
protocol: Protocol,
144+
operationName: String,
145+
txnIdOpt: Option[String],
146+
previousVersionState: Either[Snapshot, VersionChecksum]
147+
): Either[String, VersionChecksum] = {
148+
// scalastyle:on argcount
149+
if (!deltaLog.incrementalCommitEnabled) {
150+
return Left("INCREMENTAL_COMMITS_DISABLED")
151+
}
152+
153+
// Do not incrementally derive checksum for ManualUpdate operations since it may
154+
// include actions that violate delta protocol invariants.
155+
if (operationName == DeltaOperations.ManualUpdate.name) {
156+
return Left("INVALID_OPERATION_MANUAL_UPDATE")
157+
}
158+
159+
// Try to incrementally compute a VersionChecksum for the just-committed snapshot.
160+
val expectedVersion = versionToCompute - 1
161+
val (oldVersionChecksum, oldSnapshot) = previousVersionState match {
162+
case Right(checksum) => checksum -> None
163+
case Left(snapshot) if snapshot.version == expectedVersion =>
164+
// The original snapshot is still fresh so use it directly. Note this could trigger
165+
// a state reconstruction if there is not an existing checksumOpt in the snapshot
166+
// or if the existing checksumOpt contains missing information e.g.
167+
// a null valued metadata or protocol. However, if we do not obtain a checksum here,
168+
// then we cannot incrementally derive a new checksum for the new snapshot.
169+
logInfo(log"Incremental commit: starting with snapshot version " +
170+
log"${MDC(DeltaLogKeys.VERSION, expectedVersion)}")
171+
val snapshotChecksum = snapshot.checksumOpt.getOrElse(snapshot.computeChecksum)
172+
snapshotChecksum.copy(numMetadata = 1, numProtocol = 1) -> Some(snapshot)
173+
case _ =>
174+
previousVersionState.swap.foreach { snapshot =>
175+
// Occurs when snapshot is no longer fresh due to concurrent writers.
176+
// Read CRC file and validate checksum information is complete.
177+
recordDeltaEvent(deltaLog, opType = "delta.commit.snapshotAgedOut", data = Map(
178+
"snapshotVersion" -> snapshot.version,
179+
"commitAttemptVersion" -> versionToCompute
180+
))
181+
}
182+
val oldCrcOpt = deltaLog.readChecksum(expectedVersion)
183+
if (oldCrcOpt.isEmpty) {
184+
return Left("MISSING_OLD_CRC")
185+
}
186+
val oldCrcFiltered = oldCrcOpt
187+
.filterNot(_.metadata == null)
188+
.filterNot(_.protocol == null)
189+
190+
val oldCrc = oldCrcFiltered.getOrElse {
191+
return Left("OLD_CRC_INCOMPLETE")
192+
}
193+
oldCrc -> None
194+
}
195+
196+
// Incrementally compute the new version checksum, if the old one is available.
197+
val ignoreAddFilesInOperation =
198+
RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName)
199+
200+
computeNewChecksum(
201+
versionToCompute,
202+
operationName,
203+
txnIdOpt,
204+
oldVersionChecksum,
205+
oldSnapshot,
206+
actions,
207+
ignoreAddFilesInOperation
208+
)
209+
}
210+
211+
/**
212+
* Incrementally derive new checksum from old checksum + actions.
213+
*
214+
* @param attemptVersion commit attempt version for which we want to generate CRC.
215+
* @param operationName operation name for the attempted commit.
216+
* @param txnId transaction identifier.
217+
* @param oldVersionChecksum from previous commit (attemptVersion - 1).
218+
* @param oldSnapshot snapshot representing previous commit version (i.e. attemptVersion - 1),
219+
* None if not available.
220+
* @param actions used to incrementally compute new checksum.
221+
* @param ignoreAddFiles for transactions whose add file actions refer to already-existing files
222+
* e.g., [[DeltaOperations.ComputeStats]] transactions.
223+
* @return Either the new checksum or error code string if the checksum could not be computed
224+
* incrementally due to some reason.
225+
*/
226+
// scalastyle:off argcount
227+
private[delta] def computeNewChecksum(
228+
attemptVersion: Long,
229+
operationName: String,
230+
txnIdOpt: Option[String],
231+
oldVersionChecksum: VersionChecksum,
232+
oldSnapshot: Option[Snapshot],
233+
actions: Seq[Action],
234+
ignoreAddFiles: Boolean
235+
) : Either[String, VersionChecksum] = {
236+
// scalastyle:on argcount
237+
oldSnapshot.foreach(s => require(s.version == (attemptVersion - 1)))
238+
var tableSizeBytes = oldVersionChecksum.tableSizeBytes
239+
var numFiles = oldVersionChecksum.numFiles
240+
var protocol = oldVersionChecksum.protocol
241+
var metadata = oldVersionChecksum.metadata
242+
243+
var inCommitTimestamp : Option[Long] = None
244+
actions.foreach {
245+
case a: AddFile if !ignoreAddFiles =>
246+
tableSizeBytes += a.size
247+
numFiles += 1
248+
249+
250+
// extendedFileMetadata == true implies fields partitionValues, size, and tags are present
251+
case r: RemoveFile if r.extendedFileMetadata == Some(true) =>
252+
val size = r.size.get
253+
tableSizeBytes -= size
254+
numFiles -= 1
255+
256+
257+
case r: RemoveFile =>
258+
// Report the failure to usage logs.
259+
val msg = s"A remove action with a missing file size was detected in file ${r.path} " +
260+
"causing incremental commit to fallback to state reconstruction."
261+
recordDeltaEvent(
262+
this.deltaLog,
263+
"delta.checksum.compute",
264+
data = Map("error" -> msg))
265+
return Left("ENCOUNTERED_REMOVE_FILE_MISSING_SIZE")
266+
case p: Protocol =>
267+
protocol = p
268+
case m: Metadata =>
269+
metadata = m
270+
case ci: CommitInfo =>
271+
inCommitTimestamp = ci.inCommitTimestamp
272+
case _ =>
273+
}
274+
275+
Right(VersionChecksum(
276+
txnId = txnIdOpt,
277+
tableSizeBytes = tableSizeBytes,
278+
numFiles = numFiles,
279+
numMetadata = 1,
280+
numProtocol = 1,
281+
inCommitTimestampOpt = inCommitTimestamp,
282+
metadata = metadata,
283+
protocol = protocol,
284+
setTransactions = None,
285+
domainMetadata = None,
286+
histogramOpt = None,
287+
allFiles = None
288+
))
289+
}
290+
291+
}
292+
293+
object RecordChecksum {
294+
// Operations where we should ignore AddFiles in the incremental checksum computation.
295+
val operationNamesWhereAddFilesIgnoredForIncrementalCrc = Set(
296+
// The transaction that computes stats is special -- it re-adds files that already exist, in
297+
// order to update their min/max stats. We should not count those against the totals.
298+
DeltaOperations.ComputeStats(Seq.empty).name,
299+
// Backfill/Tagging re-adds existing AddFiles without changing the underlying data files.
300+
// Incremental commits should ignore backfill commits.
301+
DeltaOperations.RowTrackingBackfill().name
302+
)
118303
}
119304

120305
/**

spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ class DeltaLog private(
142142
def maxSnapshotLineageLength: Int =
143143
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_SNAPSHOT_LINEAGE_LENGTH)
144144

145+
private[delta] def incrementalCommitEnabled: Boolean = {
146+
DeltaLog.incrementalCommitEnableConfigs.forall(conf => spark.conf.get(conf))
147+
}
148+
145149
/** The unique identifier for this table. */
146150
def tableId: String = unsafeVolatileMetadata.id // safe because table id never changes
147151

@@ -720,6 +724,10 @@ object DeltaLog extends DeltaLogging {
720724
.maximumSize(cacheSize)
721725
}
722726

727+
val incrementalCommitEnableConfigs = Seq(
728+
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED
729+
)
730+
723731

724732
/**
725733
* Creates a [[LogicalRelation]] for a given [[DeltaLogFileIndex]], with all necessary file source

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ object OptimisticTransaction {
250250
trait OptimisticTransactionImpl extends TransactionalWrite
251251
with SQLMetricsReporting
252252
with DeltaScanGenerator
253-
with DeltaLogging {
253+
with DeltaLogging
254+
with RecordChecksum {
254255

255256
import org.apache.spark.sql.delta.util.FileNames._
256257

@@ -2475,7 +2476,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
24752476
}
24762477
val commitFile = writeCommitFileImpl(
24772478
attemptVersion, jsonActions, commitCoordinatorClient, currentTransactionInfo)
2478-
(None, commitFile)
2479+
val newChecksumOpt = incrementallyDeriveChecksum(attemptVersion, currentTransactionInfo)
2480+
(newChecksumOpt, commitFile)
24792481
}
24802482

24812483
protected def writeCommitFileImpl(
@@ -2501,6 +2503,33 @@ trait OptimisticTransactionImpl extends TransactionalWrite
25012503
commitResponse.getCommit
25022504
}
25032505

2506+
2507+
/**
2508+
* Given an attemptVersion, obtain checksum for previous snapshot version
2509+
* (i.e., attemptVersion - 1) and incrementally derives a new checksum from
2510+
* the actions of the current transaction.
2511+
*
2512+
* @param attemptVersion that the current transaction is committing
2513+
* @param currentTransactionInfo containing actions of the current transaction
2514+
* @return
2515+
*/
2516+
protected def incrementallyDeriveChecksum(
2517+
attemptVersion: Long,
2518+
currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum] = {
2519+
2520+
incrementallyDeriveChecksum(
2521+
spark,
2522+
deltaLog,
2523+
attemptVersion,
2524+
actions = currentTransactionInfo.finalActionsToCommit,
2525+
metadata = currentTransactionInfo.metadata,
2526+
protocol = currentTransactionInfo.protocol,
2527+
operationName = currentTransactionInfo.op.name,
2528+
txnIdOpt = Some(currentTransactionInfo.txnId),
2529+
previousVersionState = scala.Left(snapshot)
2530+
).toOption
2531+
}
2532+
25042533
/**
25052534
* Looks at actions that have happened since the txn started and checks for logical
25062535
* conflicts with the read/writes. Resolve conflicts and returns a tuple representing

spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -518,10 +518,10 @@ class Snapshot(
518518
numProtocol = numOfProtocol,
519519
inCommitTimestampOpt = getInCommitTimestampOpt,
520520
setTransactions = checksumOpt.flatMap(_.setTransactions),
521-
domainMetadata = domainMetadatasIfKnown,
521+
domainMetadata = checksumOpt.flatMap(_.domainMetadata),
522522
metadata = metadata,
523523
protocol = protocol,
524-
histogramOpt = fileSizeHistogram,
524+
histogramOpt = checksumOpt.flatMap(_.histogramOpt),
525525
allFiles = checksumOpt.flatMap(_.allFiles))
526526

527527
/** Returns the data schema of the table, used for reading stats */

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,6 +1027,14 @@ trait DeltaSQLConfBase {
10271027
.booleanConf
10281028
.createWithDefault(false)
10291029

1030+
val INCREMENTAL_COMMIT_ENABLED =
1031+
buildConf("incremental.commit.enabled")
1032+
.internal()
1033+
.doc("If true, Delta will incrementally compute the content of the commit checksum " +
1034+
"file, which avoids the full state reconstruction that would otherwise be required.")
1035+
.booleanConf
1036+
.createWithDefault(true)
1037+
10301038
val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
10311039
buildConf("checkpoint.exceptionThrowing.enabled")
10321040
.internal()

spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import org.apache.spark.sql.delta.util.FileNames
2626
import org.apache.hadoop.fs.Path
2727

2828
import org.apache.spark.sql.QueryTest
29+
import org.apache.spark.sql.SaveMode
30+
import org.apache.spark.sql.functions.col
2931
import org.apache.spark.sql.test.SharedSparkSession
3032

3133
class ChecksumSuite
@@ -60,4 +62,72 @@ class ChecksumSuite
6062
testChecksumFile(writeChecksumEnabled = true)
6163
testChecksumFile(writeChecksumEnabled = false)
6264
}
65+
66+
test("Incremental checksums: post commit snapshot should have a checksum " +
67+
"without triggering state reconstruction") {
68+
for (incrementalCommitEnabled <- BOOLEAN_DOMAIN) {
69+
withSQLConf(
70+
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false",
71+
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> incrementalCommitEnabled.toString) {
72+
withTempDir { tempDir =>
73+
val df = spark.range(1)
74+
df.write.format("delta").mode("append").save(tempDir.getCanonicalPath)
75+
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
76+
log
77+
.startTransaction()
78+
.commit(Seq(createTestAddFile()), DeltaOperations.Write(SaveMode.Append))
79+
val postCommitSnapshot = log.snapshot
80+
assert(postCommitSnapshot.version == 1)
81+
assert(!postCommitSnapshot.stateReconstructionTriggered)
82+
assert(postCommitSnapshot.checksumOpt.isDefined == incrementalCommitEnabled)
83+
84+
postCommitSnapshot.checksumOpt.foreach { incrementalChecksum =>
85+
val checksumFromStateReconstruction = postCommitSnapshot.computeChecksum
86+
assert(incrementalChecksum.copy(txnId = None) == checksumFromStateReconstruction)
87+
}
88+
}
89+
}
90+
}
91+
}
92+
93+
def testIncrementalChecksumWrites(tableMutationOperation: String => Unit): Unit = {
94+
withTempDir { tempDir =>
95+
withSQLConf(
96+
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
97+
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key ->"true") {
98+
val df = spark.range(10).withColumn("id2", col("id") % 2)
99+
df.write
100+
.format("delta")
101+
.partitionBy("id")
102+
.mode("append")
103+
.save(tempDir.getCanonicalPath)
104+
105+
tableMutationOperation(tempDir.getCanonicalPath)
106+
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
107+
val checksumOpt = log.snapshot.checksumOpt
108+
assert(checksumOpt.isDefined)
109+
val checksum = checksumOpt.get
110+
val computedChecksum = log.snapshot.computeChecksum
111+
assert(checksum.copy(txnId = None) === computedChecksum)
112+
}
113+
}
114+
}
115+
116+
test("Incremental checksums: INSERT") {
117+
testIncrementalChecksumWrites { tablePath =>
118+
sql(s"INSERT INTO delta.`$tablePath` SELECT *, 1 FROM range(10, 20)")
119+
}
120+
}
121+
122+
test("Incremental checksums: UPDATE") {
123+
testIncrementalChecksumWrites { tablePath =>
124+
sql(s"UPDATE delta.`$tablePath` SET id2 = id + 1 WHERE id % 2 = 0")
125+
}
126+
}
127+
128+
test("Incremental checksums: DELETE") {
129+
testIncrementalChecksumWrites { tablePath =>
130+
sql(s"DELETE FROM delta.`$tablePath` WHERE id % 2 = 0")
131+
}
132+
}
63133
}

0 commit comments

Comments
 (0)