
Commit adce7a4

[Spark] Some refactoring to clean up OptimisticTransaction (delta-io#4735)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR does the following:

- Changes how `needsCheckpoint` is set.
- Avoids using `var`s in the `DeltaTransaction` trait.
- `DeltaTransaction` will be renamed later and become a helper trait containing miscellaneous utilities; some coordinated commits helper methods are moved there now.

## How was this patch tested?

Existing UTs.

## Does this PR introduce _any_ user-facing changes?

No.
1 parent 0d1600e commit adce7a4
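The heart of the `needsCheckpoint` change is a standard refactoring: replace a `Unit`-returning setter that silently mutates a trait-level `var` with a pure predicate, and let the concrete transaction class own whatever mutable state it still needs. A minimal sketch of the before/after shape (the trait and class names below are simplified stand-ins, not the real hierarchy):

```scala
// Before: the trait owns mutable state, and a Unit-returning setter mutates it.
trait BeforeTxn {
  protected var needsCheckpoint = false

  protected def setNeedsCheckpoint(committedVersion: Long, interval: Long): Unit = {
    needsCheckpoint = committedVersion != 0 && committedVersion % interval == 0
  }
}

// After: the trait exposes a pure predicate with no hidden writes; the
// implementing class decides where (and whether) to store the result.
trait AfterTxn {
  protected def isCheckpointNeeded(committedVersion: Long, interval: Long): Boolean =
    committedVersion != 0 && committedVersion % interval == 0
}

class SimplifiedOptimisticTxn extends AfterTxn {
  private var needsCheckpoint = false

  def onCommitted(committedVersion: Long, interval: Long): Unit = {
    // The caller now assigns the predicate's result explicitly.
    needsCheckpoint = isCheckpointNeeded(committedVersion, interval)
  }
}
```

Keeping the trait free of `var`s makes its contract easier to reason about and leaves `OptimisticTransactionImpl` as the sole owner of commit-time mutable state, which is what the diffs below do.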

2 files changed: +78 −77

spark/src/main/scala/org/apache/spark/sql/delta/DeltaTransaction.scala

Lines changed: 66 additions & 13 deletions
```diff
@@ -21,6 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.delta.DeltaOperations.Operation
 import org.apache.spark.sql.delta.actions.{AddFile, CommitInfo, Metadata, Protocol}
+import org.apache.spark.sql.delta.coordinatedcommits.{CatalogOwnedTableUtils, TableCommitCoordinatorClient}
 import org.apache.spark.sql.delta.hooks.PostCommitHook
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -63,15 +64,6 @@ trait DeltaTransaction extends DeltaLogging {
     DeltaConfigs.ISOLATION_LEVEL.fromMetaData(metadata)
   }
 
-  /** Whether the txn should trigger a checkpoint after the commit */
-  private[delta] var needsCheckpoint = false
-
-  /** The set of distinct partitions that contain added files by current transaction. */
-  protected[delta] var partitionsAddedToOpt: Option[mutable.HashSet[Map[String, String]]] = None
-
-  /** True if this transaction is a blind append. This is only valid after commit. */
-  protected[delta] var isBlindAppend: Boolean = false
-
   /**
    * Return the user-defined metadata for the operation.
    */
@@ -86,13 +78,74 @@ trait DeltaTransaction extends DeltaLogging {
   /** The current spark session */
   protected def spark: SparkSession = SparkSession.active
 
+  private[delta] lazy val readSnapshotTableCommitCoordinatorClientOpt:
+      Option[TableCommitCoordinatorClient] = {
+    if (snapshot.isCatalogOwned) {
+      // Catalog owned table's commit coordinator is always determined by the catalog.
+      CatalogOwnedTableUtils.populateTableCommitCoordinatorFromCatalog(
+        spark, catalogTable, snapshot)
+    } else {
+      // The commit-coordinator of a table shouldn't change. If it is changed by a concurrent
+      // commit, then it will be detected as a conflict and the transaction will anyway fail.
+      snapshot.getTableCommitCoordinatorForWrites
+    }
+  }
+
+  def createCoordinatedCommitsStats(newProtocol: Option[Protocol]): CoordinatedCommitsStats = {
+    val (coordinatedCommitsType, metadataToUse) =
+      readSnapshotTableCommitCoordinatorClientOpt match {
+        // TODO: Capture the CO -> FS downgrade case when we start
+        // supporting downgrade for CO.
+        case Some(_) if snapshot.isCatalogOwned => // CO commit
+          (CoordinatedCommitType.CO_COMMIT, snapshot.metadata)
+        case Some(_) if metadata.coordinatedCommitsCoordinatorName.isEmpty => // CC -> FS
+          (CoordinatedCommitType.CC_TO_FS_DOWNGRADE_COMMIT, snapshot.metadata)
+        // Only the 0th commit to a table can be a FS -> CO upgrade for now.
+        // Upgrading an existing FS table to CO through ALTER TABLE is not supported yet.
+        case None if newProtocol.exists(_.readerAndWriterFeatureNames
+            .contains(CatalogOwnedTableFeature.name)) => // FS -> CO
+          (CoordinatedCommitType.FS_TO_CO_UPGRADE_COMMIT, metadata)
+        case None if metadata.coordinatedCommitsCoordinatorName.isDefined => // FS -> CC
+          (CoordinatedCommitType.FS_TO_CC_UPGRADE_COMMIT, metadata)
+        case Some(_) => // CC commit
+          (CoordinatedCommitType.CC_COMMIT, snapshot.metadata)
+        case None => // FS commit
+          (CoordinatedCommitType.FS_COMMIT, snapshot.metadata)
+        // Errors out in rest of the cases.
+        case _ =>
+          throw new IllegalStateException(
+            "Unexpected state found when trying " +
+            s"to generate CoordinatedCommitsStats for table ${deltaLog.logPath}. " +
+            s"$readSnapshotTableCommitCoordinatorClientOpt, " +
+            s"$metadata, $snapshot, $catalogTable")
+      }
+    CoordinatedCommitsStats(
+      coordinatedCommitsType = coordinatedCommitsType.toString,
+      commitCoordinatorName = if (Set(CoordinatedCommitType.CO_COMMIT,
+          CoordinatedCommitType.FS_TO_CO_UPGRADE_COMMIT).contains(coordinatedCommitsType)) {
+        // The catalog for FS -> CO upgrade commit would be
+        // "CATALOG_EMPTY" because `catalogTable` is not available
+        // for the 0th FS commit.
+        catalogTable.flatMap { ct =>
+          CatalogOwnedTableUtils.getCatalogName(
+            spark,
+            identifier = ct.identifier)
+        }.getOrElse("CATALOG_MISSING")
+      } else {
+        metadataToUse.coordinatedCommitsCoordinatorName.getOrElse("NONE")
+      },
+      // For Catalog-Owned table, the coordinator conf for UC-CC is [[Map.empty]]
+      // so we don't distinguish between CO/CC here.
+      commitCoordinatorConf = metadataToUse.coordinatedCommitsCoordinatorConf)
+  }
+
   /**
-   * Sets needsCheckpoint if we should checkpoint the version that has just been committed.
+   * Determines if we should checkpoint the version that has just been committed.
    */
-  protected def setNeedsCheckpoint(
-      committedVersion: Long, postCommitSnapshot: Snapshot): Unit = {
+  protected def isCheckpointNeeded(
+      committedVersion: Long, postCommitSnapshot: Snapshot): Boolean = {
     def checkpointInterval = deltaLog.checkpointInterval(postCommitSnapshot.metadata)
-    needsCheckpoint = committedVersion != 0 && committedVersion % checkpointInterval == 0
+    committedVersion != 0 && committedVersion % checkpointInterval == 0
   }
 
   /** Runs a post-commit hook, handling any exceptions that occur. */
```
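The moved `createCoordinatedCommitsStats` classifies each commit by comparing the coordinator client resolved from the read snapshot against the coordinator implied by the transaction's new metadata and protocol. A condensed restatement of just that classification, with the snapshot/metadata plumbing reduced to plain booleans (all parameter names here are illustrative, not the real API):

```scala
// CO = Catalog-Owned, CC = Coordinated Commits, FS = plain file-system commit.
sealed trait CommitType
case object CoCommit extends CommitType
case object CcCommit extends CommitType
case object FsCommit extends CommitType
case object CcToFsDowngrade extends CommitType
case object FsToCoUpgrade extends CommitType
case object FsToCcUpgrade extends CommitType

def classifyCommit(
    hasCoordinatorClient: Boolean,        // readSnapshotTableCommitCoordinatorClientOpt.isDefined
    snapshotIsCatalogOwned: Boolean,      // snapshot.isCatalogOwned
    newMetadataNamesCoordinator: Boolean, // metadata.coordinatedCommitsCoordinatorName.isDefined
    protocolAddsCatalogOwned: Boolean     // newProtocol adds CatalogOwnedTableFeature
): CommitType =
  (hasCoordinatorClient, snapshotIsCatalogOwned) match {
    case (true, true) => CoCommit                                     // CO commit
    case (true, _) if !newMetadataNamesCoordinator => CcToFsDowngrade // CC -> FS
    case (false, _) if protocolAddsCatalogOwned => FsToCoUpgrade      // FS -> CO (0th commit only)
    case (false, _) if newMetadataNamesCoordinator => FsToCcUpgrade   // FS -> CC
    case (true, _) => CcCommit                                        // CC commit
    case (false, _) => FsCommit                                       // FS commit
  }
```

Each branch in the real method also selects which metadata feeds the reported coordinator name and conf: `snapshot.metadata` for commits against the read snapshot, and the transaction's updated `metadata` for the two upgrade paths.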

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 12 additions & 64 deletions
```diff
@@ -334,6 +334,9 @@ trait OptimisticTransactionImpl extends DeltaTransaction
 
   protected var commitInfo: CommitInfo = _
 
+  /** Whether the txn should trigger a checkpoint after the commit */
+  private[delta] var needsCheckpoint = false
+
   // Whether this transaction is creating a new table.
   private var isCreatingNewTable: Boolean = false
 
@@ -462,26 +465,19 @@ trait OptimisticTransactionImpl extends DeltaTransaction
     }
   }
 
+  /** The set of distinct partitions that contain added files by current transaction. */
+  protected[delta] var partitionsAddedToOpt: Option[mutable.HashSet[Map[String, String]]] = None
+
+  /** True if this transaction is a blind append. This is only valid after commit. */
+  protected[delta] var isBlindAppend: Boolean = false
+
   /**
    * The logSegment of the snapshot prior to the commit.
    * Will be updated only when retrying due to a conflict.
    */
   private[delta] var preCommitLogSegment: LogSegment =
     snapshot.logSegment.copy(checkpointProvider = snapshot.checkpointProvider)
 
-  private[delta] val readSnapshotTableCommitCoordinatorClientOpt:
-      Option[TableCommitCoordinatorClient] = {
-    if (snapshot.isCatalogOwned) {
-      // Catalog owned table's commit coordinator is always determined by the catalog.
-      CatalogOwnedTableUtils.populateTableCommitCoordinatorFromCatalog(
-        spark, catalogTable, snapshot)
-    } else {
-      // The commit-coordinator of a table shouldn't change. If it is changed by a concurrent
-      // commit, then it will be detected as a conflict and the transaction will anyway fail.
-      snapshot.getTableCommitCoordinatorForWrites
-    }
-  }
-
   /**
    * Generates a timestamp which is greater than the commit timestamp
    * of the last snapshot. Note that this is only needed when the
@@ -1801,7 +1797,7 @@ trait OptimisticTransactionImpl extends DeltaTransaction
       numDistinctPartitionsInAdd = -1, // not tracking distinct partitions as of now
       numPartitionColumnsInTable = postCommitSnapshot.metadata.partitionColumns.size,
       isolationLevel = Serializable.toString,
-      coordinatedCommitsInfo = createCoordinatedCommitsStats(),
+      coordinatedCommitsInfo = createCoordinatedCommitsStats(newProtocol),
       numOfDomainMetadatas = numOfDomainMetadatas,
       txnId = Some(txnId))
 
@@ -1840,54 +1836,6 @@ trait OptimisticTransactionImpl extends DeltaTransaction
     }
   }
 
-  def createCoordinatedCommitsStats(): CoordinatedCommitsStats = {
-    val (coordinatedCommitsType, metadataToUse) =
-      readSnapshotTableCommitCoordinatorClientOpt match {
-        // TODO: Capture the CO -> FS downgrade case when we start
-        // supporting downgrade for CO.
-        case Some(_) if snapshot.isCatalogOwned => // CO commit
-          (CoordinatedCommitType.CO_COMMIT, snapshot.metadata)
-        case Some(_) if metadata.coordinatedCommitsCoordinatorName.isEmpty => // CC -> FS
-          (CoordinatedCommitType.CC_TO_FS_DOWNGRADE_COMMIT, snapshot.metadata)
-        // Only the 0th commit to a table can be a FS -> CO upgrade for now.
-        // Upgrading an existing FS table to CO through ALTER TABLE is not supported yet.
-        case None if this.newProtocol.exists(_.readerAndWriterFeatureNames
-            .contains(CatalogOwnedTableFeature.name)) => // FS -> CO
-          (CoordinatedCommitType.FS_TO_CO_UPGRADE_COMMIT, metadata)
-        case None if metadata.coordinatedCommitsCoordinatorName.isDefined => // FS -> CC
-          (CoordinatedCommitType.FS_TO_CC_UPGRADE_COMMIT, metadata)
-        case Some(_) => // CC commit
-          (CoordinatedCommitType.CC_COMMIT, snapshot.metadata)
-        case None => // FS commit
-          (CoordinatedCommitType.FS_COMMIT, snapshot.metadata)
-        // Errors out in rest of the cases.
-        case _ =>
-          throw new IllegalStateException(
-            "Unexpected state found when trying " +
-            s"to generate CoordinatedCommitsStats for table ${deltaLog.logPath}. " +
-            s"$readSnapshotTableCommitCoordinatorClientOpt, " +
-            s"$metadata, $snapshot, $catalogTable")
-      }
-    CoordinatedCommitsStats(
-      coordinatedCommitsType = coordinatedCommitsType.toString,
-      commitCoordinatorName = if (Set(CoordinatedCommitType.CO_COMMIT,
-          CoordinatedCommitType.FS_TO_CO_UPGRADE_COMMIT).contains(coordinatedCommitsType)) {
-        // The catalog for FS -> CO upgrade commit would be
-        // "CATALOG_EMPTY" because `catalogTable` is not available
-        // for the 0th FS commit.
-        catalogTable.flatMap { ct =>
-          CatalogOwnedTableUtils.getCatalogName(
-            spark,
-            identifier = ct.identifier)
-        }.getOrElse("CATALOG_MISSING")
-      } else {
-        metadataToUse.coordinatedCommitsCoordinatorName.getOrElse("NONE")
-      },
-      // For Catalog-Owned table, the coordinator conf for UC-CC is [[Map.empty]]
-      // so we don't distinguish between CO/CC here.
-      commitCoordinatorConf = metadataToUse.coordinatedCommitsCoordinatorConf)
-  }
-
   /**
    * Splits a transaction into smaller child transactions that operate on disjoint sets of the files
    * read by the parent transaction. This function is typically used when you want to break a large
@@ -2463,7 +2411,7 @@ trait OptimisticTransactionImpl extends DeltaTransaction
     collectAutoOptimizeStats(numAdd, numRemove, actions.iterator)
     val info = currentTransactionInfo.commitInfo
       .map(_.copy(readVersion = None, isolationLevel = None)).orNull
-    setNeedsCheckpoint(attemptVersion, postCommitSnapshot)
+    needsCheckpoint = isCheckpointNeeded(attemptVersion, postCommitSnapshot)
     val doCollectCommitStats =
       needsCheckpoint || spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_FORCE_ALL_COMMIT_STATS)
 
@@ -2499,7 +2447,7 @@ trait OptimisticTransactionImpl extends DeltaTransaction
       numDistinctPartitionsInAdd = distinctPartitions.size,
       numPartitionColumnsInTable = postCommitSnapshot.metadata.partitionColumns.size,
       isolationLevel = isolationLevel.toString,
-      coordinatedCommitsInfo = createCoordinatedCommitsStats(),
+      coordinatedCommitsInfo = createCoordinatedCommitsStats(newProtocol),
       numOfDomainMetadatas = numOfDomainMetadatas,
       txnId = Some(txnId))
     recordDeltaEvent(deltaLog, DeltaLogging.DELTA_COMMIT_STATS_OPTYPE, data = stats)
```
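On the caller side, `needsCheckpoint` is now an `OptimisticTransactionImpl` field assigned from the predicate's return value instead of being written inside the setter. The checkpoint rule itself is small enough to restate standalone; a self-contained sketch under the assumption that the interval is passed in directly (the real `isCheckpointNeeded` reads it from the post-commit snapshot's metadata):

```scala
// Standalone restatement of the rule in isCheckpointNeeded: request a
// checkpoint after every nonzero multiple of the table's checkpoint interval.
object CheckpointRuleSketch {
  def isCheckpointNeeded(committedVersion: Long, checkpointInterval: Long): Boolean =
    committedVersion != 0 && committedVersion % checkpointInterval == 0

  def main(args: Array[String]): Unit = {
    val interval = 10L
    assert(!isCheckpointNeeded(0, interval))  // version 0 never triggers a checkpoint
    assert(!isCheckpointNeeded(7, interval))  // not a multiple of the interval
    assert(isCheckpointNeeded(10, interval))  // first checkpointed version
    assert(isCheckpointNeeded(20, interval))
    println("checkpoint rule holds")
  }
}
```

Returning a `Boolean` also keeps the follow-on `doCollectCommitStats` computation explicit: stats are always collected when a checkpoint is due, or when `DELTA_FORCE_ALL_COMMIT_STATS` is set.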
