Commit 6cd345a

[Spark] Rename DeltaTransaction to TransactionHelper (delta-io#4741)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Continues from delta-io#4735. This PR renames `DeltaTransaction` to `TransactionHelper` and includes some additional clean-ups.

## How was this patch tested?

Existing UTs.

## Does this PR introduce _any_ user-facing changes?

No.
1 parent d5f7bdf commit 6cd345a

File tree: 4 files changed, +61 -55 lines

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 3 additions & 23 deletions
```diff
@@ -44,7 +44,7 @@ import org.apache.spark.sql.delta.redirect.{RedirectFeature, TableRedirectConfig
 import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
 import org.apache.spark.sql.delta.stats._
-import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, JsonUtils}
+import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, JsonUtils, TransactionHelper}
 import org.apache.spark.sql.util.ScalaExtensions._
 import io.delta.storage.commit._
 import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
@@ -254,7 +254,7 @@ object OptimisticTransaction {
  *
  * This trait is not thread-safe.
  */
-trait OptimisticTransactionImpl extends DeltaTransaction
+trait OptimisticTransactionImpl extends TransactionHelper
   with TransactionalWrite
   with SQLMetricsReporting
   with DeltaScanGenerator
@@ -478,18 +478,6 @@ trait OptimisticTransactionImpl extends DeltaTransaction
   private[delta] var preCommitLogSegment: LogSegment =
     snapshot.logSegment.copy(checkpointProvider = snapshot.checkpointProvider)
 
-  /**
-   * Generates a timestamp which is greater than the commit timestamp
-   * of the last snapshot. Note that this is only needed when the
-   * feature `inCommitTimestamps` is enabled.
-   */
-  protected[delta] def generateInCommitTimestampForFirstCommitAttempt(
-      currentTimestamp: Long): Option[Long] =
-    Option.when(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(metadata)) {
-      val lastCommitTimestamp = snapshot.timestamp
-      math.max(currentTimestamp, lastCommitTimestamp + 1)
-    }
-
   /** The end to end execution time of this transaction. */
   def txnExecutionTimeMs: Option[Long] = if (commitEndNano == -1) {
     None
@@ -2591,14 +2579,6 @@ trait OptimisticTransactionImpl extends DeltaTransaction
   protected def incrementallyDeriveChecksum(
       attemptVersion: Long,
       currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum] = {
-    // Don't include [[AddFile]]s in CRC if this commit is modifying the schema of table in some
-    // way. This is to make sure we don't carry any DROPPED column from previous CRC to this CRC
-    // forever and can start fresh from next commit.
-    // If the oldSnapshot itself is missing, we don't incrementally compute the checksum.
-    val allFilesInCrcWritePathEnabled =
-      Snapshot.allFilesInCrcWritePathEnabled(spark, snapshot) &&
-        (snapshot.version == -1 || snapshot.metadata.schema == metadata.schema)
-
     incrementallyDeriveChecksum(
       spark,
       deltaLog,
@@ -2609,7 +2589,7 @@ trait OptimisticTransactionImpl extends DeltaTransaction
       operationName = currentTransactionInfo.op.name,
       txnIdOpt = Some(currentTransactionInfo.txnId),
       previousVersionState = scala.Left(snapshot),
-      includeAddFilesInCrc = allFilesInCrcWritePathEnabled
+      includeAddFilesInCrc = Snapshot.shouldIncludeAddFilesInCrc(spark, snapshot, metadata)
     ).toOption
   }
 
```
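
Both deletions above relocate logic rather than remove it: the in-commit-timestamp generator moves into `TransactionHelper`, and the CRC predicate becomes `Snapshot.shouldIncludeAddFilesInCrc` (both appear in later hunks of this commit). The timestamp rule itself is a simple monotonic clamp; here is a tiny standalone sketch of that rule (toy code, not the Delta implementation — in the real method the result is additionally gated on the `inCommitTimestamps` feature via `Option.when`):

```scala
// Monotonic in-commit timestamp: strictly greater than the previous commit's
// timestamp, even if the wall clock has drifted backwards.
def nextInCommitTimestamp(nowMs: Long, lastCommitMs: Long): Long =
  math.max(nowMs, lastCommitMs + 1)

assert(nextInCommitTimestamp(2000L, 1500L) == 2000L) // normal case: use the clock
assert(nextInCommitTimestamp(1000L, 1500L) == 1501L) // skewed clock: still increases
```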

spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala

Lines changed: 35 additions & 16 deletions
```diff
@@ -608,22 +608,7 @@ class Snapshot(
 
   /** Return the set of properties of the table. */
   def getProperties: mutable.Map[String, String] = {
-    val base = new mutable.LinkedHashMap[String, String]()
-    metadata.configuration.foreach { case (k, v) =>
-      if (k != "path") {
-        base.put(k, v)
-      }
-    }
-    base.put(Protocol.MIN_READER_VERSION_PROP, protocol.minReaderVersion.toString)
-    base.put(Protocol.MIN_WRITER_VERSION_PROP, protocol.minWriterVersion.toString)
-    if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
-      val features = protocol.readerAndWriterFeatureNames.map(name =>
-        s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}$name" ->
-          TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED)
-      base ++ features.toSeq.sorted
-    } else {
-      base
-    }
+    Snapshot.getProperties(metadata, protocol)
   }
 
   /** The [[CheckpointProvider]] for the underlying checkpoint */
@@ -797,6 +782,40 @@ object Snapshot extends DeltaLogging {
     val shouldVerify = verificationConfEnabled || allFilesInCrcVerificationForceEnabled(spark)
     allFilesInCrcWritePathEnabled(spark, snapshot) && shouldVerify
   }
+
+  /**
+   * Don't include [[AddFile]]s in CRC if this commit is modifying the schema of table in some
+   * way. This is to make sure we don't carry any DROPPED column from previous CRC to this CRC
+   * forever and can start fresh from next commit.
+   * If the oldSnapshot itself is missing, we don't incrementally compute the checksum.
+   */
+  private[delta] def shouldIncludeAddFilesInCrc(
+      spark: SparkSession, snapshot: Snapshot, metadata: Metadata): Boolean = {
+    allFilesInCrcWritePathEnabled(spark, snapshot) &&
+      (snapshot.version == -1 || snapshot.metadata.schema == metadata.schema)
+  }
+
+  /**
+   * Return the set of properties for a given metadata and protocol.
+   */
+  def getProperties(metadata: Metadata, protocol: Protocol): mutable.Map[String, String] = {
+    val base = new mutable.LinkedHashMap[String, String]()
+    metadata.configuration.foreach { case (k, v) =>
+      if (k != "path") {
+        base.put(k, v)
+      }
+    }
+    base.put(Protocol.MIN_READER_VERSION_PROP, protocol.minReaderVersion.toString)
+    base.put(Protocol.MIN_WRITER_VERSION_PROP, protocol.minWriterVersion.toString)
+    if (protocol.supportsReaderFeatures || protocol.supportsWriterFeatures) {
+      val features = protocol.readerAndWriterFeatureNames.map(name =>
+        s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}$name" ->
+          TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED)
+      base ++ features.toSeq.sorted
+    } else {
+      base
+    }
+  }
 }
 
 /**
```
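
Two pieces of instance-level logic now live in the companion `object Snapshot`: the CRC predicate (`shouldIncludeAddFilesInCrc`) and the table-properties builder (`getProperties`), with the instance method delegating to the latter. This is the usual Scala recipe for making logic reusable by callers that don't have a `Snapshot` in hand. A minimal, self-contained sketch of the pattern (toy names and simplified types, not Delta code):

```scala
import scala.collection.mutable

// Toy stand-ins for Delta's Metadata and Protocol, only to make the sketch compile.
case class Meta(configuration: Map[String, String])
case class Proto(minReaderVersion: Int, minWriterVersion: Int)

class Table(meta: Meta, proto: Proto) {
  // The instance method keeps its signature but now just delegates.
  def getProperties: mutable.Map[String, String] = Table.getProperties(meta, proto)
}

object Table {
  // Static helper: anyone with a (Meta, Proto) pair can reuse the logic.
  def getProperties(meta: Meta, proto: Proto): mutable.Map[String, String] = {
    val base = new mutable.LinkedHashMap[String, String]()
    meta.configuration.foreach { case (k, v) => if (k != "path") base.put(k, v) }
    base.put("delta.minReaderVersion", proto.minReaderVersion.toString)
    base.put("delta.minWriterVersion", proto.minWriterVersion.toString)
    base
  }
}
```

Callers that previously needed a full `Snapshot` just to derive properties can now pass the metadata/protocol pair directly.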

spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompactUtils.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.hooks
 import scala.collection.mutable
 
 // scalastyle:off import.ordering.noEmptyLine
-import org.apache.spark.sql.delta.{DeltaLog, DeltaTransaction, Snapshot}
+import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
 import org.apache.spark.sql.delta.CommittedTransaction
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
```

spark/src/main/scala/org/apache/spark/sql/delta/DeltaTransaction.scala renamed to spark/src/main/scala/org/apache/spark/sql/delta/util/TransactionHelper.scala

Lines changed: 22 additions & 15 deletions
```diff
@@ -14,36 +14,34 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.delta
+package org.apache.spark.sql.delta.util
 
-import scala.collection.mutable
 import scala.util.control.NonFatal
 
+import org.apache.spark.sql.delta.{CatalogOwnedTableFeature, CommittedTransaction, CoordinatedCommitsStats, CoordinatedCommitType, DeltaConfigs, DeltaLog, IsolationLevel, Snapshot}
 import org.apache.spark.sql.delta.DeltaOperations.Operation
-import org.apache.spark.sql.delta.actions.{AddFile, CommitInfo, Metadata, Protocol}
+import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.delta.coordinatedcommits.{CatalogOwnedTableUtils, TableCommitCoordinatorClient}
 import org.apache.spark.sql.delta.hooks.PostCommitHook
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.util.ScalaExtensions._
 
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 
 /**
- * Represents a transaction that maps to a delta table commit.
- *
- * An instance of this trait tracks the reads and writes as well as accumulates additional
- * information such as statistics of a single table throughout the life of a transaction.
+ * Contains helper methods for Delta transactions.
  */
-trait DeltaTransaction extends DeltaLogging {
-  val deltaLog: DeltaLog
-  val catalogTable: Option[CatalogTable]
-  val snapshot: Snapshot
+trait TransactionHelper extends DeltaLogging {
+  def deltaLog: DeltaLog
+  def catalogTable: Option[CatalogTable]
+  def snapshot: Snapshot
 
   /** Unique identifier for the transaction */
-  val txnId: String
+  def txnId: String
 
   /**
    * Returns the metadata for this transaction. The metadata refers to the metadata of the snapshot
@@ -54,9 +52,6 @@ trait DeltaTransaction extends DeltaLogging {
   /** The protocol of the snapshot that this transaction is reading at. */
   def protocol: Protocol
 
-  /** The end to end execution time of this transaction. */
-  def txnExecutionTimeMs: Option[Long]
-
   /**
    * Default [[IsolationLevel]] as set in table metadata.
    */
@@ -168,4 +163,16 @@ trait DeltaTransaction extends DeltaLogging {
       hook.handleError(spark, e, version)
     }
   }
+
+  /**
+   * Generates a timestamp which is greater than the commit timestamp
+   * of the last snapshot. Note that this is only needed when the
+   * feature `inCommitTimestamps` is enabled.
+   */
+  protected[delta] def generateInCommitTimestampForFirstCommitAttempt(
+      currentTimestamp: Long): Option[Long] =
+    Option.when(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(metadata)) {
+      val lastCommitTimestamp = snapshot.timestamp
+      math.max(currentTimestamp, lastCommitTimestamp + 1)
+    }
 }
```
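
Beyond the rename, note that the abstract members (`deltaLog`, `catalogTable`, `snapshot`, `txnId`) change from `val` to `def`. An abstract `def` is the more permissive contract: implementers may satisfy it with a `def`, `val`, or `lazy val`, and it sidesteps trait-initialization-order surprises where a trait body reads an abstract `val` before the implementing class has assigned it. A toy illustration of that hazard (not Delta code):

```scala
// Abstract `val` in a trait: trait initializers run before the subclass's,
// so reading `name` during trait init observes null.
trait WithVal {
  val name: String
  val eagerGreeting: String = s"hello, $name" // evaluated during trait init
}

// Abstract `def` (here paired with a lazy val) defers evaluation to first use.
trait WithDef {
  def name: String
  lazy val lazyGreeting: String = s"hello, $name"
}

object Demo extends App {
  println((new WithVal { val name = "delta" }).eagerGreeting) // "hello, null"
  println((new WithDef { val name = "delta" }).lazyGreeting)  // "hello, delta"
}
```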
