Commit 8eeaeea

[Spark] Add drop support for InCommitTimestamp table feature (delta-io#2873)
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Adds drop feature support for InCommitTimestamps (ICT). When the user runs DROP FEATURE, the following will happen if ICT is present in the protocol:

1. If any of the ICT-related properties are present, the first commit will:
   - Set `IN_COMMIT_TIMESTAMPS_ENABLED` = false
   - Remove `IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION` and `IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP`
2. A second commit will remove ICT from the protocol.

## How was this patch tested?

New tests in DeltaProtocolVersionSuite for the following scenarios:

1. ICT is enabled from commit 0 onwards.
2. ICT is enabled in some commit after 0.
3. Dropping when the feature is not present in the protocol.
4. Dropping when only one provenance property is present and the enablement property is absent.
5. Dropping when none of the table properties are present.

## Does this PR introduce _any_ user-facing changes?

Yes. Users will now be able to run ALTER TABLE <> DROP FEATURE inCommitTimestamp-dev on their tables.
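
The two-commit flow described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions, not Delta's implementation: `TableState`, `IctDropSketch`, `dropInCommitTimestamps`, and the three `ict.*` key strings are hypothetical stand-ins for the protocol's writer feature set and the `DeltaConfigs` property keys. The sketch sets the enablement property to false, as the pre-downgrade command in this commit does.

```scala
// Hypothetical in-memory model of a table; not a Delta Lake API.
final case class TableState(
    writerFeatures: Set[String],
    properties: Map[String, String])

object IctDropSketch {
  // Placeholder names; the real keys are defined by DeltaConfigs.
  val FeatureName = "inCommitTimestamp-dev"
  val EnabledKey = "ict.enabled"
  val EnablementVersionKey = "ict.enablementVersion"
  val EnablementTimestampKey = "ict.enablementTimestamp"

  /** Returns one table state per commit that the drop would write. */
  def dropInCommitTimestamps(table: TableState): Either[String, List[TableState]] = {
    if (!table.writerFeatures.contains(FeatureName)) {
      Left("feature not present in protocol")
    } else {
      val provenanceKeys = Set(EnablementVersionKey, EnablementTimestampKey)
      val enabled = table.properties.get(EnabledKey).contains("true")
      val needsCleanup = enabled || provenanceKeys.exists(table.properties.contains)

      // Commit 1 (only if needed): drop provenance keys, set enablement to false.
      val cleaned =
        if (needsCleanup) {
          val withoutProvenance = table.properties -- provenanceKeys
          val props =
            if (enabled) withoutProvenance + (EnabledKey -> "false") else withoutProvenance
          Some(table.copy(properties = props))
        } else {
          None
        }

      // Commit 2: remove the feature from the writer feature set.
      val base = cleaned.getOrElse(table)
      val downgraded = base.copy(writerFeatures = base.writerFeatures - FeatureName)
      Right(cleaned.toList :+ downgraded)
    }
  }
}
```

Calling `IctDropSketch.dropInCommitTimestamps` a second time on the final state returns a `Left`, which loosely mirrors the "feature not present" error that the tests in this commit expect on a repeated drop.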
1 parent 45ad641 commit 8eeaeea

3 files changed: +260 -2 lines changed


spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala

Lines changed: 58 additions & 0 deletions
@@ -21,6 +21,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
+import org.apache.spark.sql.util.ScalaExtensions._

 import org.apache.spark.sql.catalyst.analysis.ResolvedTable

@@ -129,6 +130,63 @@ case class V2CheckpointPreDowngradeCommand(table: DeltaTableV2)
   }
 }

+case class InCommitTimestampsPreDowngradeCommand(table: DeltaTableV2)
+  extends PreDowngradeTableFeatureCommand
+  with DeltaLogging {
+  /**
+   * We disable the feature by:
+   * - Removing the table properties:
+   *    1. DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP
+   *    2. DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION
+   * - Setting the table property DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED to false.
+   * Technically, only setting IN_COMMIT_TIMESTAMPS_ENABLED to false is enough to disable the
+   * feature. However, we can use this opportunity to clean up the metadata.
+   *
+   * @return true if any change to the metadata (the three properties listed above) was made.
+   *         False otherwise.
+   */
+  override def removeFeatureTracesIfNeeded(): Boolean = {
+    val startTimeNs = System.nanoTime()
+    val currentMetadata = table.initialSnapshot.metadata
+    val currentTableProperties = currentMetadata.configuration
+
+    val enablementProperty = DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED
+    val ictEnabledInMetadata = enablementProperty.fromMetaData(currentMetadata)
+    val provenanceProperties = Seq(
+      DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.key,
+      DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.key)
+    val propertiesToRemove = provenanceProperties.filter(currentTableProperties.contains)
+
+    val traceRemovalNeeded = propertiesToRemove.nonEmpty || ictEnabledInMetadata
+    if (traceRemovalNeeded) {
+      val propertiesToDisable =
+        Option.when(ictEnabledInMetadata)(enablementProperty.key -> "false")
+      val desiredTableProperties = currentTableProperties
+        .filterNot{ case (k, _) => propertiesToRemove.contains(k) } ++ propertiesToDisable
+
+      val deltaOperation = DeltaOperations.UnsetTableProperties(
+        (propertiesToRemove ++ propertiesToDisable.map(_._1)).toSeq, ifExists = true)
+      table.startTransaction().commit(
+        Seq(currentMetadata.copy(configuration = desiredTableProperties.toMap)), deltaOperation)
+    }
+
+    val provenancePropertiesPresenceLogs = provenanceProperties.map { prop =>
+      prop -> currentTableProperties.contains(prop).toString
+    }
+    recordDeltaEvent(
+      table.deltaLog,
+      opType = "delta.inCommitTimestampFeatureRemovalMetrics",
+      data = Map(
+        "downgradeTimeMs" -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs),
+        "traceRemovalNeeded" -> traceRemovalNeeded.toString,
+        enablementProperty.key -> ictEnabledInMetadata
+      ) ++ provenancePropertiesPresenceLogs
+
+    )
+    traceRemovalNeeded
+  }
+}
+
 case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
   extends PreDowngradeTableFeatureCommand
   with DeltaLogging {

spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala

Lines changed: 27 additions & 1 deletion
@@ -671,7 +671,8 @@ object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening
  */
 object InCommitTimestampTableFeature
   extends WriterFeature(name = "inCommitTimestamp-dev")
-  with FeatureAutomaticallyEnabledByMetadata {
+  with FeatureAutomaticallyEnabledByMetadata
+  with RemovableFeature {

   override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

@@ -680,6 +681,31 @@ object InCommitTimestampTableFeature
       spark: SparkSession): Boolean = {
     DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(metadata)
   }
+
+  override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
+    InCommitTimestampsPreDowngradeCommand(table)
+
+
+  /**
+   * As per the spec, we can disable ICT by just setting
+   * [[DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED]] to `false`. There is no need to remove the
+   * provenance properties. However, [[InCommitTimestampsPreDowngradeCommand]] will try to remove
+   * these properties because they can be removed as part of the same metadata update that sets
+   * [[DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED]] to `false`. We check all three properties here
+   * as well for consistency.
+   */
+  override def validateRemoval(snapshot: Snapshot): Boolean = {
+    val provenancePropertiesAbsent = Seq(
+      DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.key,
+      DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.key)
+      .forall(!snapshot.metadata.configuration.contains(_))
+    val ictEnabledInMetadata =
+      DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(snapshot.metadata)
+    provenancePropertiesAbsent && !ictEnabledInMetadata
+  }
+
+  // Writer features should directly return false, as it is only used for reader+writer features.
+  override def actionUsesFeature(action: Action): Boolean = false
 }

 /**
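
The removal check in `validateRemoval` reduces to a predicate over the table properties. Below is a minimal sketch on a plain `Map`, not the code from this commit: `IctRemovalCheckSketch`, `canRemoveFeature`, and the `ict.*` key strings are hypothetical stand-ins for the three `DeltaConfigs` keys, and the sketch assumes that a missing enablement property is treated as disabled.

```scala
object IctRemovalCheckSketch {
  // Hypothetical key names; the real ones come from DeltaConfigs.
  val EnabledKey = "ict.enabled"
  val ProvenanceKeys = Seq("ict.enablementTimestamp", "ict.enablementVersion")

  /** True when no ICT trace is left in the properties, mirroring the check in the diff. */
  def canRemoveFeature(properties: Map[String, String]): Boolean = {
    val provenanceAbsent = ProvenanceKeys.forall(key => !properties.contains(key))
    // Assumption: an absent enablement property counts as disabled.
    val enabled = properties.get(EnabledKey).exists(_.equalsIgnoreCase("true"))
    provenanceAbsent && !enabled
  }
}
```

Under this sketch, a table that carries only the enablement-version property fails the check until the pre-downgrade commit removes that property, which is the situation exercised by the "only one table property" test in this commit.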

spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala

Lines changed: 175 additions & 1 deletion
@@ -22,7 +22,7 @@ import java.nio.file.{Files, Paths, StandardOpenOption}
 import java.util.Locale
 import java.util.concurrent.TimeUnit

-import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions}
+import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord}
 import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
 import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
 import org.apache.spark.sql.delta.actions._
@@ -3534,6 +3534,180 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
     testV2CheckpointTableFeatureDrop(V2Checkpoint.Format.PARQUET, true, true)
   }

+  private def validateICTRemovalMetrics(
+      usageLogs: Seq[UsageRecord],
+      expectEnablementProperty: Boolean,
+      expectProvenanceTimestampProperty: Boolean,
+      expectProvenanceVersionProperty: Boolean): Unit = {
+    val dropFeatureBlob = usageLogs
+      .find(_.tags.get("opType").contains("delta.inCommitTimestampFeatureRemovalMetrics"))
+      .getOrElse(fail("Expected a log for inCommitTimestampFeatureRemovalMetrics"))
+    val blob = JsonUtils.fromJson[Map[String, String]](dropFeatureBlob.blob)
+    assert(blob.contains("downgradeTimeMs"))
+    val traceRemovalNeeded = expectEnablementProperty || expectProvenanceTimestampProperty ||
+      expectProvenanceVersionProperty
+    assert(blob.get("traceRemovalNeeded").contains(traceRemovalNeeded.toString))
+    assert(blob
+      .get(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key)
+      .contains(expectEnablementProperty.toString))
+    assert(blob
+      .get(DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.key)
+      .contains(expectProvenanceTimestampProperty.toString))
+    assert(blob
+      .get(DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.key)
+      .contains(expectProvenanceVersionProperty.toString))
+  }
+
+  test("drop InCommitTimestamp -- ICT enabled from commit 0") {
+    withTempDir { dir =>
+      val featureEnablementKey = DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key
+      spark.sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta" +
+        s" TBLPROPERTIES ('${featureEnablementKey}' = 'true')")
+      val deltaLog = DeltaLog.forTable(spark, dir)
+      val featurePropertyKey = InCommitTimestampTableFeature.name
+
+      val usageLogs = Log4jUsageLogger.track {
+        AlterTableDropFeatureDeltaCommand(
+          DeltaTableV2(spark, deltaLog.dataPath),
+          featurePropertyKey)
+          .run(spark)
+      }
+
+      val snapshot = deltaLog.update()
+      // Writer feature is removed from the writer features set.
+      assert(!snapshot.protocol.writerFeatureNames.contains(featurePropertyKey))
+      assert(!DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(snapshot.metadata))
+      validateICTRemovalMetrics(
+        usageLogs,
+        expectEnablementProperty = true,
+        expectProvenanceTimestampProperty = false,
+        expectProvenanceVersionProperty = false)
+
+      // Running the command again should throw an exception.
+      val e = intercept[DeltaTableFeatureException] {
+        AlterTableDropFeatureDeltaCommand(
+          DeltaTableV2(spark, deltaLog.dataPath),
+          featurePropertyKey)
+          .run(spark)
+      }
+      assert(e.getErrorClass == "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT")
+    }
+  }
+
+  test("drop InCommitTimestamp -- ICT enabled after commit 0") {
+    withTempDir { dir =>
+      val featureEnablementKey = DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key
+      val featurePropertyKey = InCommitTimestampTableFeature.name
+      sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta " +
+        s"TBLPROPERTIES ('${featureEnablementKey}' = 'false')")
+      val deltaLog = DeltaLog.forTable(spark, dir)
+      assert(!deltaLog.snapshot.metadata.configuration.contains(featurePropertyKey))
+
+      sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` " +
+        s"SET TBLPROPERTIES ('${featureEnablementKey}' = 'true')")
+      val snapshotV1 = deltaLog.update()
+      assert(snapshotV1.protocol.writerFeatureNames.contains(featurePropertyKey))
+      assert(snapshotV1.metadata.configuration.contains(featureEnablementKey))
+      val ictProvenanceProperties = Seq(
+        DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.key,
+        DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.key)
+      ictProvenanceProperties.foreach(prop =>
+        assert(snapshotV1.metadata.configuration.contains(prop)))
+
+      val usageLogs = Log4jUsageLogger.track {
+        AlterTableDropFeatureDeltaCommand(
+          DeltaTableV2(spark, deltaLog.dataPath),
+          featurePropertyKey)
+          .run(spark)
+      }
+
+      val snapshot = deltaLog.update()
+      // Writer feature is removed from the writer features set.
+      assert(!snapshot.protocol.writerFeatureNames.contains(featurePropertyKey))
+      assert(!DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(snapshot.metadata))
+      // The provenance properties should also have been removed.
+      ictProvenanceProperties.foreach(prop =>
+        assert(!snapshot.metadata.configuration.contains(prop)))
+      validateICTRemovalMetrics(
+        usageLogs,
+        expectEnablementProperty = true,
+        expectProvenanceTimestampProperty = true,
+        expectProvenanceVersionProperty = true)
+    }
+  }
+
+  test("drop InCommitTimestamp --- only one table property") {
+    withTempDir { dir =>
+      // Dropping the ICT table feature should also remove any ICT provenance
+      // table properties even when the ICT enablement table property is not present.
+      spark.sql(
+        s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta" +
+          s" TBLPROPERTIES ('${DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key}' = 'true')")
+      val deltaLog = DeltaLog.forTable(spark, dir)
+      // Remove the enablement property.
+      AlterTableUnsetPropertiesDeltaCommand(
+        DeltaTableV2(spark, deltaLog.dataPath),
+        Seq(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key),
+        ifExists = true).run(spark)
+      // Set the IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION property.
+      AlterTableSetPropertiesDeltaCommand(
+        DeltaTableV2(spark, deltaLog.dataPath),
+        Map(DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.key -> "1")).run(spark)
+      val snapshot1 = deltaLog.update()
+      assert(snapshot1.protocol.writerFeatureNames.contains(InCommitTimestampTableFeature.name))
+      // Ensure that the enablement property is not set.
+      assert(!DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(snapshot1.metadata))
+      assert(snapshot1.metadata.configuration.contains(
+        DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.key))
+
+      val usageLogs = Log4jUsageLogger.track {
+        AlterTableDropFeatureDeltaCommand(
+          DeltaTableV2(spark, deltaLog.dataPath),
+          InCommitTimestampTableFeature.name)
+          .run(spark)
+      }
+      val snapshot2 = deltaLog.update()
+      assert(!snapshot2.protocol.writerFeatureNames.contains(InCommitTimestampTableFeature.name))
+      assert(!DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(snapshot2.metadata))
+      assert(!snapshot2.metadata.configuration.contains(
+        DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.key))
+      validateICTRemovalMetrics(
+        usageLogs,
+        expectEnablementProperty = false,
+        expectProvenanceTimestampProperty = false,
+        expectProvenanceVersionProperty = true)
+    }
+  }
+
+  test("drop InCommitTimestamp --- no table property") {
+    withTempDir { dir =>
+      spark.sql(
+        s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta" +
+          s" TBLPROPERTIES ('${DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key}' = 'true')")
+      val deltaLog = DeltaLog.forTable(spark, dir)
+      // Remove the enablement property.
+      AlterTableUnsetPropertiesDeltaCommand(
+        DeltaTableV2(spark, deltaLog.dataPath),
+        Seq(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key),
+        ifExists = true).run(spark)
+
+      val usageLogs = Log4jUsageLogger.track {
+        AlterTableDropFeatureDeltaCommand(
+          DeltaTableV2(spark, deltaLog.dataPath),
+          InCommitTimestampTableFeature.name)
+          .run(spark)
+      }
+      val snapshot = deltaLog.update()
+      assert(!snapshot.protocol.writerFeatureNames.contains(InCommitTimestampTableFeature.name))
+      assert(!DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(snapshot.metadata))
+      validateICTRemovalMetrics(
+        usageLogs,
+        expectEnablementProperty = false,
+        expectProvenanceTimestampProperty = false,
+        expectProvenanceVersionProperty = false)
+    }
+  }
+
   // Create a table for testing that has an unsupported feature.
   private def withTestTableWithUnsupportedWriterFeature(
       emptyTable: Boolean)(testCode: String => Unit): Unit = {
