Skip to content

Commit ec0ab0d

Browse files
authored
[Spark][Version Checksum] Incrementally compute VersionChecksum setTransactions and domainMetadata (delta-io#3895)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Follow up for delta-io#3828. Adds support for incrementally computing the set transactions and domain metadata actions based on the current commit and the last version checksum. Incremental computation for both these action types have thresholds so that we don't store them if they are too long (tests have been added for the same). ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. 
--> Added new tests in DomainMetadataSuite and a new suite called `DeltaIncrementalSetTransactionsSuite` ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 68275d1 commit ec0ab0d

File tree

7 files changed

+715
-13
lines changed

7 files changed

+715
-13
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala

Lines changed: 102 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,7 @@ trait RecordChecksum extends DeltaLogging {
7878
private lazy val writer =
7979
CheckpointFileManager.create(deltaLog.logPath, deltaLog.newDeltaHadoopConf())
8080

81-
private def getChecksum(snapshot: Snapshot): VersionChecksum = {
82-
snapshot.checksumOpt.getOrElse(snapshot.computeChecksum)
83-
}
81+
/** Checksum for `snapshot`; delegates to [[Snapshot.computeChecksum]]. */
private def getChecksum(snapshot: Snapshot): VersionChecksum = {
  snapshot.computeChecksum
}
8482

8583
protected def writeChecksumFile(txnId: String, snapshot: Snapshot): Unit = {
8684
if (!spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED)) {
@@ -277,6 +275,12 @@ trait RecordChecksum extends DeltaLogging {
277275
case _ =>
278276
}
279277

278+
val setTransactions = incrementallyComputeSetTransactions(
279+
oldSnapshot, oldVersionChecksum, attemptVersion, actions)
280+
281+
val domainMetadata = incrementallyComputeDomainMetadatas(
282+
oldSnapshot, oldVersionChecksum, attemptVersion, actions)
283+
280284
Right(VersionChecksum(
281285
txnId = txnIdOpt,
282286
tableSizeBytes = tableSizeBytes,
@@ -286,13 +290,106 @@ trait RecordChecksum extends DeltaLogging {
286290
inCommitTimestampOpt = inCommitTimestamp,
287291
metadata = metadata,
288292
protocol = protocol,
289-
setTransactions = None,
290-
domainMetadata = None,
293+
setTransactions = setTransactions,
294+
domainMetadata = domainMetadata,
291295
histogramOpt = None,
292296
allFiles = None
293297
))
294298
}
295299

300+
/**
 * Incrementally compute [[Snapshot.setTransactions]] for the commit `attemptVersion`.
 *
 * @param oldSnapshot - snapshot corresponding to `attemptVersion` - 1
 * @param oldVersionChecksum - [[VersionChecksum]] corresponding to `attemptVersion` - 1
 * @param attemptVersion - version which we want to commit
 * @param actionsToCommit - actions for commit `attemptVersion`
 * @return Optional sequence of incrementally computed [[SetTransaction]]s for commit
 *         `attemptVersion`, or None when incremental computation is not possible.
 */
private def incrementallyComputeSetTransactions(
    oldSnapshot: Option[Snapshot],
    oldVersionChecksum: VersionChecksum,
    attemptVersion: Long,
    actionsToCommit: Seq[Action]): Option[Seq[SetTransaction]] = {
  // Check-1: incremental SetTransaction tracking must be enabled via conf.
  if (!spark.conf.get(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC)) {
    None
  } else {
    // Check-2: `minSetTransactionRetentionTimestamp` must not be set.
    // TODO: Add support for incrementally computing [[SetTransaction]]s even when
    //  `minSetTransactionRetentionTimestamp` is set.
    // We don't incrementally compute [[SetTransaction]]s when the user has configured
    // `minSetTransactionRetentionTimestamp` as it makes verification non-deterministic.
    // Check all places where `minSetTransactionRetentionTimestamp` could be set:
    // 1. oldSnapshot corresponding to `attemptVersion - 1`
    // 2. old VersionChecksum's Metadata (corresponding to `attemptVersion - 1`)
    // 3. new VersionChecksum's Metadata (corresponding to `attemptVersion`)
    val newMetadataToCommit = actionsToCommit.collectFirst { case m: Metadata => m }
    val setTransactionRetentionTimestampConfigured =
      (oldSnapshot.map(_.metadata) ++ Option(oldVersionChecksum.metadata) ++ newMetadataToCommit)
        .exists(DeltaLog.minSetTransactionRetentionInterval(_).nonEmpty)
    if (setTransactionRetentionTimestampConfigured) {
      None
    } else {
      // Check-3: old setTransactions must be available so we can incrementally compute new ones.
      oldVersionChecksum.setTransactions.flatMap { oldSetTransactions =>
        // Check-4: old/new setTransactions must be within the configured threshold.
        val setTransactionsToCommit = actionsToCommit.filter(_.isInstanceOf[SetTransaction])
        val threshold =
          spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_SET_TRANSACTIONS_IN_CRC)
        if (math.max(setTransactionsToCommit.size, oldSetTransactions.size) > threshold) {
          None
        } else {
          // We currently don't attempt incremental [[SetTransaction]] when
          // `minSetTransactionRetentionTimestamp` is set, so pass None here explicitly.
          // File retention can also be ignored because it only affects [[RemoveFile]] actions.
          val logReplay = new InMemoryLogReplay(
            minFileRetentionTimestamp = 0,
            minSetTransactionRetentionTimestamp = None)
          // `.iterator` instead of the deprecated `.toIterator`.
          logReplay.append(attemptVersion - 1, oldSetTransactions.iterator)
          logReplay.append(attemptVersion, setTransactionsToCommit.iterator)
          // Re-check the threshold on the replayed result before caching it.
          Some(logReplay.getTransactions.toSeq).filter(_.size <= threshold)
        }
      }
    }
  }
}
355+
356+
/**
 * Incrementally compute [[Snapshot.domainMetadata]] for the commit `attemptVersion`.
 *
 * @param oldSnapshot - snapshot corresponding to `attemptVersion` - 1 (currently unused,
 *                      kept for signature parity with the SetTransaction variant)
 * @param oldVersionChecksum - [[VersionChecksum]] corresponding to `attemptVersion` - 1
 * @param attemptVersion - version which we want to commit
 * @param actionsToCommit - actions for commit `attemptVersion`
 * @return Optional sequence of incrementally computed [[DomainMetadata]]s for commit
 *         `attemptVersion`; None when the old checksum has none or the threshold is exceeded.
 */
private def incrementallyComputeDomainMetadatas(
    oldSnapshot: Option[Snapshot],
    oldVersionChecksum: VersionChecksum,
    attemptVersion: Long,
    actionsToCommit: Seq[Action]): Option[Seq[DomainMetadata]] = {
  // Old DomainMetadatas must be available so that we can incrementally compute the new ones.
  oldVersionChecksum.domainMetadata.flatMap { oldDomainMetadatas =>
    val newDomainMetadatas = actionsToCommit.filter(_.isInstanceOf[DomainMetadata])

    // We only work with DomainMetadata, so RemoveFile and SetTransaction retention don't matter.
    val logReplay = new InMemoryLogReplay(
      minFileRetentionTimestamp = 0,
      minSetTransactionRetentionTimestamp = None)

    val threshold =
      spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_DOMAIN_METADATAS_IN_CRC)

    logReplay.append(attemptVersion - 1, oldDomainMetadatas.iterator)
    logReplay.append(attemptVersion, newDomainMetadatas.iterator)
    // We don't truncate the set of DomainMetadata actions. Instead, we either store all of them
    // or none of them. The advantage is that presence can then be determined from the checksum,
    // i.e. if the checksum contains domain metadatas but doesn't contain the one you are looking
    // for, then it's not there.
    //
    // It's also worth noting that we can distinguish "no domain metadatas" versus
    // "domain metadatas not stored" as [[Some]] vs. [[None]].
    Some(logReplay.getDomainMetadatas.toSeq).filter(_.size <= threshold)
  }
}
296393
}
297394

298395
object RecordChecksum {

spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -513,17 +513,29 @@ class Snapshot(
513513
*/
514514
/**
 * Builds the [[VersionChecksum]] for this snapshot, preferring values already recorded in
 * `checksumOpt` and falling back to state derived from this snapshot.
 */
def computeChecksum: VersionChecksum = {
  val crc = checksumOpt
  VersionChecksum(
    txnId = None,
    inCommitTimestampOpt = getInCommitTimestampOpt,
    metadata = metadata,
    protocol = protocol,
    allFiles = crc.flatMap(_.allFiles),
    tableSizeBytes = crc.map(_.tableSizeBytes).getOrElse(sizeInBytes),
    numFiles = crc.map(_.numFiles).getOrElse(numOfFiles),
    numMetadata = crc.map(_.numMetadata).getOrElse(numOfMetadata),
    numProtocol = crc.map(_.numProtocol).getOrElse(numOfProtocol),
    // Only return setTransactions and domainMetadata if they are either already present
    // in the checksum or have already been computed in the current snapshot.
    setTransactions = crc.flatMap(_.setTransactions).orElse {
      // Only extract it from the current snapshot if state reconstruction already ran and
      // set transaction writes are enabled.
      Option.when(
        _computedStateTriggered &&
          spark.conf.get(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC)) {
        setTransactions
      }
    },
    domainMetadata = crc.flatMap(_.domainMetadata)
      .orElse(Option.when(_computedStateTriggered)(domainMetadata)),
    histogramOpt = crc.flatMap(_.histogramOpt)
  )
}
527539

528540
/** Returns the data schema of the table, used for reading stats */
529541
def tableSchema: StructType = metadata.dataSchema
@@ -704,6 +716,7 @@ class DummySnapshot(
704716

705717
override protected lazy val computedState: SnapshotState = initialState(metadata, protocol)
706718
override protected lazy val getInCommitTimestampOpt: Option[Long] = None
719+
_computedStateTriggered = true
707720

708721
// The [[InitialSnapshot]] is not backed by any external commit-coordinator.
709722
override val tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient] = None

spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,6 +1271,7 @@ trait SnapshotManagement { self: DeltaLog =>
12711271
// a checksum based on state reconstruction. Disable incremental commit to avoid
12721272
// further error triggers in this session.
12731273
spark.sessionState.conf.setConf(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED, false)
1274+
spark.sessionState.conf.setConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC, false)
12741275
return createSnapshotAfterCommit(
12751276
initSegment,
12761277
newChecksumOpt = None,

spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot =>
6565

6666
// For implicits which re-use Encoder:
6767
import implicits._
68+
/** Whether `computedState` has already been materialized for this snapshot. */
@volatile protected var _computedStateTriggered: Boolean = false
70+
6871

6972
/** A map to look up transaction version by appId. */
7073
lazy val transactions: Map[String, Long] = setTransactions.map(t => t.appId -> t.version).toMap
@@ -114,6 +117,7 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot =>
114117
throw DeltaErrors.actionNotFoundException("metadata", version)
115118
}
116119

120+
_computedStateTriggered = true
117121
_computedState
118122
}
119123
}

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,6 +1073,31 @@ trait DeltaSQLConfBase {
10731073
.booleanConf
10741074
.createWithDefault(true)
10751075

1076+
val DELTA_WRITE_SET_TRANSACTIONS_IN_CRC =
  buildConf("setTransactionsInCrc.writeOnCommit")
    .internal()
    .doc(s"When enabled, each commit will incrementally compute and cache all SetTransaction actions in the .crc file. Note that this only happens when incremental commits are enabled (${INCREMENTAL_COMMIT_ENABLED.key})")
    .booleanConf
    .createWithDefault(true)

val DELTA_MAX_SET_TRANSACTIONS_IN_CRC =
  buildConf("setTransactionsInCrc.maxAllowed")
    .internal()
    .doc("Threshold of the number of SetTransaction actions below which this optimization should be enabled")
    .longConf
    .createWithDefault(100)

val DELTA_MAX_DOMAIN_METADATAS_IN_CRC =
  buildConf("domainMetadatasInCrc.maxAllowed")
    .internal()
    .doc("Threshold of the number of DomainMetadata actions below which this optimization should be enabled")
    .longConf
    .createWithDefault(10)
1100+
10761101
val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
10771102
buildConf("checkpoint.exceptionThrowing.enabled")
10781103
.internal()

0 commit comments

Comments (0)