Skip to content

Commit 700bdaf

Browse files
[Kernel] Add Domain Metadata support to Delta Kernel (delta-io#3835)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR adds support for domain metadata to Delta Kernel as described in the [Delta Protocal](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata). In particular, it adds the following to Delta Kernel: - `DomainMetadata` Class - Used to represent a [domain metadata action](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata) as described in the Delta Protocol. - Includes necessary utility functions, such as creating a `DomainMetadata` instance from `Row`/`ColumnVector` and creating a action `Row` from a `DomainMetadata` instance for committing. - Transaction Support - Checks for duplicate domain metadata and protocol support prior to committing them. - Adds an internal `addDomainMetadata` API to `TransactionImpl` for testing purposes. In real scenarios, domain metadata will be constructed by feature-specific code within `TransactionImpl`. A future PR introducing Row Tracking will provide a concrete example of domain metadata usage in practice. - Checkpointing - Domain metadata is maintained during checkpointing. - Log Replay. - Currently, domain metadata is lazily load in a separate pass of reply when requested. - We might want to improve this in the future by caching domain metadata during the initial Protocol & Metadata replay. - Conflict Resolution. - Two overlapping transactions conflict if they include domain metadata actions for the same metadata domain. - Future features can implement custom conflict resolution logic as needed. - Adds `domainMetadata` to `SUPPORTED_WRITER_FEATURES` ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Added tests covering operations involving DomainMetadata in `DomainMetadataSuite`. - Unit tests for committing, log replaying, checkpointing, and conflict resolution related to domain metadata. Negative tests for missing writer feature in the protocol and duplicate domain metadata actions. - Integration tests where a table with domain metadata is write by Spark and read by Kernel, and vice versa. ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No. Domain metadata is currently intended for internal use by kernel developers to support specific table features. We don't plan to allow users to create their own domain metadata in the near future. So this PR only involves changes to internal APIs with no additions/modifications to public APIs. --------- Co-authored-by: Johan Lasperas <[email protected]>
1 parent ec0ab0d commit 700bdaf

File tree

12 files changed

+1010
-30
lines changed

12 files changed

+1010
-30
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static java.lang.String.format;
1919

2020
import io.delta.kernel.exceptions.*;
21+
import io.delta.kernel.internal.actions.DomainMetadata;
2122
import io.delta.kernel.types.DataType;
2223
import io.delta.kernel.types.StructType;
2324
import io.delta.kernel.utils.DataFileStatus;
@@ -274,6 +275,24 @@ public static KernelException invalidConfigurationValueException(
274275
return new InvalidConfigurationValueException(key, value, helpMessage);
275276
}
276277

278+
public static KernelException domainMetadataUnsupported() {
279+
String message =
280+
"Cannot commit DomainMetadata action(s) because the feature 'domainMetadata' "
281+
+ "is not supported on this table.";
282+
return new KernelException(message);
283+
}
284+
285+
public static ConcurrentWriteException concurrentDomainMetadataAction(
286+
DomainMetadata domainMetadataAttempt, DomainMetadata winningDomainMetadata) {
287+
String message =
288+
String.format(
289+
"A concurrent writer added a domainMetadata action for the same domain: %s. "
290+
+ "No domain-specific conflict resolution is available for this domain. "
291+
+ "Attempted domainMetadata: %s. Winning domainMetadata: %s",
292+
domainMetadataAttempt.getDomain(), domainMetadataAttempt, winningDomainMetadata);
293+
return new ConcurrentWriteException(message);
294+
}
295+
277296
/* ------------------------ HELPER METHODS ----------------------------- */
278297
private static String formatTimestamp(long millisSinceEpochUTC) {
279298
return new Timestamp(millisSinceEpochUTC).toInstant().toString();

kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
2424
import io.delta.kernel.engine.Engine;
2525
import io.delta.kernel.internal.actions.CommitInfo;
26+
import io.delta.kernel.internal.actions.DomainMetadata;
2627
import io.delta.kernel.internal.actions.Metadata;
2728
import io.delta.kernel.internal.actions.Protocol;
2829
import io.delta.kernel.internal.fs.Path;
@@ -31,6 +32,7 @@
3132
import io.delta.kernel.internal.snapshot.LogSegment;
3233
import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler;
3334
import io.delta.kernel.types.StructType;
35+
import java.util.Map;
3436
import java.util.Optional;
3537

3638
/** Implementation of {@link Snapshot}. */
@@ -83,6 +85,17 @@ public Protocol getProtocol() {
8385
return protocol;
8486
}
8587

88+
/**
89+
* Get the domain metadata map from the log replay, which lazily loads and replays a history of
90+
* domain metadata actions, resolving them to produce the current state of the domain metadata.
91+
*
92+
* @return A map where the keys are domain names and the values are {@link DomainMetadata}
93+
* objects.
94+
*/
95+
public Map<String, DomainMetadata> getDomainMetadataMap() {
96+
return logReplay.getDomainMetadataMap();
97+
}
98+
8699
public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) {
87100
long minFileRetentionTimestampMillis =
88101
System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class TableFeatures {
3939
add("columnMapping");
4040
add("typeWidening-preview");
4141
add("typeWidening");
42+
add(DOMAIN_METADATA_FEATURE_NAME);
4243
}
4344
});
4445

@@ -57,6 +58,12 @@ public class TableFeatures {
5758
}
5859
});
5960

61+
/** The feature name for domain metadata. */
62+
public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata";
63+
64+
/** The minimum writer version required to support table features. */
65+
public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7;
66+
6067
////////////////////
6168
// Helper Methods //
6269
////////////////////
@@ -93,7 +100,7 @@ public static void validateReadSupportedTable(
93100
* <li>protocol writer version 1.
94101
* <li>protocol writer version 2 only with appendOnly feature enabled.
95102
* <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code
96-
* columnMapping}, {@code typeWidening} feature enabled.
103+
* columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled.
97104
* </ul>
98105
*
99106
* @param protocol Table protocol
@@ -125,20 +132,8 @@ public static void validateWriteSupportedTable(
125132
throw unsupportedWriterProtocol(tablePath, minWriterVersion);
126133
case 7:
127134
for (String writerFeature : protocol.getWriterFeatures()) {
128-
switch (writerFeature) {
129-
// Only supported writer features as of today in Kernel
130-
case "appendOnly":
131-
break;
132-
case "inCommitTimestamp":
133-
break;
134-
case "columnMapping":
135-
break;
136-
case "typeWidening-preview":
137-
break;
138-
case "typeWidening":
139-
break;
140-
default:
141-
throw unsupportedWriterFeature(tablePath, writerFeature);
135+
if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) {
136+
throw unsupportedWriterFeature(tablePath, writerFeature);
142137
}
143138
}
144139
break;
@@ -187,6 +182,21 @@ public static Set<String> extractAutomaticallyEnabledWriterFeatures(
187182
.collect(Collectors.toSet());
188183
}
189184

185+
/**
186+
* Checks if the table protocol supports the "domainMetadata" writer feature.
187+
*
188+
* @param protocol the protocol to check
189+
* @return true if the "domainMetadata" feature is supported, false otherwise
190+
*/
191+
public static boolean isDomainMetadataSupported(Protocol protocol) {
192+
List<String> writerFeatures = protocol.getWriterFeatures();
193+
if (writerFeatures == null) {
194+
return false;
195+
}
196+
return writerFeatures.contains(DOMAIN_METADATA_FEATURE_NAME)
197+
&& protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
198+
}
199+
190200
/**
191201
* Get the minimum reader version required for a feature.
192202
*

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,7 @@
3232
import io.delta.kernel.internal.fs.Path;
3333
import io.delta.kernel.internal.replay.ConflictChecker;
3434
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
35-
import io.delta.kernel.internal.util.Clock;
36-
import io.delta.kernel.internal.util.ColumnMapping;
37-
import io.delta.kernel.internal.util.FileNames;
38-
import io.delta.kernel.internal.util.InCommitTimestampUtils;
39-
import io.delta.kernel.internal.util.VectorUtils;
35+
import io.delta.kernel.internal.util.*;
4036
import io.delta.kernel.types.StructType;
4137
import io.delta.kernel.utils.CloseableIterable;
4238
import io.delta.kernel.utils.CloseableIterator;
@@ -73,6 +69,7 @@ public class TransactionImpl implements Transaction {
7369
private final Optional<SetTransaction> setTxnOpt;
7470
private final boolean shouldUpdateProtocol;
7571
private final Clock clock;
72+
private final List<DomainMetadata> domainMetadatas = new ArrayList<>();
7673
private Metadata metadata;
7774
private boolean shouldUpdateMetadata;
7875

@@ -120,6 +117,23 @@ public StructType getSchema(Engine engine) {
120117
return readSnapshot.getSchema(engine);
121118
}
122119

120+
public Optional<SetTransaction> getSetTxnOpt() {
121+
return setTxnOpt;
122+
}
123+
124+
/**
125+
* Internal API to add domain metadata actions for this transaction. Visible for testing.
126+
*
127+
* @param domainMetadatas List of domain metadata to be added to the transaction.
128+
*/
129+
public void addDomainMetadatas(List<DomainMetadata> domainMetadatas) {
130+
this.domainMetadatas.addAll(domainMetadatas);
131+
}
132+
133+
public List<DomainMetadata> getDomainMetadatas() {
134+
return domainMetadatas;
135+
}
136+
123137
@Override
124138
public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions)
125139
throws ConcurrentWriteException {
@@ -221,6 +235,12 @@ private TransactionCommitResult doCommit(
221235
}
222236
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow())));
223237

238+
// Check for duplicate domain metadata and if the protocol supports
239+
DomainMetadataUtils.validateDomainMetadatas(domainMetadatas, protocol);
240+
241+
domainMetadatas.forEach(
242+
dm -> metadataActions.add(createDomainMetadataSingleAction(dm.toRow())));
243+
224244
try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) {
225245
// Create a new CloseableIterator that will return the metadata actions followed by the
226246
// data actions.
@@ -265,10 +285,6 @@ public boolean isBlindAppend() {
265285
return true;
266286
}
267287

268-
public Optional<SetTransaction> getSetTxnOpt() {
269-
return setTxnOpt;
270-
}
271-
272288
/**
273289
* Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can
274290
* result in an additional file read and that this will only happen if ICT is enabled.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.actions;
17+
18+
import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;
19+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
20+
import static java.util.Objects.requireNonNull;
21+
22+
import io.delta.kernel.data.ColumnVector;
23+
import io.delta.kernel.data.Row;
24+
import io.delta.kernel.internal.data.GenericRow;
25+
import io.delta.kernel.types.BooleanType;
26+
import io.delta.kernel.types.StringType;
27+
import io.delta.kernel.types.StructType;
28+
import java.util.HashMap;
29+
import java.util.Map;
30+
31+
/** Delta log action representing an `DomainMetadata` action */
32+
public class DomainMetadata {
33+
/** Full schema of the {@link DomainMetadata} action in the Delta Log. */
34+
public static final StructType FULL_SCHEMA =
35+
new StructType()
36+
.add("domain", StringType.STRING, false /* nullable */)
37+
.add("configuration", StringType.STRING, false /* nullable */)
38+
.add("removed", BooleanType.BOOLEAN, false /* nullable */);
39+
40+
public static DomainMetadata fromColumnVector(ColumnVector vector, int rowId) {
41+
if (vector.isNullAt(rowId)) {
42+
return null;
43+
}
44+
return new DomainMetadata(
45+
requireNonNull(vector.getChild(0), rowId, "domain").getString(rowId),
46+
requireNonNull(vector.getChild(1), rowId, "configuration").getString(rowId),
47+
requireNonNull(vector.getChild(2), rowId, "removed").getBoolean(rowId));
48+
}
49+
50+
/**
51+
* Creates a {@link DomainMetadata} instance from a Row with the schema being {@link
52+
* DomainMetadata#FULL_SCHEMA}.
53+
*
54+
* @param row the Row object containing the DomainMetadata action
55+
* @return a DomainMetadata instance or null if the row is null
56+
* @throws IllegalArgumentException if the schema of the row does not match {@link
57+
* DomainMetadata#FULL_SCHEMA}
58+
*/
59+
public static DomainMetadata fromRow(Row row) {
60+
if (row == null) {
61+
return null;
62+
}
63+
checkArgument(
64+
row.getSchema().equals(FULL_SCHEMA),
65+
"Expected schema: %s, found: %s",
66+
FULL_SCHEMA,
67+
row.getSchema());
68+
return new DomainMetadata(
69+
requireNonNull(row, 0, "domain").getString(0),
70+
requireNonNull(row, 1, "configuration").getString(1),
71+
requireNonNull(row, 2, "removed").getBoolean(2));
72+
}
73+
74+
private final String domain;
75+
private final String configuration;
76+
private final boolean removed;
77+
78+
/**
79+
* The domain metadata action contains a configuration string for a named metadata domain. Two
80+
* overlapping transactions conflict if they both contain a domain metadata action for the same
81+
* metadata domain. Per-domain conflict resolution logic can be implemented.
82+
*
83+
* @param domain A string used to identify a specific domain.
84+
* @param configuration A string containing configuration for the metadata domain.
85+
* @param removed If it is true it serves as a tombstone to logically delete a {@link
86+
* DomainMetadata} action.
87+
*/
88+
public DomainMetadata(String domain, String configuration, boolean removed) {
89+
this.domain = requireNonNull(domain, "domain is null");
90+
this.configuration = requireNonNull(configuration, "configuration is null");
91+
this.removed = removed;
92+
}
93+
94+
public String getDomain() {
95+
return domain;
96+
}
97+
98+
public String getConfiguration() {
99+
return configuration;
100+
}
101+
102+
public boolean isRemoved() {
103+
return removed;
104+
}
105+
106+
/**
107+
* Encode as a {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA}.
108+
*
109+
* @return {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA}
110+
*/
111+
public Row toRow() {
112+
Map<Integer, Object> domainMetadataMap = new HashMap<>();
113+
domainMetadataMap.put(0, domain);
114+
domainMetadataMap.put(1, configuration);
115+
domainMetadataMap.put(2, removed);
116+
117+
return new GenericRow(DomainMetadata.FULL_SCHEMA, domainMetadataMap);
118+
}
119+
120+
@Override
121+
public String toString() {
122+
return String.format(
123+
"DomainMetadata{domain='%s', configuration='%s', removed='%s'}",
124+
domain, configuration, removed);
125+
}
126+
127+
@Override
128+
public boolean equals(Object obj) {
129+
if (this == obj) return true;
130+
if (obj == null || getClass() != obj.getClass()) return false;
131+
DomainMetadata that = (DomainMetadata) obj;
132+
return removed == that.removed
133+
&& domain.equals(that.domain)
134+
&& configuration.equals(that.configuration);
135+
}
136+
137+
@Override
138+
public int hashCode() {
139+
return java.util.Objects.hash(domain, configuration, removed);
140+
}
141+
}

0 commit comments

Comments
 (0)