Commit 4b2a0e6
[Spark] Extends ChecksumSuite w/ CatalogOwned & Migrates to name-based testing (delta-io#4840)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR extends `ChecksumSuite` to also run against CatalogOwned tables and migrates the existing unit tests from path-based to name-based table access; the sketch below illustrates the migration pattern.

## How was this patch tested?

Through existing and newly added suites.

## Does this PR introduce _any_ user-facing changes?

No.
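The migration pattern, in essence, swaps filesystem-path access for catalog-name access. A minimal before/after sketch drawn from the diff, assuming the `ChecksumSuite` test context (shared Spark session, `DeltaLog`, and the `withTempTable` helper added by this PR):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier

// Before: path-based. The table is an anonymous directory and the
// DeltaLog is resolved from its filesystem path.
withTempDir { tempDir =>
  spark.range(1).write.format("delta").mode("append").save(tempDir.getCanonicalPath)
  val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
}

// After: name-based. The table is registered in the catalog and the
// DeltaLog is resolved from its identifier, which is what CatalogOwned
// tables require.
withTempTable(createTable = false) { tableName =>
  spark.range(1).write.format("delta").mode("append").saveAsTable(tableName)
  val log = DeltaLog.forTable(spark, TableIdentifier(tableName))
}
```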
1 parent b12a8b2 commit 4b2a0e6

File tree

2 files changed: +72, -31 lines

spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala

Lines changed: 53 additions & 31 deletions
@@ -21,45 +21,46 @@ import java.util.TimeZone
 
 import com.databricks.spark.util.Log4jUsageLogger
 import org.apache.spark.sql.delta.DeltaTestUtils._
+import org.apache.spark.sql.delta.coordinatedcommits.CatalogOwnedTestBaseSuite
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SharedSparkSession
 
 class ChecksumSuite
   extends QueryTest
   with SharedSparkSession
   with DeltaTestUtilsBase
-  with DeltaSQLCommandTest {
+  with DeltaSQLCommandTest
+  with DeltaSQLTestUtils
+  with CatalogOwnedTestBaseSuite {
 
   override def sparkConf: SparkConf = super.sparkConf
     .set(DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS, false)
 
   test(s"A Checksum should be written after every commit when " +
       s"${DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key} is true") {
     def testChecksumFile(writeChecksumEnabled: Boolean): Unit = {
-      withTempDir { tempDir =>
+      // Set up the log by explicitly creating the table otherwise we can't
+      // construct the DeltaLog via the table name.
+      withTempTable(createTable = true) { tableName =>
        withSQLConf(
          DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> writeChecksumEnabled.toString) {
          def checksumExists(deltaLog: DeltaLog, version: Long): Boolean = {
            val checksumFile = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
            checksumFile.exists()
          }
 
-          // Setup the log
-          val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
-          val initialTxn = log.startTransaction()
-          initialTxn.commitManually(createTestAddFile())
-
           // Commit the txn
-          val txn = log.startTransaction()
+          val log = DeltaLog.forTable(spark, TableIdentifier(tableName))
+          val txn = log.startTransaction(log.initialCatalogTable)
           val txnCommitVersion = txn.commit(Seq.empty, DeltaOperations.Truncate())
           assert(checksumExists(log, txnCommitVersion) == writeChecksumEnabled)
        }
@@ -82,13 +83,13 @@ class ChecksumSuite
       DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false",
       DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> incrementalCommitEnabled.toString
     ) {
-      withTempDir { tempDir =>
+      withTempTable(createTable = false) { tableName =>
        // Set the timezone to UTC to avoid triggering force verification of all files in CRC
        // for non utc environments.
        setTimeZone("UTC")
        val df = spark.range(1)
-        df.write.format("delta").mode("append").save(tempDir.getCanonicalPath)
-        val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
+        df.write.format("delta").mode("append").saveAsTable(tableName)
+        val log = DeltaLog.forTable(spark, TableIdentifier(tableName))
        log
          .startTransaction()
          .commit(Seq(createTestAddFile()), DeltaOperations.Write(SaveMode.Append))
@@ -107,7 +108,7 @@ class ChecksumSuite
   }
 
   def testIncrementalChecksumWrites(tableMutationOperation: String => Unit): Unit = {
-    withTempDir { tempDir =>
+    withTempTable(createTable = false) { tableName =>
      withSQLConf(
        DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
        DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key ->"true") {
@@ -116,10 +117,10 @@ class ChecksumSuite
        .format("delta")
        .partitionBy("id")
        .mode("append")
-        .save(tempDir.getCanonicalPath)
+        .saveAsTable(tableName)
 
-      tableMutationOperation(tempDir.getCanonicalPath)
-      val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
+      tableMutationOperation(tableName)
+      val log = DeltaLog.forTable(spark, TableIdentifier(tableName))
      val checksumOpt = log.snapshot.checksumOpt
      assert(checksumOpt.isDefined)
      val checksum = checksumOpt.get
@@ -130,20 +131,20 @@ class ChecksumSuite
   }
 
   test("Incremental checksums: INSERT") {
-    testIncrementalChecksumWrites { tablePath =>
-      sql(s"INSERT INTO delta.`$tablePath` SELECT *, 1 FROM range(10, 20)")
+    testIncrementalChecksumWrites { tableName =>
+      sql(s"INSERT INTO $tableName SELECT *, 1 FROM range(10, 20)")
    }
  }
 
   test("Incremental checksums: UPDATE") {
-    testIncrementalChecksumWrites { tablePath =>
-      sql(s"UPDATE delta.`$tablePath` SET id2 = id + 1 WHERE id % 2 = 0")
+    testIncrementalChecksumWrites { tableName =>
+      sql(s"UPDATE $tableName SET id2 = id + 1 WHERE id % 2 = 0")
    }
  }
 
   test("Incremental checksums: DELETE") {
-    testIncrementalChecksumWrites { tablePath =>
-      sql(s"DELETE FROM delta.`$tablePath` WHERE id % 2 = 0")
+    testIncrementalChecksumWrites { tableName =>
+      sql(s"DELETE FROM $tableName WHERE id % 2 = 0")
    }
  }
 
@@ -155,14 +156,18 @@ class ChecksumSuite
      // or Metadata will trigger a failure earlier than the full validation.
      DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "false"
    ) {
-      withTempDir { tempDir =>
-        spark.range(10).write.format("delta").save(tempDir.getCanonicalPath)
+      withTempTable(createTable = false) { tableName =>
+        spark
+          .range(10)
+          .write
+          .format("delta")
+          .saveAsTable(tableName)
        spark.range(1)
          .write
          .format("delta")
          .mode("append")
-          .save(tempDir.getCanonicalPath)
-        val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
+          .saveAsTable(tableName)
+        val log = DeltaLog.forTable(spark, TableIdentifier(tableName))
        val checksumOpt = log.readChecksum(1)
        assert(checksumOpt.isDefined)
        val checksum = checksumOpt.get
@@ -181,7 +186,7 @@ class ChecksumSuite
          Seq(corruptedChecksumJson).toIterator,
          overwrite = true)
        DeltaLog.clearCache()
-        val log2 = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
+        val log2 = DeltaLog.forTable(spark, TableIdentifier(tableName))
        val usageLogs = Log4jUsageLogger.track {
          intercept[DeltaIllegalStateException] {
            log2.checkpoint()
@@ -216,7 +221,9 @@ class ChecksumSuite
      DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false",
      DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
    ) {
-      withTempDir { tempDir =>
+      // Explicitly create the table w/o any AddFile for the subsequent
+      // DeltaLog construction.
+      withTempTable(createTable = true) { tableName =>
        import testImplicits._
        val numAddFiles = 10
 
@@ -232,9 +239,12 @@ class ChecksumSuite
        // 5. Create a new snapshot and manually validate the .crc
 
        val files = (1 to numAddFiles).map(i => createTestAddFile(encodedPath = i.toString))
-        DeltaLog.forTable(spark, tempDir).startTransaction().commitWriteAppend(files: _*)
+        DeltaLog
+          .forTable(spark, TableIdentifier(tableName))
+          .startTransaction()
+          .commitWriteAppend(files: _*)
 
-        val log = DeltaLog.forTable(spark, tempDir)
+        val log = DeltaLog.forTable(spark, TableIdentifier(tableName))
        val txn = log.startTransaction()
        val expected =
          s"""Table size (bytes) - Expected: ${2*numAddFiles} Computed: $numAddFiles
@@ -252,3 +262,15 @@ class ChecksumSuite
    }
  }
 }
+
+class ChecksumWithCatalogOwnedBatch1Suite extends ChecksumSuite {
+  override def catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(1)
+}
+
+class ChecksumWithCatalogOwnedBatch2Suite extends ChecksumSuite {
+  override def catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(2)
+}
+
+class ChecksumWithCatalogOwnedBatch100Suite extends ChecksumSuite {
+  override def catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(100)
+}
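The three trailing subclasses rerun the entire `ChecksumSuite` under CatalogOwned commit coordination with different backfill batch sizes. As a generic illustration of this subclass-parameterization idiom (the names below are hypothetical, not the actual `CatalogOwnedTestBaseSuite` API):

```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical base trait: exposes a knob the tests consult; None = off.
trait BackfillBatchSizeParam {
  def backfillBatchSize: Option[Int] = None
}

class CoreBehaviorSuite extends AnyFunSuite with BackfillBatchSizeParam {
  test("invariant holds under any configured batch size") {
    // Tests read the knob instead of hard-coding it, so each subclass
    // below replays this whole suite under its own setting.
    assert(backfillBatchSize.forall(_ > 0))
  }
}

// One subclass per configuration point; the test runner discovers each
// class and executes it as a full, independent suite.
class CoreBehaviorBatch1Suite extends CoreBehaviorSuite {
  override def backfillBatchSize: Option[Int] = Some(1)
}
```

The payoff is coverage: every existing checksum test now also runs with backfill batch sizes of 1, 2, and 100 without duplicating any test code.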

spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala

Lines changed: 19 additions & 0 deletions
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.delta.test
 
 import java.io.File
+import java.util.UUID
 
 import scala.util.Random
 
@@ -85,6 +86,24 @@ trait DeltaSQLTestUtils extends SQLTestUtils {
    }
  }
 
+  /**
+   * Creates a temporary table with a unique name for testing and executes a function with it.
+   * The table is automatically cleaned up after the function completes.
+   *
+   * @param createTable Whether to create an empty table.
+   * @param f The function to execute with the generated table name.
+   */
+  protected def withTempTable(createTable: Boolean)(f: String => Unit): Unit = {
+    val tableName = s"test_table_${UUID.randomUUID().toString.filterNot(_ == '-')}"
+
+    withTable(tableName) {
+      if (createTable) {
+        spark.sql(s"CREATE TABLE $tableName (id LONG) USING delta")
+      }
+      f(tableName)
+    }
+  }
+
   /** Returns random alphanumberic string to be used as a unique table name. */
   def uniqueTableName: String = Random.alphanumeric.take(10).mkString
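For reference, a minimal usage sketch of the new helper, assuming an enclosing suite that mixes in `DeltaSQLTestUtils` with a shared Spark session:

```scala
// createTable = true pre-creates an empty Delta table (schema: id LONG),
// so DeltaLog.forTable(spark, TableIdentifier(tableName)) resolves right
// away; with createTable = false the test body creates the table itself.
withTempTable(createTable = true) { tableName =>
  spark.sql(s"INSERT INTO $tableName VALUES (1), (2), (3)")
  assert(spark.table(tableName).count() == 3)
}
// withTable(...) drops the table automatically when the block exits.
```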
