Skip to content

Commit d719f1e

Browse files
authored
[Delta] Migrate DeltaLogMinorCompactionSuite to CatalogOwned (delta-io#4751)
<!--
Thanks for sending a pull request! Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
  2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
  5. If possible, provide a concise example to reproduce the issue for a faster review.
  6. If applicable, include the corresponding issue number in the PR title and link it in the body.
-->

#### Which Delta project/connector is this regarding?

<!--
Please add the component selected below to the beginning of the pull request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.

If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge.
-->

This PR migrates DeltaLogMinorCompactionSuite to CatalogOwned.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible.
If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future).
If the changes were not tested, please explain why.
-->

Existing UTs.

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master.
If no, write 'No'.
-->

No
1 parent 5d40e18 commit d719f1e

File tree

3 files changed

+36
-11
lines changed

3 files changed

+36
-11
lines changed

spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ package org.apache.spark.sql.delta
1818

1919
import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
2020
import org.apache.spark.sql.delta.actions._
21-
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
21+
import org.apache.spark.sql.delta.coordinatedcommits.{CatalogOwnedTableUtils, CatalogOwnedTestBaseSuite}
2222
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2323
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
2424
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
2525
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
2626
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
2727
import org.apache.hadoop.fs.Path
2828

29+
import org.apache.spark.SparkConf
2930
import org.apache.spark.sql._
3031
import org.apache.spark.sql.functions.col
3132
import org.apache.spark.sql.test.SharedSparkSession
@@ -35,16 +36,19 @@ class DeltaLogMinorCompactionSuite extends QueryTest
3536
with SharedSparkSession
3637
with DeltaSQLCommandTest
3738
with DeltaSQLTestUtils
38-
with CoordinatedCommitsBaseSuite {
39+
with CatalogOwnedTestBaseSuite {
3940

4041
/** Helper method to do minor compaction of [[DeltaLog]] from [startVersion, endVersion] */
4142
private def minorCompactDeltaLog(
4243
tablePath: String,
4344
startVersion: Long,
4445
endVersion: Long): Unit = {
4546
val deltaLog = DeltaLog.forTable(spark, tablePath)
46-
deltaLog.update().tableCommitCoordinatorClientOpt.foreach { tableCommitCoordinatorClient =>
47-
tableCommitCoordinatorClient.backfillToVersion(endVersion)
47+
CatalogOwnedTableUtils.populateTableCommitCoordinatorFromCatalog(
48+
spark,
49+
catalogTableOpt = None,
50+
snapshot = deltaLog.unsafeVolatileSnapshot).foreach { tcc =>
51+
tcc.backfillToVersion(endVersion)
4852
}
4953
val logReplay = new InMemoryLogReplay(
5054
minFileRetentionTimestamp = 0,
@@ -441,17 +445,26 @@ class DeltaLogMinorCompactionSuite extends QueryTest
441445
}
442446
}
443447

444-
class DeltaLogMinorCompactionWithCoordinatedCommitsBatch1Suite
448+
class DeltaLogMinorCompactionWithCatalogOwnedBatch1Suite
445449
extends DeltaLogMinorCompactionSuite {
446-
override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
450+
override val catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(1)
451+
452+
override protected def sparkConf: SparkConf = super.sparkConf
453+
.set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, "false")
447454
}
448455

449-
class DeltaLogMinorCompactionWithCoordinatedCommitsBatch2Suite
456+
class DeltaLogMinorCompactionWithCatalogOwnedBatch2Suite
450457
extends DeltaLogMinorCompactionSuite {
451-
override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(2)
458+
override val catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(2)
459+
460+
override protected def sparkConf: SparkConf = super.sparkConf
461+
.set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, "false")
452462
}
453463

454-
class DeltaLogMinorCompactionWithCoordinatedCommitsBatch100Suite
464+
class DeltaLogMinorCompactionWithCatalogOwnedBatch100Suite
455465
extends DeltaLogMinorCompactionSuite {
456-
override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
466+
override val catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(100)
467+
468+
override protected def sparkConf: SparkConf = super.sparkConf
469+
.set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, "false")
457470
}

spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,6 +1491,7 @@ class DeltaVacuumSuite extends DeltaVacuumSuiteBase with DeltaSQLCommandTest {
14911491
}
14921492

14931493
test("running vacuum on a catalog owned managed table should fail") {
1494+
CatalogOwnedCommitCoordinatorProvider.clearBuilders()
14941495
CatalogOwnedCommitCoordinatorProvider.registerBuilder(
14951496
"spark_catalog", TrackingInMemoryCommitCoordinatorBuilder(batchSize = 3))
14961497
withTable("t1") {

spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.apache.spark.sql.delta.coordinatedcommits
1818

19+
import java.io.File
1920
import java.net.URI
2021
import java.util.{Optional, UUID}
2122

@@ -57,10 +58,20 @@ trait UCCommitCoordinatorClientSuiteBase extends CommitCoordinatorClientImplSuit
5758
protected var ucClient: UCClient = _
5859

5960
@Mock
60-
protected val mockFactory: UCClientFactory = mock(classOf[UCClientFactory])
61+
protected var mockFactory: UCClientFactory = _
6162

6263
protected var ucCommitCoordinator: InMemoryUCCommitCoordinator = _
6364

65+
protected override def beforeAll(): Unit = {
66+
val tmpDirName = System.getProperty("java.io.tmpdir")
67+
val tmpDir = new File(tmpDirName)
68+
if (!tmpDir.exists()) {
69+
tmpDir.mkdirs()
70+
}
71+
super.beforeAll()
72+
mockFactory = mock(classOf[UCClientFactory])
73+
}
74+
6475
override def beforeEach(): Unit = {
6576
super.beforeEach()
6677
tableUUID = UUID.randomUUID()

0 commit comments

Comments
 (0)