Skip to content

Commit 024dadb

Browse files
authored
[Kernel] [CC Refactor #1] Add TableIdentifier API (delta-io#3795)
#### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description - adds a new `TableIdentifier` class, that kernel will pass on to Commit Coordinator Client - adds a new `Table::forPathWithTableId(engine, path, tableId)` interface - the tableId is stored as an `Optional` in the `Table`, and this PR does **not** propagate that value into SnapshotManager, Snapshot, etc. Future PRs can take care of that. ## How was this patch tested? TableIdentifier UTs ## Does this PR introduce _any_ user-facing changes? Yes. See the above.
1 parent 9765528 commit 024dadb

File tree

5 files changed

+244
-21
lines changed

5 files changed

+244
-21
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/Table.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.delta.kernel.exceptions.TableNotFoundException;
2323
import io.delta.kernel.internal.TableImpl;
2424
import java.io.IOException;
25+
import java.util.Optional;
2526

2627
/**
2728
* Represents the Delta Lake table for a given path.
@@ -57,6 +58,24 @@ static Table forPath(Engine engine, String path) {
5758
return TableImpl.forPath(engine, path);
5859
}
5960

61+
/**
62+
* Instantiate a table object for the Delta Lake table at the given path and associate it with the
63+
* given {@link TableIdentifier}.
64+
*
65+
* <p>See {@link #forPath(Engine, String)} for more details on behavior when the table path does
66+
* or does not exist.
67+
*
68+
* @param engine the {@link Engine} instance to use in Delta Kernel.
69+
* @param path location of the table. Path is resolved to fully qualified path using the given
70+
* {@code engine}.
71+
* @param tableId the {@link TableIdentifier} to associate with the {@link Table}
72+
* @return an instance of {@link Table} representing the Delta table at the given path and
73+
* associated with the given {@link TableIdentifier}
74+
*/
75+
static Table forPathWithTableId(Engine engine, String path, TableIdentifier tableId) {
76+
return TableImpl.forPathWithTableId(engine, path, tableId);
77+
}
78+
6079
/**
6180
* The fully qualified path of this {@link Table} instance.
6281
*
@@ -66,6 +85,14 @@ static Table forPath(Engine engine, String path) {
6685
*/
6786
String getPath(Engine engine);
6887

88+
/**
89+
* The table identifier of this {@link Table} instance.
90+
*
91+
* @return the table identifier, or {@link Optional#empty()} if none is set.
92+
* @since 3.3.0
93+
*/
94+
Optional<TableIdentifier> getTableIdentifier();
95+
6996
/**
7097
* Get the latest snapshot of the table.
7198
*
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel;
18+
19+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
20+
import static java.util.Objects.requireNonNull;
21+
22+
import io.delta.kernel.annotation.Evolving;
23+
import java.util.Arrays;
24+
import java.util.Objects;
25+
import java.util.stream.Collectors;
26+
27+
/**
28+
* Identifier for a table. e.g. $catalog / $schema / $table
29+
*
30+
* @since 3.3
31+
*/
32+
@Evolving
33+
public class TableIdentifier {
34+
/** The namespace of the table. */
35+
private final String[] namespace;
36+
37+
/** The name of the table. */
38+
private final String name;
39+
40+
public TableIdentifier(String[] namespace, String name) {
41+
checkArgument(namespace != null && namespace.length > 0, "namespace cannot be null or empty");
42+
this.namespace = namespace;
43+
this.name = requireNonNull(name, "name is null");
44+
}
45+
46+
/** @return The namespace of the table. e.g. $catalog / $schema */
47+
public String[] getNamespace() {
48+
return namespace;
49+
}
50+
51+
/** @return The name of the table. */
52+
public String getName() {
53+
return name;
54+
}
55+
56+
@Override
57+
public boolean equals(Object o) {
58+
if (this == o) {
59+
return true;
60+
}
61+
if (o == null || getClass() != o.getClass()) {
62+
return false;
63+
}
64+
TableIdentifier that = (TableIdentifier) o;
65+
return Arrays.equals(getNamespace(), that.getNamespace()) && getName().equals(that.getName());
66+
}
67+
68+
@Override
69+
public int hashCode() {
70+
return 31 * Objects.hash(getName()) + Arrays.hashCode(getNamespace());
71+
}
72+
73+
@Override
74+
public String toString() {
75+
final String quotedNamespace =
76+
Arrays.stream(namespace).map(this::quoteIdentifier).collect(Collectors.joining("."));
77+
return "TableIdentifier{" + quotedNamespace + "." + quoteIdentifier(name) + "}";
78+
}
79+
80+
/** Escapes back-ticks within the identifier name with double-back-ticks. */
81+
private String quoteIdentifier(String identifier) {
82+
return identifier.replace("`", "``");
83+
}
84+
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@
4444

4545
public class TableImpl implements Table {
4646

47+
//////////////////////////////
48+
// Static Methods / Members //
49+
//////////////////////////////
50+
4751
private static final Logger logger = LoggerFactory.getLogger(TableImpl.class);
4852

4953
public static Table forPath(Engine engine, String path) {
@@ -60,34 +64,60 @@ public static Table forPath(Engine engine, String path) {
6064
* @return an instance of {@link Table} representing the Delta table at the given path
6165
*/
6266
public static Table forPath(Engine engine, String path, Clock clock) {
63-
String resolvedPath;
67+
return forPathWithTableId(engine, path, Optional.empty(), clock);
68+
}
69+
70+
public static Table forPathWithTableId(Engine engine, String path, TableIdentifier tableId) {
71+
return forPathWithTableId(engine, path, Optional.of(tableId), System::currentTimeMillis);
72+
}
73+
74+
public static Table forPathWithTableId(
75+
Engine engine, String path, Optional<TableIdentifier> tableIdOpt, Clock clock) {
6476
try {
65-
resolvedPath =
77+
final String resolvedPath =
6678
wrapEngineExceptionThrowsIO(
6779
() -> engine.getFileSystemClient().resolvePath(path), "Resolving path %s", path);
80+
return new TableImpl(resolvedPath, tableIdOpt, clock);
6881
} catch (IOException io) {
6982
throw new UncheckedIOException(io);
7083
}
71-
return new TableImpl(resolvedPath, clock);
7284
}
7385

74-
private final SnapshotManager snapshotManager;
86+
////////////////////////////////
87+
// Instance Methods / Members //
88+
////////////////////////////////
89+
7590
private final String tablePath;
91+
private final Optional<TableIdentifier> tableIdOpt;
7692
private final Clock clock;
7793

78-
public TableImpl(String tablePath, Clock clock) {
94+
private final Path dataPath;
95+
private final Path logPath;
96+
private final SnapshotManager snapshotManager;
97+
98+
private TableImpl(String tablePath, Optional<TableIdentifier> tableIdOpt, Clock clock) {
7999
this.tablePath = tablePath;
80-
final Path dataPath = new Path(tablePath);
81-
final Path logPath = new Path(dataPath, "_delta_log");
82-
this.snapshotManager = new SnapshotManager(logPath, dataPath);
100+
this.tableIdOpt = tableIdOpt;
83101
this.clock = clock;
102+
this.dataPath = new Path(tablePath);
103+
this.logPath = new Path(dataPath, "_delta_log");
104+
this.snapshotManager = new SnapshotManager(logPath, dataPath);
84105
}
85106

107+
/////////////////
108+
// Public APIs //
109+
/////////////////
110+
86111
@Override
87112
public String getPath(Engine engine) {
88113
return tablePath;
89114
}
90115

116+
@Override
117+
public Optional<TableIdentifier> getTableIdentifier() {
118+
return tableIdOpt;
119+
}
120+
91121
@Override
92122
public Snapshot getLatestSnapshot(Engine engine) throws TableNotFoundException {
93123
return snapshotManager.buildLatestSnapshot(engine);
@@ -169,8 +199,7 @@ public CloseableIterator<ColumnarBatch> getChanges(
169199
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
170200
if (!protocolVector.isNullAt(rowId)) {
171201
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
172-
TableFeatures.validateReadSupportedTable(
173-
protocol, getDataPath().toString(), Optional.empty());
202+
TableFeatures.validateReadSupportedTable(protocol, tablePath, Optional.empty());
174203
}
175204
}
176205
if (shouldDropProtocolColumn) {
@@ -181,14 +210,6 @@ public CloseableIterator<ColumnarBatch> getChanges(
181210
});
182211
}
183212

184-
protected Path getDataPath() {
185-
return new Path(tablePath);
186-
}
187-
188-
protected Path getLogPath() {
189-
return new Path(tablePath, "_delta_log");
190-
}
191-
192213
/**
193214
* Returns the latest version that was committed before or at {@code millisSinceEpochUTC}. If no
194215
* version exists, throws a {@link KernelException}
@@ -269,6 +290,22 @@ public long getVersionAtOrAfterTimestamp(Engine engine, long millisSinceEpochUTC
269290
}
270291
}
271292

293+
////////////////////
294+
// Protected APIs //
295+
////////////////////
296+
297+
protected Path getDataPath() {
298+
return dataPath;
299+
}
300+
301+
protected Path getLogPath() {
302+
return logPath;
303+
}
304+
305+
////////////////////////////
306+
// Private Helper Methods //
307+
////////////////////////////
308+
272309
/**
273310
* Returns the raw delta actions for each version between startVersion and endVersion. Only reads
274311
* the actions requested in actionSet from the JSON log files.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel
18+
19+
import org.scalatest.funsuite.AnyFunSuite
20+
21+
class TableIdentifierSuite extends AnyFunSuite {
22+
23+
test("TableIdentifier should throw IllegalArgumentException for null or empty namespace") {
24+
assertThrows[IllegalArgumentException] {
25+
new TableIdentifier(null, "table")
26+
}
27+
assertThrows[IllegalArgumentException] {
28+
new TableIdentifier(Array(), "table")
29+
}
30+
}
31+
32+
test("TableIdentifier should throw NullPointerException for null table name") {
33+
assertThrows[NullPointerException] {
34+
new TableIdentifier(Array("catalog", "schema"), null)
35+
}
36+
}
37+
38+
test("TableIdentifier should return the correct namespace and name") {
39+
val namespace = Array("catalog", "schema")
40+
val name = "testTable"
41+
val tid = new TableIdentifier(namespace, name)
42+
43+
assert(tid.getNamespace.sameElements(namespace))
44+
assert(tid.getName == name)
45+
}
46+
47+
test("TableIdentifiers with same namespace and name should be equal") {
48+
val tid1 = new TableIdentifier(Array("catalog", "schema"), "table")
49+
val tid2 = new TableIdentifier(Array("catalog", "schema"), "table")
50+
51+
assert(tid1 == tid2)
52+
assert(tid1.hashCode == tid2.hashCode)
53+
}
54+
55+
test("TableIdentifiers with different namespace or name should not be equal") {
56+
val tid1 = new TableIdentifier(Array("catalog", "schema1"), "table1")
57+
val tid2 = new TableIdentifier(Array("catalog", "schema2"), "table1")
58+
val tid3 = new TableIdentifier(Array("catalog", "schema1"), "table2")
59+
60+
assert(tid1 != tid2)
61+
assert(tid1 != tid3)
62+
}
63+
64+
test("TableIdentifier toString") {
65+
// Normal case
66+
val tidNormal = new TableIdentifier(Array("catalog", "schema"), "table")
67+
val expectedNormal = "TableIdentifier{catalog.schema.table}"
68+
assert(tidNormal.toString == expectedNormal)
69+
70+
// Special case: should escape backticks
71+
val tidSpecial = new TableIdentifier(Array("catalog", "sche`ma"), "tab`le")
72+
val expectedSpecial = "TableIdentifier{catalog.sche``ma.tab``le}"
73+
assert(tidSpecial.toString == expectedSpecial)
74+
}
75+
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
236236

237237
test("Enablement tracking works when ICT is enabled post commit 0") {
238238
withTempDirAndEngine { (tablePath, engine) =>
239-
val table = TableImpl.forPath(engine, tablePath)
239+
val table = Table.forPath(engine, tablePath)
240240
val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE)
241241

242242
val txn = txnBuilder
@@ -319,7 +319,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
319319

320320
test("Metadata toString should work with ICT enabled") {
321321
withTempDirAndEngine { (tablePath, engine) =>
322-
val table = TableImpl.forPath(engine, tablePath)
322+
val table = Table.forPath(engine, tablePath)
323323
val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE)
324324

325325
val txn = txnBuilder
@@ -356,7 +356,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
356356

357357
test("Table with ICT enabled is readable") {
358358
withTempDirAndEngine { (tablePath, engine) =>
359-
val table = TableImpl.forPath(engine, tablePath)
359+
val table = Table.forPath(engine, tablePath)
360360
val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE)
361361

362362
val txn = txnBuilder

0 commit comments

Comments
 (0)