Commit 4619af7
[SPARK] Managed Commit support for cold and hot snapshot update (delta-io#2755)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR adds support for cold and hot snapshot updates for managed commits: a cold update builds the table's first snapshot during DeltaLog initialization, while a hot update refreshes an already-installed snapshot. Both paths now go through a common `getUpdatedSnapshot` helper that consults the table's commit store and retries once if the snapshot reveals a different commit store than the one used to list the log.

## How was this patch tested?

Unit tests.

## Does this PR introduce _any_ user-facing changes?

No.
1 parent a11b92d commit 4619af7
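
To make the description's cold/hot distinction concrete, here is a hedged sketch of the two entry points this PR reroutes through `getUpdatedSnapshot`, using the public `DeltaLog` API; the session setup and table path are illustrative, not part of this change:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog

val spark = SparkSession.builder()
  .appName("cold-hot-snapshot-demo")
  .master("local[2]")
  .getOrCreate()

// Cold update: constructing the DeltaLog builds the first snapshot
// (getSnapshotAtInit), which for a managed-commit table now also consults
// the table's commit store on first load.
val deltaLog = DeltaLog.forTable(spark, new Path("/tmp/demo-table"))

// Hot update: refreshing the already-installed snapshot (updateInternal),
// which goes through the same getUpdatedSnapshot logic.
val latest = deltaLog.update()
println(s"latest version: ${latest.version}")
```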

File tree

7 files changed: +493 −95 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala

Lines changed: 102 additions & 71 deletions
@@ -59,15 +59,15 @@ trait SnapshotManagement { self: DeltaLog =>

   @volatile private[delta] var asyncUpdateTask: Future[Unit] = _

+  /** Use ReentrantLock to allow us to call `lockInterruptibly` */
+  protected val snapshotLock = new ReentrantLock()
+
   /**
    * Cached fileStatus for the latest CRC file seen in the deltaLog.
    */
   @volatile protected var lastSeenChecksumFileStatusOpt: Option[FileStatus] = None
   @volatile protected var currentSnapshot: CapturedSnapshot = getSnapshotAtInit

-  /** Use ReentrantLock to allow us to call `lockInterruptibly` */
-  protected val snapshotLock = new ReentrantLock()
-
   /**
    * Run `body` inside `snapshotLock` lock using `lockInterruptibly` so that the thread
    * can be interrupted when waiting for the lock.
@@ -81,20 +81,6 @@ trait SnapshotManagement { self: DeltaLog =>
     }
   }

-  /**
-   * Get the LogSegment that will help in computing the Snapshot of the table at DeltaLog
-   * initialization, or None if the directory was empty/missing.
-   *
-   * @param startingCheckpoint A checkpoint that we can start our listing from
-   */
-  protected def getLogSegmentFrom(
-      startingCheckpoint: Option[LastCheckpointInfo]): Option[LogSegment] = {
-    getLogSegmentForVersion(
-      versionToLoad = None,
-      lastCheckpointInfo = startingCheckpoint
-    )
-  }
-
   /** Get an iterator of files in the _delta_log directory starting with the startVersion. */
   private[delta] def listFrom(startVersion: Long): Iterator[FileStatus] = {
     store.listFrom(listingPrefix(logPath, startVersion), newDeltaHadoopConf())
@@ -231,11 +217,11 @@ trait SnapshotManagement { self: DeltaLog =>
    * @return Some LogSegment to build a Snapshot if files do exist after the given
    *         startCheckpoint. None, if the directory was missing or empty.
    */
-  protected def getLogSegmentForVersion(
+  protected def createLogSegment(
       versionToLoad: Option[Long] = None,
       oldCheckpointProviderOpt: Option[UninitializedCheckpointProvider] = None,
-      lastCheckpointInfo: Option[LastCheckpointInfo] = None,
-      commitStoreOpt: Option[CommitStore] = None): Option[LogSegment] = {
+      commitStoreOpt: Option[CommitStore] = None,
+      lastCheckpointInfo: Option[LastCheckpointInfo] = None): Option[LogSegment] = {
     // List based on the last known checkpoint version.
     // if that is -1, list from version 0L
     val lastCheckpointVersion = getCheckpointVersion(lastCheckpointInfo, oldCheckpointProviderOpt)
@@ -253,6 +239,12 @@ trait SnapshotManagement { self: DeltaLog =>
     )
   }

+  private def createLogSegment(previousSnapshot: Snapshot): Option[LogSegment] = {
+    createLogSegment(
+      oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
+      commitStoreOpt = previousSnapshot.commitStoreOpt)
+  }
+
   /**
    * Returns the last known checkpoint version based on [[LastCheckpointInfo]] or
    * [[CheckpointProvider]].
@@ -324,7 +316,7 @@ trait SnapshotManagement { self: DeltaLog =>
       // deleting files. Either way, we can't safely continue.
       //
       // For now, we preserve existing behavior by returning Array.empty, which will trigger a
-      // recursive call to [[getLogSegmentForVersion]] below.
+      // recursive call to [[createLogSegment]] below.
       Array.empty[FileStatus]
     }
@@ -335,7 +327,7 @@ trait SnapshotManagement { self: DeltaLog =>
     } else if (newFiles.isEmpty) {
       // The directory may be deleted and recreated and we may have stale state in our DeltaLog
       // singleton, so try listing from the first version
-      return getLogSegmentForVersion(versionToLoad = versionToLoad)
+      return createLogSegment(versionToLoad = versionToLoad)
     }
     val (checkpoints, deltasAndCompactedDeltas) = newFiles.partition(isCheckpointFile)
     val (deltas, compactedDeltas) = deltasAndCompactedDeltas.partition(isDeltaFile)
@@ -498,30 +490,20 @@ trait SnapshotManagement { self: DeltaLog =>
    * file as a hint on where to start listing the transaction log directory. If the _delta_log
    * directory doesn't exist, this method will return an `InitialSnapshot`.
    */
-  protected def getSnapshotAtInit: CapturedSnapshot = {
+  protected def getSnapshotAtInit: CapturedSnapshot = withSnapshotLockInterruptibly {
     recordFrameProfile("Delta", "SnapshotManagement.getSnapshotAtInit") {
-      val currentTimestamp = clock.getTimeMillis()
+      val snapshotInitWallclockTime = clock.getTimeMillis()
       val lastCheckpointOpt = readLastCheckpointFile()
-      createSnapshotAtInitInternal(
-        initSegment = getLogSegmentFrom(lastCheckpointOpt),
-        timestamp = currentTimestamp
-      )
-    }
-  }
-
-  protected def createSnapshotAtInitInternal(
-      initSegment: Option[LogSegment],
-      timestamp: Long): CapturedSnapshot = {
-    val snapshot = initSegment.map { segment =>
-      val snapshot = createSnapshot(
-        initSegment = segment,
-        checksumOpt = None)
-      snapshot
-    }.getOrElse {
-      logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
-      new InitialSnapshot(logPath, this)
+      val initialSegmentForNewSnapshot = createLogSegment(
+        versionToLoad = None,
+        lastCheckpointInfo = lastCheckpointOpt)
+      val snapshot = getUpdatedSnapshot(
+        oldSnapshotOpt = None,
+        initialSegmentForNewSnapshot = initialSegmentForNewSnapshot,
+        initialCommitStore = None,
+        isAsync = false)
+      CapturedSnapshot(snapshot, snapshotInitWallclockTime)
     }
-    CapturedSnapshot(snapshot, timestamp)
   }

@@ -696,7 +678,7 @@ trait SnapshotManagement { self: DeltaLog =>
      * Instead, just do a general update to the latest available version. The racing commits
      * can then use the version check short-circuit to avoid constructing a new snapshot.
      */
-    getLogSegmentForVersion(
+    createLogSegment(
       oldCheckpointProviderOpt = Some(oldCheckpointProvider),
       commitStoreOpt = commitStoreOpt
     ).getOrElse {
@@ -907,48 +889,95 @@ trait SnapshotManagement { self: DeltaLog =>
    */
   protected def updateInternal(isAsync: Boolean): Snapshot =
     recordDeltaOperation(this, "delta.log.update", Map(TAG_ASYNC -> isAsync.toString)) {
-      val updateTimestamp = clock.getTimeMillis()
+      val updateStartTimeMs = clock.getTimeMillis()
       val previousSnapshot = currentSnapshot.snapshot
-      val segmentOpt = getLogSegmentForVersion(
-        oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
-        commitStoreOpt = previousSnapshot.commitStoreOpt)
-      installLogSegmentInternal(previousSnapshot, segmentOpt, updateTimestamp, isAsync)
+      val segmentOpt = createLogSegment(previousSnapshot)
+      val newSnapshot = getUpdatedSnapshot(
+        oldSnapshotOpt = Some(previousSnapshot),
+        initialSegmentForNewSnapshot = segmentOpt,
+        initialCommitStore = previousSnapshot.commitStoreOpt,
+        isAsync = isAsync)
+      installSnapshot(newSnapshot, updateStartTimeMs)
+    }
+
+  /**
+   * Updates and installs a new snapshot in the `currentSnapshot`.
+   * This method takes care of recursively creating new snapshots if the commit store has changed.
+   * @param oldSnapshotOpt The previous snapshot, if any.
+   * @param initialSegmentForNewSnapshot the log segment constructed for the new snapshot
+   * @param initialCommitStore the Commit Store used for constructing the
+   *                           `initialSegmentForNewSnapshot`
+   * @param isAsync Whether the update is async.
+   * @return The new snapshot.
+   */
+  protected def getUpdatedSnapshot(
+      oldSnapshotOpt: Option[Snapshot],
+      initialSegmentForNewSnapshot: Option[LogSegment],
+      initialCommitStore: Option[CommitStore],
+      isAsync: Boolean): Snapshot = {
+    var commitStoreUsed = initialCommitStore
+    var newSnapshot = getSnapshotForLogSegmentInternal(
+      oldSnapshotOpt,
+      initialSegmentForNewSnapshot,
+      isAsync
+    )
+    // If the commit store has changed, we need to again invoke updateSnapshot so that we
+    // could get the latest commits from the new commit store. We need to do it only once as
+    // the delta spec mandates the commit which changes the commit owner to be backfilled.
+    if (newSnapshot.version >= 0 && newSnapshot.commitStoreOpt != commitStoreUsed) {
+      commitStoreUsed = newSnapshot.commitStoreOpt
+      val segmentOpt = createLogSegment(newSnapshot)
+      newSnapshot = getSnapshotForLogSegmentInternal(Some(newSnapshot), segmentOpt, isAsync)
     }
+    newSnapshot
+  }

-  /** Install the provided segmentOpt as the currentSnapshot on the cluster */
-  protected def installLogSegmentInternal(
-      previousSnapshot: Snapshot,
+  /** Creates a Snapshot for the given `segmentOpt` */
+  protected def getSnapshotForLogSegmentInternal(
+      previousSnapshotOpt: Option[Snapshot],
       segmentOpt: Option[LogSegment],
-      updateTimestamp: Long,
       isAsync: Boolean): Snapshot = {
     segmentOpt.map { segment =>
-      if (segment == previousSnapshot.logSegment) {
-        // If no changes were detected, just refresh the timestamp
-        val timestampToUse = math.max(updateTimestamp, currentSnapshot.updateTimestamp)
-        currentSnapshot = currentSnapshot.copy(updateTimestamp = timestampToUse)
+      if (previousSnapshotOpt.exists(_.logSegment == segment)) {
+        previousSnapshotOpt.get
       } else {
         val newSnapshot = createSnapshot(
           initSegment = segment,
           checksumOpt = None)
-        logMetadataTableIdChange(previousSnapshot, newSnapshot)
+        previousSnapshotOpt.foreach(logMetadataTableIdChange(_, newSnapshot))
         logInfo(s"Updated snapshot to $newSnapshot")
-        replaceSnapshot(newSnapshot, updateTimestamp)
+        newSnapshot
       }
     }.getOrElse {
-      logInfo(s"No delta log found for the Delta table at $logPath")
-      replaceSnapshot(new InitialSnapshot(logPath, this), updateTimestamp)
+      logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
+      new InitialSnapshot(logPath, this)
     }
-    currentSnapshot.snapshot
   }

-  /** Replace the given snapshot with the provided one. */
-  protected def replaceSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Unit = {
+  /** Installs the given `newSnapshot` as the `currentSnapshot` */
+  protected def installSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Snapshot = {
     if (!snapshotLock.isHeldByCurrentThread) {
+      if (Utils.isTesting) {
+        throw new RuntimeException("DeltaLog snapshot replaced without taking lock")
+      }
       recordDeltaEvent(this, "delta.update.unsafeReplace")
     }
-    val oldSnapshot = currentSnapshot.snapshot
-    currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
-    oldSnapshot.uncache()
+    if (currentSnapshot == null) {
+      // cold snapshot initialization
+      currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
+      return newSnapshot
+    }
+    val CapturedSnapshot(oldSnapshot, oldTimestamp) = currentSnapshot
+    if (oldSnapshot eq newSnapshot) {
+      // Same snapshot as before, so just refresh the timestamp
+      val timestampToUse = math.max(updateTimestamp, oldTimestamp)
+      currentSnapshot = CapturedSnapshot(newSnapshot, timestampToUse)
+    } else {
+      // Install the new snapshot and uncache the old one
+      currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
+      oldSnapshot.uncache()
+    }
+    newSnapshot
   }

   /** Log a change in the metadata's table id whenever we install a newer version of a snapshot */
@@ -1022,8 +1051,7 @@ trait SnapshotManagement { self: DeltaLog =>
         committedVersion)
       logMetadataTableIdChange(previousSnapshot, newSnapshot)
       logInfo(s"Updated snapshot to $newSnapshot")
-      replaceSnapshot(newSnapshot, updateTimestamp)
-      currentSnapshot.snapshot
+      installSnapshot(newSnapshot, updateTimestamp)
     }
   }
@@ -1045,7 +1073,7 @@ trait SnapshotManagement { self: DeltaLog =>
       // fallback to the other overload.
       return getSnapshotAt(version)
     }
-    val segment = getLogSegmentForVersion(
+    val segment = createLogSegment(
       versionToLoad = Some(version),
       oldCheckpointProviderOpt = Some(lastCheckpointProvider)
     ).getOrElse {
@@ -1073,7 +1101,7 @@ trait SnapshotManagement { self: DeltaLog =>
       .collect { case ci if ci.version <= version => ci }
       .orElse(findLastCompleteCheckpointBefore(version))
       .map(manuallyLoadCheckpoint)
-    getLogSegmentForVersion(
+    createLogSegment(
       versionToLoad = Some(version),
       lastCheckpointInfo = lastCheckpointInfoHint
     ).map { segment =>
@@ -1085,6 +1113,9 @@ trait SnapshotManagement { self: DeltaLog =>
       throw DeltaErrors.emptyDirectoryException(logPath.toString)
     }
   }
+
+  // Visible for testing
+  private[delta] def getCapturedSnapshot(): CapturedSnapshot = currentSnapshot
 }

 object SnapshotManagement {
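
The heart of the change is the single retry in `getUpdatedSnapshot`: if the snapshot built from the initial listing names a different commit store than the one used to produce that listing, the update performs exactly one more pass against the new store; one pass suffices because the Delta spec mandates that the commit which changes the commit owner be backfilled. Below is a minimal, self-contained sketch of this pattern; `SimpleSnapshot`, `buildSnapshot`, and the owner names are hypothetical stand-ins for the real `Snapshot`/`CommitStore` machinery, not the actual implementation:

```scala
// Sketch of the "retry at most once when the commit owner changes" update.
object UpdateSketch {
  // A snapshot records the version it reached and the commit owner its
  // latest metadata names (None = plain filesystem commits).
  final case class SimpleSnapshot(version: Long, commitOwner: Option[String])

  // Stand-in for "list the log via this commit owner and build a snapshot".
  def buildSnapshot(usedOwner: Option[String]): SimpleSnapshot = usedOwner match {
    // A filesystem listing only sees backfilled commits; the backfilled commit
    // that changed the owner names "owner-A", so the snapshot reports it.
    case None => SimpleSnapshot(version = 5, commitOwner = Some("owner-A"))
    // Asking owner-A directly also surfaces the unbackfilled commits 6 and 7.
    case Some("owner-A") => SimpleSnapshot(version = 7, commitOwner = Some("owner-A"))
    case other => sys.error(s"unknown commit owner: $other")
  }

  // Mirrors getUpdatedSnapshot: build once, then rebuild at most once if the
  // snapshot's commit owner differs from the one used to construct it.
  def update(initialOwner: Option[String]): SimpleSnapshot = {
    var snapshot = buildSnapshot(initialOwner)
    if (snapshot.version >= 0 && snapshot.commitOwner != initialOwner) {
      snapshot = buildSnapshot(snapshot.commitOwner)
    }
    snapshot
  }

  def main(args: Array[String]): Unit = {
    // Cold path: no previous snapshot, so the first pass lists the filesystem,
    // reaches v5, learns about owner-A, and the second pass reaches v7.
    println(update(initialOwner = None)) // SimpleSnapshot(7, Some(owner-A))
  }
}
```

The same shape serves both paths: `getSnapshotAtInit` starts with no previous snapshot (cold), while `updateInternal` starts from the previous snapshot's commit store (hot).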

spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {
    * Commit a given `commitFile` to the table represented by given `logPath` at the
    * given `commitVersion`
    */
-  protected def commitImpl(
+  private[delta] def commitImpl(
       logStore: LogStore,
       hadoopConf: Configuration,
       logPath: Path,

spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala

Lines changed: 11 additions & 3 deletions
@@ -27,12 +27,11 @@ import org.apache.hadoop.fs.{FileStatus, Path}

 class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingCommitStore {

-  private[managedcommit] class PerTableData {
+  private[managedcommit] class PerTableData(var maxCommitVersion: Long = -1) {
     // Map from version to Commit data
     val commitsMap: mutable.SortedMap[Long, Commit] = mutable.SortedMap.empty
     // We maintain maxCommitVersion explicitly since commitsMap might be empty
     // if all commits for a table have been backfilled.
-    var maxCommitVersion: Long = -1
     val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
   }
@@ -71,7 +70,7 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingCommitStore {
    * @throws CommitFailedException if the commit version is not the expected next version,
    *                               indicating a version conflict.
    */
-  protected def commitImpl(
+  private[delta] def commitImpl(
       logStore: LogStore,
       hadoopConf: Configuration,
       logPath: Path,
@@ -126,6 +125,15 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingCommitStore {
       versionsToRemove.foreach(tableData.commitsMap.remove)
     }
   }
+
+  def registerTable(
+      logPath: Path,
+      maxCommitVersion: Long): Unit = {
+    val newPerTableData = new PerTableData(maxCommitVersion)
+    if (perTableMap.putIfAbsent(logPath, newPerTableData) != null) {
+      throw new IllegalStateException(s"Table $logPath already exists in the commit store.")
+    }
+  }
 }

 /**
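
`registerTable` gives tests a way to hand a table over to the in-memory commit store, and `putIfAbsent` makes double registration fail loudly. A small usage sketch, assuming only the classes shown in this diff; the table path and batch size are illustrative:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.managedcommit.InMemoryCommitStore

object RegisterTableSketch {
  def main(args: Array[String]): Unit = {
    // Backfill every 10 commits; versions 0..4 are assumed to already be
    // backfilled on the filesystem, so tracking starts at maxCommitVersion = 4.
    val commitStore = new InMemoryCommitStore(batchSize = 10)
    val logPath = new Path("/tmp/target-table/_delta_log") // hypothetical path
    commitStore.registerTable(logPath, maxCommitVersion = 4)

    // A second registration of the same logPath hits putIfAbsent and throws
    // IllegalStateException("Table ... already exists in the commit store.").
  }
}
```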

spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala

Lines changed: 0 additions & 7 deletions
@@ -936,13 +936,6 @@ class CheckpointsSuite
       assert(filterUsageRecords(usageRecords2, "delta.log.cleanup").size > 0)
     }
   }
-
-  protected def filterUsageRecords(
-      usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = {
-    usageRecords.filter { r =>
-      r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
-    }
-  }
 }

 /**

spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala

Lines changed: 8 additions & 0 deletions
@@ -25,6 +25,7 @@ import scala.collection.concurrent
 import scala.reflect.ClassTag
 import scala.util.matching.Regex

+import com.databricks.spark.util.UsageRecord
 import org.apache.spark.sql.delta.DeltaTestUtils.Plans
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
@@ -155,6 +156,13 @@
     jobs.values.count(_ > 0)
   }

+  /** Filter `usageRecords` by the `opType` tag or field. */
+  def filterUsageRecords(usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = {
+    usageRecords.filter { r =>
+      r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
+    }
+  }
+
   protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = {
     // The expected plan for touched file computation is of the format below.
     // The data column should be pruned from both leaves.
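
Hoisting `filterUsageRecords` from `CheckpointsSuite` into the shared test utilities lets other suites, such as the new managed-commit tests, reuse it. A hypothetical test fragment; `Log4jUsageLogger.track` is assumed to be the usual Delta test helper for capturing `UsageRecord`s, and `deltaLog` an already-constructed `DeltaLog`:

```scala
import com.databricks.spark.util.Log4jUsageLogger

// Capture the usage records emitted while forcing a snapshot update, then
// keep only the delta.log.update events via the shared helper.
val records = Log4jUsageLogger.track {
  deltaLog.update()
}
assert(filterUsageRecords(records, "delta.log.update").nonEmpty)
```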
