Skip to content

Commit 6ae4b62

Browse files
authored
[Kernel] [CC Refactor #2] Add TableDescriptor and CommitCoordinatorClient API (delta-io#3797)
This is a stacked PR. Please view this PR's diff here: - scottsand-db/delta@delta_kernel_cc_1...delta_kernel_cc_2 #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Adds new `TableDescriptor` and `CommitCoordinatorClient` API. Adds a new `getCommitCoordinatorClient` API to the `Engine` (with a default implementation that throws an exception). ## How was this patch tested? N/A trivial. ## Does this PR introduce _any_ user-facing changes? Yes. See the above.
1 parent 024dadb commit 6ae4b62

File tree

5 files changed

+374
-1
lines changed

5 files changed

+374
-1
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/TableIdentifier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public boolean equals(Object o) {
6161
if (o == null || getClass() != o.getClass()) {
6262
return false;
6363
}
64-
TableIdentifier that = (TableIdentifier) o;
64+
final TableIdentifier that = (TableIdentifier) o;
6565
return Arrays.equals(getNamespace(), that.getNamespace()) && getName().equals(that.getName());
6666
}
6767

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.coordinatedcommits;
18+
19+
import io.delta.kernel.TableIdentifier;
20+
import io.delta.kernel.annotation.Evolving;
21+
import io.delta.kernel.data.Row;
22+
import io.delta.kernel.engine.Engine;
23+
import io.delta.kernel.engine.coordinatedcommits.CommitFailedException;
24+
import io.delta.kernel.engine.coordinatedcommits.CommitResponse;
25+
import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse;
26+
import io.delta.kernel.engine.coordinatedcommits.UpdatedActions;
27+
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
28+
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
29+
import io.delta.kernel.utils.CloseableIterator;
30+
import java.io.IOException;
31+
import java.util.Map;
32+
import java.util.Optional;
33+
34+
/**
35+
* The CommitCoordinatorClient is responsible for communicating with the commit coordinator and
36+
* backfilling commits. It has four main APIs that need to be implemented:
37+
*
38+
* <ul>
39+
* <li>{@link #registerTable}: Determine the table config during commit coordinator registration.
40+
* <li>{@link #commit}: Commit a new version of the table.
41+
* <li>{@link #getCommits}: Tracks and returns unbackfilled commits.
42+
* <li>{@link #backfillToVersion}: Ensure that commits are backfilled if/when needed.
43+
* </ul>
44+
*
45+
* @since 3.3.0
46+
*/
47+
@Evolving
48+
public interface CommitCoordinatorClient {
49+
50+
/**
51+
* Register the table represented by the given {@code logPath} at the provided {@code
52+
* currentVersion} with the commit coordinator this commit coordinator client represents.
53+
*
54+
* <p>This API is called when the table is being converted from an existing file system table to a
55+
* coordinated-commit table.
56+
*
57+
* <p>When a new coordinated-commit table is being created, the {@code currentVersion} will be -1
58+
* and the upgrade commit needs to be a file system commit which will write the backfilled file
59+
* directly.
60+
*
61+
* @param engine The {@link Engine} instance to use, if needed.
62+
* @param logPath The path to the delta log of the table that should be converted.
63+
* @param tableIdentifier The table identifier for the table, or {@link Optional#empty()} if the
64+
* table doesn't use any identifier (i.e. it is path-based).
65+
* @param currentVersion The version of the table just before conversion. currentVersion + 1
66+
* represents the commit that will do the conversion. This must be backfilled atomically.
67+
* currentVersion + 2 represents the first commit after conversion. This will go through the
68+
* CommitCoordinatorClient and the client is free to choose when it wants to backfill this
69+
* commit.
70+
* @param currentMetadata The metadata of the table at currentVersion
71+
* @param currentProtocol The protocol of the table at currentVersion
72+
* @return A map of key-value pairs which is issued by the commit coordinator to uniquely identify
73+
* the table. This should be stored in the table's metadata for table property {@link
74+
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF}. This information
75+
* needs to be passed to the {@link #commit}, {@link #getCommits}, and {@link
76+
* #backfillToVersion} APIs to identify the table.
77+
*/
78+
Map<String, String> registerTable(
79+
Engine engine,
80+
String logPath,
81+
Optional<TableIdentifier> tableIdentifier,
82+
long currentVersion,
83+
AbstractMetadata currentMetadata,
84+
AbstractProtocol currentProtocol);
85+
86+
/**
87+
* Commit the given set of actions to the table represented by {@code tableDescriptor}.
88+
*
89+
* @param engine The {@link Engine} instance to use. This gives client implementations access to
90+
* {@link io.delta.kernel.engine.JsonHandler#writeJsonFileAtomically} in order to write the
91+
* given set of actions to an unbackfilled Delta file.
92+
* @param tableDescriptor The descriptor for the table.
93+
* @param commitVersion The version of the commit that is being committed.
94+
* @param actions The set of actions to be committed
95+
* @param updatedActions Additional information for the commit, including:
96+
* <ul>
97+
* <li>Commit info
98+
* <li>Metadata changes
99+
* <li>Protocol changes
100+
* </ul>
101+
*
102+
* @return {@link CommitResponse} containing the file status of the committed file. Note: If the
103+
* commit is already backfilled, the file status may be omitted, and the client can retrieve
104+
* this information independently.
105+
* @throws CommitFailedException if the commit operation fails
106+
*/
107+
CommitResponse commit(
108+
Engine engine,
109+
TableDescriptor tableDescriptor,
110+
long commitVersion,
111+
CloseableIterator<Row> actions,
112+
UpdatedActions updatedActions)
113+
throws CommitFailedException;
114+
115+
/**
116+
* Get the unbackfilled commits for the table represented by the given tableDescriptor. Commits
117+
* older than startVersion (if given) or newer than endVersion (if given) are ignored. The
118+
* returned commits are contiguous and in ascending version order.
119+
*
120+
* <p>Note that the first version returned by this API may not be equal to startVersion. This
121+
* happens when some versions starting from startVersion have already been backfilled and so the
122+
* commit coordinator may have stopped tracking them.
123+
*
124+
* <p>The returned latestTableVersion is the maximum commit version ratified by the commit
125+
* coordinator. Note that returning latestTableVersion as -1 is acceptable only if the commit
126+
* coordinator never ratified any version, i.e. it never accepted any unbackfilled commit.
127+
*
128+
* @param engine The {@link Engine} instance to use, if needed.
129+
* @param tableDescriptor The descriptor for the table.
130+
* @param startVersion The minimum version of the commit that should be returned, or {@link
131+
* Optional#empty()} if there is no minimum.
132+
* @param endVersion The maximum version of the commit that should be returned, or {@link
133+
* Optional#empty()} if there is no maximum.
134+
* @return {@link GetCommitsResponse} which has a list of {@link
135+
* io.delta.kernel.engine.coordinatedcommits.Commit}s and the latestTableVersion which is
136+
* tracked by the {@link CommitCoordinatorClient}.
137+
*/
138+
GetCommitsResponse getCommits(
139+
Engine engine,
140+
TableDescriptor tableDescriptor,
141+
Optional<Long> startVersion,
142+
Optional<Long> endVersion);
143+
144+
/**
145+
* Backfill all commits up to {@code version} and notify the commit coordinator.
146+
*
147+
* <p>If this API returns successfully, that means the backfill must have been completed, although
148+
* the commit coordinator may not be aware of it yet.
149+
*
150+
* @param engine The {@link Engine} instance to use, if needed.
151+
* @param tableDescriptor The descriptor for the table.
152+
* @param version The version until which the commit coordinator client should backfill.
153+
* @param lastKnownBackfilledVersion The last known version that was backfilled before this API
154+
* was called. If it is {@link Optional#empty()}, then the commit coordinator client should
155+
* backfill from the beginning of the table.
156+
* @throws IOException if there is an IO error while backfilling the commits.
157+
*/
158+
void backfillToVersion(
159+
Engine engine,
160+
TableDescriptor tableDescriptor,
161+
long version,
162+
Optional<Long> lastKnownBackfilledVersion)
163+
throws IOException;
164+
165+
/**
166+
* Checks if this CommitCoordinatorClient is semantically equal to another
167+
* CommitCoordinatorClient.
168+
*/
169+
boolean semanticEquals(CommitCoordinatorClient other);
170+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.coordinatedcommits;
18+
19+
import static java.util.Objects.requireNonNull;
20+
21+
import io.delta.kernel.TableIdentifier;
22+
import io.delta.kernel.annotation.Evolving;
23+
import java.util.Map;
24+
import java.util.Objects;
25+
import java.util.Optional;
26+
27+
/**
28+
* The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table
29+
* identifier, and table CC configuration.
30+
*
31+
* @since 3.3.0
32+
*/
33+
@Evolving
34+
public class TableDescriptor {
35+
36+
private final String logPath;
37+
private final Optional<TableIdentifier> tableIdOpt;
38+
private final Map<String, String> tableConf;
39+
40+
public TableDescriptor(
41+
String logPath, Optional<TableIdentifier> tableIdOpt, Map<String, String> tableConf) {
42+
this.logPath = requireNonNull(logPath, "logPath is null");
43+
this.tableIdOpt = requireNonNull(tableIdOpt, "tableIdOpt is null");
44+
this.tableConf = requireNonNull(tableConf, "tableConf is null");
45+
}
46+
47+
/** Returns the Delta log path of the table. */
48+
public String getLogPath() {
49+
return logPath;
50+
}
51+
52+
/** Returns the optional table identifier of the table, e.g. $catalog / $schema / $tableName */
53+
public Optional<TableIdentifier> getTableIdentifierOpt() {
54+
return tableIdOpt;
55+
}
56+
57+
/**
58+
* Returns the Coordinated Commits table configuration.
59+
*
60+
* <p>This is the parsed value of the Delta table property {@link
61+
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} and represents the
62+
* configuration properties for describing the Delta table to commit-coordinator.
63+
*/
64+
public Map<String, String> getTableConf() {
65+
return tableConf;
66+
}
67+
68+
@Override
69+
public boolean equals(Object o) {
70+
if (this == o) {
71+
return true;
72+
}
73+
if (o == null || getClass() != o.getClass()) {
74+
return false;
75+
}
76+
final TableDescriptor that = (TableDescriptor) o;
77+
return getLogPath().equals(that.getLogPath())
78+
&& tableIdOpt.equals(that.tableIdOpt)
79+
&& getTableConf().equals(that.getTableConf());
80+
}
81+
82+
@Override
83+
public int hashCode() {
84+
return Objects.hash(getLogPath(), tableIdOpt, getTableConf());
85+
}
86+
87+
@Override
88+
public String toString() {
89+
return "TableDescriptor{"
90+
+ "logPath='"
91+
+ logPath
92+
+ '\''
93+
+ ", tableIdOpt="
94+
+ tableIdOpt
95+
+ ", tableConf="
96+
+ tableConf
97+
+ '}';
98+
}
99+
}

kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.delta.kernel.engine;
1818

1919
import io.delta.kernel.annotation.Evolving;
20+
import io.delta.kernel.coordinatedcommits.CommitCoordinatorClient;
2021
import java.util.Map;
2122

2223
/**
@@ -56,6 +57,23 @@ public interface Engine {
5657
*/
5758
ParquetHandler getParquetHandler();
5859

60+
/**
61+
* Retrieves a {@link CommitCoordinatorClient} for the specified commit coordinator name.
62+
*
63+
* @param commitCoordinatorName The name (identifier) of the underlying commit coordinator client
64+
* to instantiate
65+
* @param commitCoordinatorConf The configuration settings for the underlying commit coordinator
66+
* client, taken directly from the Delta table property {@link
67+
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF}
68+
* @return A {@link CommitCoordinatorClient} implementation corresponding to the specified commit
69+
* coordinator name
70+
* @since 3.3.0
71+
*/
72+
default CommitCoordinatorClient getCommitCoordinatorClient(
73+
String commitCoordinatorName, Map<String, String> commitCoordinatorConf) {
74+
throw new UnsupportedOperationException("Not implemented");
75+
}
76+
5977
/**
6078
* Retrieves a {@link CommitCoordinatorClientHandler} for the specified commit coordinator client.
6179
*
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.coordinatedcommits
18+
19+
import org.scalatest.funsuite.AnyFunSuite
20+
import io.delta.kernel.TableIdentifier
21+
import java.util.Optional
22+
23+
import scala.collection.JavaConverters._
24+
25+
class TableDescriptorSuite extends AnyFunSuite {
26+
27+
test("TableDescriptor should throw NullPointerException for null constructor arguments") {
28+
assertThrows[NullPointerException] {
29+
new TableDescriptor(null, Optional.empty(), Map.empty[String, String].asJava)
30+
}
31+
assertThrows[NullPointerException] {
32+
new TableDescriptor("/delta/logPath", null, Map.empty[String, String].asJava)
33+
}
34+
assertThrows[NullPointerException] {
35+
new TableDescriptor("/delta/logPath", Optional.empty(), null)
36+
}
37+
}
38+
39+
test("TableDescriptor should return the correct logPath, tableIdOpt, and tableConf") {
40+
val logPath = "/delta/logPath"
41+
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
42+
val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava
43+
44+
val tableDescriptor = new TableDescriptor(logPath, tableIdOpt, tableConf)
45+
46+
assert(tableDescriptor.getLogPath == logPath)
47+
assert(tableDescriptor.getTableIdentifierOpt == tableIdOpt)
48+
assert(tableDescriptor.getTableConf == tableConf)
49+
}
50+
51+
test("TableDescriptors with the same values should be equal") {
52+
val logPath = "/delta/logPath"
53+
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
54+
val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava
55+
56+
val tableDescriptor1 = new TableDescriptor(logPath, tableIdOpt, tableConf)
57+
val tableDescriptor2 = new TableDescriptor(logPath, tableIdOpt, tableConf)
58+
59+
assert(tableDescriptor1 == tableDescriptor2)
60+
assert(tableDescriptor1.hashCode == tableDescriptor2.hashCode)
61+
}
62+
63+
test("TableDescriptor with different values should not be equal") {
64+
val logPath = "/delta/logPath"
65+
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
66+
val tableConf1 = Map("key1" -> "value1").asJava
67+
val tableConf2 = Map("key1" -> "value2").asJava
68+
69+
val tableDescriptor1 = new TableDescriptor(logPath, tableIdOpt, tableConf1)
70+
val tableDescriptor2 = new TableDescriptor(logPath, tableIdOpt, tableConf2)
71+
72+
assert(tableDescriptor1 != tableDescriptor2)
73+
}
74+
75+
test("TableDescriptor toString format") {
76+
val logPath = "/delta/logPath"
77+
val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))
78+
val tableConf = Map("key1" -> "value1").asJava
79+
80+
val tableDescriptor = new TableDescriptor(logPath, tableIdOpt, tableConf)
81+
val expectedString = "TableDescriptor{logPath='/delta/logPath', " +
82+
"tableIdOpt=Optional[TableIdentifier{catalog.schema.table}], " +
83+
"tableConf={key1=value1}}"
84+
assert(tableDescriptor.toString == expectedString)
85+
}
86+
}

0 commit comments

Comments
 (0)