Skip to content

Commit c23746b

Browse files
authored
[Kernel] [CatalogManaged] [UC] UCCatalogManagedClient loadTable tests (delta-io#4838)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/4838/files) to review incremental changes. - [**stack/uc_kernel_catalog_managed_3**](delta-io#4838) [[Files changed](https://github.com/delta-io/delta/pull/4838/files)] --------- #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Use the UCCatalogManagedClient, instantiated with an InMemoryUCClient, to load a ResolvedTable from a "real" UC catalog-managed table (a published delta and two staged deltas that the InMemoryUCClient acknowledges as ratified commits). ## How was this patch tested? See above -- test only changes. ## Does this PR introduce _any_ user-facing changes? No.
1 parent daf2a40 commit c23746b

File tree

2 files changed

+72
-3
lines changed

2 files changed

+72
-3
lines changed

unity/src/test/scala/io/delta/unity/InMemoryUCClient.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ object InMemoryUCClient {
3737
* internal mutable state. This class is designed to be safely accessed by multiple threads
3838
* concurrently.
3939
*/
40-
class TableData {
41-
private var maxRatifiedVersion = -1L
42-
private val commits: ArrayBuffer[Commit] = ArrayBuffer.empty
40+
class TableData(
41+
private var maxRatifiedVersion: Long = -1L,
42+
private val commits: ArrayBuffer[Commit] = ArrayBuffer.empty) {
4343

4444
/** @return the maximum ratified version, or -1 if no commits have been made. */
4545
def getMaxRatifiedVersion: Long = synchronized {
@@ -152,6 +152,13 @@ class InMemoryUCClient(ucMetastoreId: String) extends UCClient {
152152

153153
override def close(): Unit = {}
154154

155+
private[unity] def createTableIfNotExistsOrThrow(
156+
ucTableId: String,
157+
tableData: TableData): Unit = {
158+
Option(tables.putIfAbsent(ucTableId, tableData))
159+
.foreach(_ => throw new IllegalArgumentException(s"Table $ucTableId already exists"))
160+
}
161+
155162
private[unity] def getTablesCopy: Map[String, TableData] = {
156163
tables.asScala.toMap
157164
}

unity/src/test/scala/io/delta/unity/UCCatalogManagedClientSuite.scala

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,56 @@
1717
package io.delta.unity
1818

1919
import scala.collection.JavaConverters._
20+
import scala.collection.mutable.ArrayBuffer
2021

2122
import io.delta.kernel.internal.files.ParsedLogData.ParsedLogType
23+
import io.delta.kernel.internal.table.ResolvedTableInternal
24+
import io.delta.kernel.internal.tablefeatures.TableFeatures.{CATALOG_MANAGED_R_W_FEATURE_PREVIEW, TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION}
25+
import io.delta.kernel.internal.util.FileNames
26+
import io.delta.storage.commit.Commit
2227

28+
import org.apache.hadoop.conf.Configuration
29+
import org.apache.hadoop.fs.{FileSystem, Path}
2330
import org.scalatest.funsuite.AnyFunSuite
2431

2532
/** Unit tests for [[UCCatalogManagedClient]]. */
2633
class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils {
2734

35+
private def testCatalogManagedTable(versionToLoad: Long): Unit = {
36+
// Step 1: Create the in-memory table data (ratified commits v1, v2)
37+
val tablePath = getTestResourceFilePath("catalog-owned-preview")
38+
val ucClient = new InMemoryUCClient("ucMetastoreId")
39+
val fs = FileSystem.get(new Configuration())
40+
val catalogCommits = Seq(
41+
// scalastyle:off line.size.limit
42+
getTestResourceFilePath("catalog-owned-preview/_delta_log/_staged_commits/00000000000000000001.4cb9708e-b478-44de-b203-53f9ba9b2876.json"),
43+
getTestResourceFilePath("catalog-owned-preview/_delta_log/_staged_commits/00000000000000000002.5b9bba4a-0085-430d-a65e-b0d38c1afbe9.json"))
44+
// scalastyle:on line.size.limit
45+
.map { path => fs.getFileStatus(new Path(path)) }
46+
.map { fileStatus =>
47+
new Commit(
48+
FileNames.deltaVersion(fileStatus.getPath.toString),
49+
fileStatus,
50+
fileStatus.getModificationTime)
51+
}
52+
val tableData = new InMemoryUCClient.TableData(2, ArrayBuffer(catalogCommits: _*))
53+
ucClient.createTableIfNotExistsOrThrow("ucTableId", tableData)
54+
55+
// Step 2: Load the table using UCCatalogManagedClient at the desired versionToLoad
56+
val ucCatalogManagedClient = new UCCatalogManagedClient(ucClient)
57+
val resolvedTable = ucCatalogManagedClient
58+
.loadTable(defaultEngine, "ucTableId", tablePath, versionToLoad)
59+
.asInstanceOf[ResolvedTableInternal]
60+
61+
// Step 3: Validate
62+
val protocol = resolvedTable.getProtocol
63+
assert(resolvedTable.getVersion == versionToLoad)
64+
assert(protocol.getMinReaderVersion == TABLE_FEATURES_MIN_READER_VERSION)
65+
assert(protocol.getMinWriterVersion == TABLE_FEATURES_MIN_WRITER_VERSION)
66+
assert(protocol.getReaderFeatures.contains(CATALOG_MANAGED_R_W_FEATURE_PREVIEW.featureName()))
67+
assert(protocol.getWriterFeatures.contains(CATALOG_MANAGED_R_W_FEATURE_PREVIEW.featureName()))
68+
}
69+
2870
test("constructor throws on invalid input") {
2971
assertThrows[NullPointerException] {
3072
new UCCatalogManagedClient(null)
@@ -49,6 +91,26 @@ class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestU
4991
}
5092
}
5193

94+
test("loadTable throws if version to load is greater than max ratified version") {
95+
val exMsg = intercept[IllegalArgumentException] {
96+
testCatalogManagedTable(versionToLoad = 9L)
97+
}.getMessage
98+
99+
assert(exMsg.contains("Cannot load table version 9 as the latest version ratified by UC is 2"))
100+
}
101+
102+
test("loadTable correctly loads a UC table -- versionToLoad is a ratified commit (the max)") {
103+
testCatalogManagedTable(versionToLoad = 2L)
104+
}
105+
106+
test("loadTable correctly loads a UC table -- versionToLoad is a ratified commit (not the max)") {
107+
testCatalogManagedTable(versionToLoad = 1L)
108+
}
109+
110+
test("loadTable correctly loads a UC table -- versionToLoad is a published commit") {
111+
testCatalogManagedTable(versionToLoad = 0L)
112+
}
113+
52114
test("converts UC Commit into Kernel ParsedLogData.RATIFIED_STAGED_COMMIT") {
53115
val ucCommit = createCommit(1)
54116
val hadoopFS = ucCommit.getFileStatus

0 commit comments

Comments
 (0)