Skip to content

Commit b12a8b2

Browse files
authored
[Kernel] [CatalogManaged] [UC] InMemoryUCClient (delta-io#4835)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/4835/files) to review incremental changes. - [**stack/uc_kernel_catalog_managed_2_in_memory_test_client_b**](delta-io#4835) [[Files changed](https://github.com/delta-io/delta/pull/4835/files)] - [stack/uc_kernel_catalog_managed_3](delta-io#4838) [[Files changed](https://github.com/delta-io/delta/pull/4838/files/e9eeef9043b562c6e7eac81f95da0b3e94f44a9c..c3662f9ddfb192244e89f9c7d15c56dbe063cb45)] --------- #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Create an `InMemoryUCClient` and a simple test suite ## How was this patch tested? New test suite ## Does this PR introduce _any_ user-facing changes? No
1 parent 46c09f7 commit b12a8b2

File tree

5 files changed

+334
-6
lines changed

5 files changed

+334
-6
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ lazy val unity = (project in file("unity"))
732732
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided",
733733
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
734734
),
735-
unidocSourceFilePatterns += SourceFilePattern("io/delta/unity/"),
735+
unidocSourceFilePatterns += SourceFilePattern("src/main/java/io/delta/unity/"),
736736
).configureUnidoc()
737737

738738
// TODO javastyle tests
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.unity
18+
19+
import java.lang.{Long => JLong}
20+
import java.net.URI
21+
import java.util.Optional
22+
import java.util.concurrent.ConcurrentHashMap
23+
24+
import scala.collection.JavaConverters._
25+
import scala.collection.mutable.ArrayBuffer
26+
27+
import io.delta.storage.commit.{Commit, CommitFailedException, GetCommitsResponse}
28+
import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
29+
import io.delta.storage.commit.uccommitcoordinator.{InvalidTargetTableException, UCClient}
30+
31+
object InMemoryUCClient {

  /**
   * Per-table state held by [[InMemoryUCClient]]: the ordered list of commits plus the highest
   * version that has been accepted so far.
   *
   * Thread Safety: every accessor and mutator below runs inside `synchronized` on this instance,
   * so individual calls are safe to issue from multiple threads.
   */
  class TableData {
    // -1 means "no commit accepted yet", which makes 0 the first acceptable version.
    private var maxRatifiedVersion = -1L
    private val commits: ArrayBuffer[Commit] = ArrayBuffer.empty

    /** @return the maximum ratified version, or -1 if no commits have been made. */
    def getMaxRatifiedVersion: Long = synchronized { maxRatifiedVersion }

    /** @return an immutable snapshot of every commit appended so far, in append order. */
    def getCommits: List[Commit] = synchronized {
      commits.toList
    }

    /**
     * Returns the commits whose version lies in the inclusive range `[startVersion, endVersion]`.
     *
     * @param startVersion inclusive lower bound; an empty Optional means 0
     * @param endVersion inclusive upper bound; an empty Optional means `Long.MaxValue`
     * @return an immutable list of the matching commits, in append order
     */
    def getCommitsInRange(
        startVersion: Optional[JLong],
        endVersion: Optional[JLong]): List[Commit] = synchronized {
      val lowerBound: Long = startVersion.orElse(0L)
      val upperBound: Long = endVersion.orElse(Long.MaxValue)

      commits.iterator
        .filter(c => c.getVersion >= lowerBound && c.getVersion <= upperBound)
        .toList
    }

    /**
     * Appends a new commit to this table.
     *
     * The commit is accepted only if its version is exactly one greater than the current maximum
     * ratified version; nothing is modified when it is rejected.
     *
     * @throws CommitFailedException if the commit's version is not `maxRatifiedVersion + 1`
     */
    def appendCommit(commit: Commit): Unit = synchronized {
      val nextVersion = maxRatifiedVersion + 1
      val actualVersion = commit.getVersion

      if (actualVersion == nextVersion) {
        commits += commit
        maxRatifiedVersion = nextVersion
      } else {
        // NOTE(review): the mismatch is reported as neither retryable nor a conflict; confirm this
        // is the intended classification for callers that inspect those flags.
        throw new CommitFailedException(
          false, /* retryable */
          false, /* conflict */
          s"Expected commit version $nextVersion but got $actualVersion")
      }
    }
  }
}
80+
81+
/**
 * In-memory Unity Catalog client implementation for testing.
 *
 * Provides a mock implementation of UCClient that stores all table data in memory. This is useful
 * for unit tests that need to simulate Unity Catalog operations without connecting to an actual UC
 * service.
 *
 * Only plain commits are supported: [[commit]] rejects `lastKnownBackfilledVersion`, `disown`,
 * `newMetadata` and `newProtocol` with an `UnsupportedOperationException`. The `tableUri`
 * arguments are accepted but never consulted; tables are keyed by table ID alone.
 *
 * Thread Safety: This implementation is thread-safe for concurrent access. Multiple threads can
 * safely perform operations on different tables simultaneously. Operations on the same table are
 * serialized on that table's [[InMemoryUCClient.TableData]] monitor, and [[getCommits]] holds that
 * monitor for the whole read so its response is a single consistent snapshot.
 */
class InMemoryUCClient(ucMetastoreId: String) extends UCClient {

  import InMemoryUCClient._

  /** Map from UC_TABLE_ID to TABLE_DATA */
  private val tables = new ConcurrentHashMap[String, TableData]()

  /** @return the metastore ID this client was constructed with. */
  override def getMetastoreId: String = ucMetastoreId

  /**
   * Convenience method for tests to commit with default parameters.
   *
   * Forwards every argument unchanged to [[commit]], so the same restrictions and exceptions
   * apply.
   */
  def commitWithDefaults(
      tableId: String,
      tableUri: URI,
      commit: Optional[Commit],
      lastKnownBackfilledVersion: Optional[JLong] = Optional.empty(),
      disown: Boolean = false,
      newMetadata: Optional[AbstractMetadata] = Optional.empty(),
      newProtocol: Optional[AbstractProtocol] = Optional.empty()): Unit = {
    this.commit(
      tableId,
      tableUri,
      commit,
      lastKnownBackfilledVersion,
      disown,
      newMetadata,
      newProtocol)
  }

  /**
   * Records `commit` for `tableId`, creating the table's in-memory state on first use.
   *
   * If `commit` is empty this is a no-op (after argument validation) and no table is created.
   *
   * @throws UnsupportedOperationException if `lastKnownBackfilledVersion`, `newMetadata` or
   *                                       `newProtocol` is present, or `disown` is true
   * @throws CommitFailedException if the commit's version is not the table's next expected version
   */
  override def commit(
      tableId: String,
      tableUri: URI,
      commit: Optional[Commit] = Optional.empty(),
      lastKnownBackfilledVersion: Optional[JLong],
      disown: Boolean,
      newMetadata: Optional[AbstractMetadata],
      newProtocol: Optional[AbstractProtocol]): Unit = {
    // Checked in declaration order; the first unsupported argument that is set wins.
    val unsupportedArgs = Seq(
      "lastKnownBackfilledVersion" -> lastKnownBackfilledVersion.isPresent,
      "disown" -> disown,
      "newMetadata" -> newMetadata.isPresent,
      "newProtocol" -> newProtocol.isPresent)

    unsupportedArgs.collectFirst { case (name, true) => name }.foreach { name =>
      throw new UnsupportedOperationException(s"$name not supported yet in InMemoryUCClient")
    }

    if (commit.isPresent) {
      getOrCreateTableIfNotExists(tableId).appendCommit(commit.get())
    }
  }

  /**
   * Returns the commits of `tableId` whose versions fall in the inclusive range
   * `[startVersion, endVersion]`, together with the table's maximum ratified version.
   *
   * @throws InvalidTargetTableException if no commit has ever been made for `tableId`
   */
  override def getCommits(
      tableId: String,
      tableUri: URI,
      startVersion: Optional[JLong],
      endVersion: Optional[JLong]): GetCommitsResponse = {
    val tableData = getTableDataElseThrow(tableId)

    // TableData's methods lock on the instance itself. Taking that same (reentrant) monitor here
    // keeps a concurrent appendCommit from landing between the two reads, which would otherwise
    // yield a max ratified version newer than the commits in the response.
    tableData.synchronized {
      val filteredCommits = tableData.getCommitsInRange(startVersion, endVersion)
      new GetCommitsResponse(filteredCommits.asJava, tableData.getMaxRatifiedVersion)
    }
  }

  /** Nothing to release: all state lives on the heap of this instance. */
  override def close(): Unit = {}

  /**
   * Returns an immutable snapshot of the table map. The copy is shallow: the [[TableData]] values
   * are the live instances, so later commits remain visible through them.
   */
  private[unity] def getTablesCopy: Map[String, TableData] = {
    tables.asScala.toMap
  }

  /** Retrieves the table data for the given table ID, creating it if it does not exist. */
  private def getOrCreateTableIfNotExists(tableId: String): TableData = {
    tables.computeIfAbsent(tableId, _ => new TableData)
  }

  /** Retrieves table data for the given table ID or throws an exception if not found. */
  private def getTableDataElseThrow(tableId: String): TableData = {
    Option(tables.get(tableId))
      .getOrElse(throw new InvalidTargetTableException(s"Table not found: $tableId"))
  }
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.unity
18+
19+
import java.lang.{Long => JLong}
20+
import java.net.URI
21+
import java.util.Optional
22+
23+
import scala.collection.JavaConverters._
24+
25+
import io.delta.storage.commit.CommitFailedException
26+
import io.delta.storage.commit.uccommitcoordinator.InvalidTargetTableException
27+
28+
import org.scalatest.funsuite.AnyFunSuite
29+
30+
/** Unit tests for [[InMemoryUCClient]]. */
class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils {

  /**
   * Commits `allVersions` to a fresh client under the table ID "tableId", fetches the commits
   * with the given optional bounds, and asserts that exactly `expectedVersions` come back.
   */
  private def testGetCommitsFiltering(
      allVersions: Seq[Long],
      startVersionOpt: Optional[JLong],
      endVersionOpt: Optional[JLong],
      expectedVersions: Seq[Long]): Unit = {
    val ucClient = getInMemoryUCClientWithCommitsForTableId("tableId", allVersions)
    val returnedVersions = ucClient
      .getCommits("tableId", fakeURI, startVersionOpt, endVersionOpt)
      .getCommits
      .asScala
      .map(_.getVersion)

    assert(returnedVersions == expectedVersions)
  }

  test("TableData::appendCommit throws if commit version is not maxRatifiedVersion + 1") {
    val emptyTable = new InMemoryUCClient.TableData
    val versionOneCommit = createCommit(1L)

    // A brand-new table expects version 0 first, so version 1 must be rejected.
    val thrown = intercept[CommitFailedException] {
      emptyTable.appendCommit(versionOneCommit)
    }

    assert(thrown.getMessage.contains("Expected commit version 0 but got 1"))
  }

  test("TableData::appendCommit appends the commit and updates the maxRatifiedVersion") {
    val tableData = new InMemoryUCClient.TableData

    // After appending version v the table must hold v + 1 commits, the newest being v.
    Seq(0L, 1L).foreach { version =>
      tableData.appendCommit(createCommit(version))
      val commitsSoFar = tableData.getCommits

      assert(tableData.getMaxRatifiedVersion == version)
      assert(commitsSoFar.size == version.toInt + 1)
      assert(commitsSoFar.last.getVersion == version)
    }
  }

  test("getCommits throws InvalidTargetTableException for non-existent table") {
    val emptyClient = new InMemoryUCClient("ucMetastoreId")
    val tableUri = new URI("s3://bucket/table")

    val thrown = intercept[InvalidTargetTableException] {
      emptyClient.getCommits("abcd", tableUri, Optional.empty(), Optional.empty())
    }

    assert(thrown.getMessage.contains("Table not found: abcd"))
  }

  // Range-filter cases: (test name, startVersion, endVersion, expected versions). Every case runs
  // against a table holding versions 0 through 5.
  Seq(
    (
      "getCommits returns all commits if no startVersion or endVersion filter",
      Optional.empty[JLong](),
      Optional.empty[JLong](),
      0L to 5L),
    (
      "getCommits filters by startVersion",
      Optional.of[JLong](2L),
      Optional.empty[JLong](),
      2L to 5L),
    (
      "getCommits filters by endVersion",
      Optional.empty[JLong](),
      Optional.of[JLong](3L),
      0L to 3L),
    (
      "getCommits filters by startVersion and endVersion",
      Optional.of[JLong](2L),
      Optional.of[JLong](4L),
      2L to 4L)
  ).foreach { case (testName, startOpt, endOpt, expected) =>
    test(testName) {
      testGetCommitsFiltering(
        allVersions = 0L to 5L,
        startVersionOpt = startOpt,
        endVersionOpt = endOpt,
        expectedVersions = expected)
    }
  }

  test("concurrent table creation (via committing version 0) => only one commit succeeds") {
    val client = new InMemoryUCClient("ucMetastoreId")
    val tableId = "race-table"

    // Ten racing attempts to commit version 0; true marks an attempt that was accepted.
    val outcomes = (0 until 10).par.map { _ =>
      try {
        client.commitWithDefaults(tableId, fakeURI, Optional.of(createCommit(0L)))
        true
      } catch {
        case _: CommitFailedException => false
      }
    }

    assert(outcomes.count(accepted => accepted) == 1) // Only one may win version 0
    assert(outcomes.count(accepted => !accepted) == 9)

    val tablesSnapshot = client.getTablesCopy
    assert(tablesSnapshot.size == 1)
    assert(tablesSnapshot(tableId).getMaxRatifiedVersion == 0L)
  }

}

unity/src/test/scala/io/delta/unity/UCCatalogManagedClientSuite.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@ package io.delta.unity
1919
import scala.collection.JavaConverters._
2020

2121
import io.delta.kernel.internal.files.ParsedLogData.ParsedLogType
22-
import io.delta.kernel.internal.util.FileNames
23-
import io.delta.storage.commit.Commit
2422

25-
import org.apache.hadoop.fs.{FileStatus => HadoopFileStatus, Path}
2623
import org.scalatest.funsuite.AnyFunSuite
2724

2825
/** Unit tests for [[UCCatalogManagedClient]]. */
@@ -34,7 +31,23 @@ class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestU
3431
}
3532
}
3633

37-
// TODO: loadTable throws on invalid input. We need a non-null UCClient in order to test this.
34+
test("loadTable throws on invalid input") {
  // Backed by an in-memory UC client so the catalog-managed client has a non-null UCClient.
  val catalogClient = new UCCatalogManagedClient(new InMemoryUCClient("ucMetastoreId"))

  // engine is null
  intercept[NullPointerException] {
    catalogClient.loadTable(null, "ucTableId", "tablePath", 0L)
  }

  // ucTableId is null
  intercept[NullPointerException] {
    catalogClient.loadTable(defaultEngine, null, "tablePath", 0L)
  }

  // tablePath is null
  intercept[NullPointerException] {
    catalogClient.loadTable(defaultEngine, "ucTableId", null, 0L)
  }

  // version < 0
  intercept[IllegalArgumentException] {
    catalogClient.loadTable(defaultEngine, "ucTableId", "tablePath", -1L)
  }
}
3851

3952
test("converts UC Commit into Kernel ParsedLogData.RATIFIED_STAGED_COMMIT") {
4053
val ucCommit = createCommit(1)

unity/src/test/scala/io/delta/unity/UCCatalogManagedTestUtils.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,18 @@
1616

1717
package io.delta.unity
1818

19+
import java.net.URI
20+
import java.util.Optional
21+
22+
import io.delta.kernel.defaults.utils.TestUtils
1923
import io.delta.kernel.internal.util.FileNames
2024
import io.delta.storage.commit.Commit
2125

2226
import org.apache.hadoop.fs.{FileStatus => HadoopFileStatus, Path}
2327

24-
trait UCCatalogManagedTestUtils {
28+
trait UCCatalogManagedTestUtils extends TestUtils {
29+
val fakeURI = new URI("s3://bucket/table")
30+
2531
def hadoopCommitFileStatus(version: Long): HadoopFileStatus = {
2632
val filePath = FileNames.stagedCommitFile("fake/logPath", version)
2733

@@ -37,4 +43,15 @@ trait UCCatalogManagedTestUtils {
3743
/**
 * Builds a [[Commit]] for `version`, using the file status from `hadoopCommitFileStatus(version)`
 * and reusing `version` as the commit timestamp.
 */
def createCommit(version: Long): Commit = {
  val fileStatus = hadoopCommitFileStatus(version)
  val timestamp = version
  new Commit(version, fileStatus, timestamp)
}
46+
47+
/**
 * Creates an InMemoryUCClient and commits one commit per entry of `versions`, in the given order,
 * to `tableId`.
 *
 * Each commit goes through `commitWithDefaults`, so a failure raised by the client for any version
 * propagates to the caller. When `versions` is empty no commit is issued at all.
 *
 * @param tableId  the table ID every commit is recorded under
 * @param versions the commit versions to create and commit, in order
 * @return the populated client
 */
def getInMemoryUCClientWithCommitsForTableId(
    tableId: String,
    versions: Seq[Long]): InMemoryUCClient = {
  val ucClient = new InMemoryUCClient("ucMetastoreId")
  for (version <- versions) {
    ucClient.commitWithDefaults(tableId, fakeURI, Optional.of(createCommit(version)))
  }
  ucClient
}
4057
}

0 commit comments

Comments
 (0)