Skip to content

Commit 87e1f7c

Browse files
authored
[Kernel] Add ability to set table features via configuration properties. (delta-io#4340)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description This allows features to be enabled by adding a key-value pair of "delta.feature.$feature_name"->"supported" in the table properties when creating a transaction, and have those features enabled for the table. Disablement is not done in this PR because it is a less common use-case. Resolves delta-io#4331 ## How was this patch tested? Unit tests: 1. Detailed test for new method added on TableFeatures 2. Added a more e2e test in DeltaTableFeaturesSuite 3. Introduced a new MetadataSuite for Metadata action method changes. ## Does this PR introduce _any_ user-facing changes? Yes. Features can now be turned on in DeltaLake by using `"delta.feature.<feature_name>": "supported"` in table properties (`"supported"` is the only accepted value). These keys are removed from the configuration so they don't show up in the Metadata action. --------- Signed-off-by: Micah Kornfield <[email protected]>
1 parent e510df7 commit 87e1f7c

File tree

7 files changed

+341
-36
lines changed

7 files changed

+341
-36
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.delta.kernel.exceptions.InvalidConfigurationValueException;
1919
import io.delta.kernel.exceptions.UnknownConfigurationException;
2020
import io.delta.kernel.internal.actions.Metadata;
21+
import io.delta.kernel.internal.tablefeatures.TableFeatures;
2122
import io.delta.kernel.internal.util.*;
2223
import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode;
2324
import java.util.*;
@@ -318,7 +319,12 @@ public static Map<String, String> validateDeltaProperties(Map<String, String> ne
318319
String key = kv.getKey().toLowerCase(Locale.ROOT);
319320
String value = kv.getValue();
320321

321-
if (key.startsWith("delta.")) {
322+
boolean isTableFeatureOverrideKey =
323+
key.startsWith(TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX);
324+
boolean isTableConfigKey = key.startsWith("delta.");
325+
// TableFeature override properties validation is handled separately in TransactionBuilder.
326+
boolean shouldValidateProperties = isTableConfigKey && !isTableFeatureOverrideKey;
327+
if (shouldValidateProperties) {
322328
// If it is a delta table property, make sure it is a supported property and editable
323329
if (!VALID_PROPERTIES.containsKey(key)) {
324330
throw DeltaErrors.unknownConfigurationException(kv.getKey());

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,21 @@ public Transaction build(Engine engine) {
167167
/* ----- 2: Update the PROTOCOL based on the table properties or schema ----- */
168168
// This is the only place we update the protocol action; takes care of any dependent features
169169
// Ex: We enable feature `icebergCompatV2` plus dependent features `columnMapping`
170+
Set<TableFeature> manuallyEnabledFeatures = new HashSet<>();
171+
if (needDomainMetadataSupport) {
172+
manuallyEnabledFeatures.add(TableFeatures.DOMAIN_METADATA_W_FEATURE);
173+
}
174+
175+
Tuple2<Set<TableFeature>, Optional<Metadata>> newFeaturesAndMetadata =
176+
TableFeatures.extractFeaturePropertyOverrides(newMetadata.orElse(snapshotMetadata));
177+
manuallyEnabledFeatures.addAll(newFeaturesAndMetadata._1);
178+
if (newFeaturesAndMetadata._2.isPresent()) {
179+
newMetadata = newFeaturesAndMetadata._2;
180+
}
181+
170182
Optional<Tuple2<Protocol, Set<TableFeature>>> newProtocolAndFeatures =
171183
TableFeatures.autoUpgradeProtocolBasedOnMetadata(
172-
newMetadata.orElse(snapshotMetadata),
173-
needDomainMetadataSupport,
174-
/* needClusteringTableFeature = */ false,
175-
snapshotProtocol);
184+
newMetadata.orElse(snapshotMetadata), manuallyEnabledFeatures, snapshotProtocol);
176185
if (newProtocolAndFeatures.isPresent()) {
177186
logger.info(
178187
"Automatically enabling table features: {}",

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,23 @@ public Metadata(
128128
.collect(Collectors.toList())));
129129
}
130130

131+
/**
132+
* Returns a new metadata object that has a new configuration which is the combination of its
133+
* current configuration and {@code configuration}.
134+
*
135+
* <p>For overlapping keys the values from {@code configuration} take precedence.
136+
*/
131137
public Metadata withMergedConfiguration(Map<String, String> configuration) {
132138
Map<String, String> newConfiguration = new HashMap<>(getConfiguration());
133139
newConfiguration.putAll(configuration);
140+
return withReplacedConfiguration(newConfiguration);
141+
}
142+
143+
/**
144+
* Returns a new Metadata object with the configuration provided with newConfiguration (any prior
145+
* configuration is replaced).
146+
*/
147+
public Metadata withReplacedConfiguration(Map<String, String> newConfiguration) {
134148
return new Metadata(
135149
this.id,
136150
this.name,

kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode.NONE;
2121
import static io.delta.kernel.types.TimestampNTZType.TIMESTAMP_NTZ;
2222
import static io.delta.kernel.types.VariantType.VARIANT;
23+
import static java.util.stream.Collectors.toMap;
2324
import static java.util.stream.Collectors.toSet;
2425

2526
import io.delta.kernel.exceptions.KernelException;
@@ -39,6 +40,18 @@
3940
/** Contains utility methods related to the Delta table feature support in protocol. */
4041
public class TableFeatures {
4142

43+
/**
44+
* The prefix for setting an override of a feature option in {@linkplain Metadata} configuration.
45+
*
46+
* <p>Keys with this prefix should never be persisted in the Metadata action. The keys can be
47+
* filtered out by using {@linkplain #extractFeaturePropertyOverrides}.
48+
*
49+
* <p>These overrides only support add the feature as supported in the Protocol action.
50+
*
51+
* <p>Disabling features via this method is unsupported.
52+
*/
53+
public static String SET_TABLE_FEATURE_SUPPORTED_PREFIX = "delta.feature.";
54+
4255
/////////////////////////////////////////////////////////////////////////////////
4356
/// START: Define the {@link TableFeature}s ///
4457
/// If feature instance variable ends with ///
@@ -453,31 +466,24 @@ public static Tuple2<Integer, Integer> minimumRequiredVersions(Set<TableFeature>
453466
* metadata. If the current protocol already satisfies the metadata requirements, return empty.
454467
*
455468
* @param newMetadata the new metadata to be applied to the table.
456-
* @param needDomainMetadataSupport whether the table needs to explicitly support domain metadata.
457-
* @param needClusteringTableFeature whether the table needs to support clustering table feature
458-
* if true it would add domainMetadata support as well.
469+
* @param manuallyEnabledFeatures features that were requested to be added to the protocol.
459470
* @param currentProtocol the current protocol of the table.
460471
* @return the upgraded protocol and the set of new features that were enabled in the upgrade.
461472
*/
462473
public static Optional<Tuple2<Protocol, Set<TableFeature>>> autoUpgradeProtocolBasedOnMetadata(
463474
Metadata newMetadata,
464-
boolean needDomainMetadataSupport,
465-
boolean needClusteringTableFeature,
475+
Collection<TableFeature> manuallyEnabledFeatures,
466476
Protocol currentProtocol) {
467477

468478
Set<TableFeature> allNeededTableFeatures =
469479
extractAllNeededTableFeatures(newMetadata, currentProtocol);
470-
if (needDomainMetadataSupport) {
480+
if (manuallyEnabledFeatures != null && !manuallyEnabledFeatures.isEmpty()) {
481+
// Note that any dependent features are handled below in the withFeatures call.
471482
allNeededTableFeatures =
472-
Stream.concat(allNeededTableFeatures.stream(), Stream.of(DOMAIN_METADATA_W_FEATURE))
473-
.collect(toSet());
474-
}
475-
// Its dependency feature(domainMetadata) would be enabled automatically.
476-
if (needClusteringTableFeature) {
477-
allNeededTableFeatures =
478-
Stream.concat(allNeededTableFeatures.stream(), Stream.of(CLUSTERING_W_FEATURE))
483+
Stream.concat(allNeededTableFeatures.stream(), manuallyEnabledFeatures.stream())
479484
.collect(toSet());
480485
}
486+
481487
Protocol required =
482488
new Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
483489
.withFeatures(allNeededTableFeatures)
@@ -495,6 +501,51 @@ public static Optional<Tuple2<Protocol, Set<TableFeature>>> autoUpgradeProtocolB
495501
}
496502
}
497503

504+
/**
505+
* Extracts features overrides from Metadata properties and returns an updated metadata if any
506+
* overrides are present.
507+
*
508+
* <p>Overrides are specified using a key in th form {@linkplain
509+
* #SET_TABLE_FEATURE_SUPPORTED_PREFIX} + {featureName}. (e.g. {@code
510+
* delta.feature.icebergWriterCompatV1}). The value must be "supported" to add the feature.
511+
* Currently, removing values is not handled.
512+
*
513+
* @return A set of features that had overrides and Metadata object with the properties removed if
514+
* any overrides were present.
515+
* @throws KernelException if the feature name for the override is invalid or the value is not
516+
* equal to "supported".
517+
*/
518+
public static Tuple2<Set<TableFeature>, Optional<Metadata>> extractFeaturePropertyOverrides(
519+
Metadata currentMetadata) {
520+
Set<TableFeature> features = new HashSet<>();
521+
Map<String, String> properties = currentMetadata.getConfiguration();
522+
for (Map.Entry<String, String> entry : properties.entrySet()) {
523+
if (entry.getKey().startsWith(SET_TABLE_FEATURE_SUPPORTED_PREFIX)) {
524+
String featureName = entry.getKey().substring(SET_TABLE_FEATURE_SUPPORTED_PREFIX.length());
525+
526+
TableFeature feature = getTableFeature(featureName);
527+
features.add(feature);
528+
if (!entry.getValue().equals("supported")) {
529+
throw DeltaErrors.invalidConfigurationValueException(
530+
entry.getKey(),
531+
entry.getValue(),
532+
"TableFeature override options may only have \"supported\" as there value");
533+
}
534+
}
535+
}
536+
537+
if (features.isEmpty()) {
538+
return new Tuple2<>(features, Optional.empty());
539+
}
540+
541+
Map<String, String> cleanedProperties =
542+
properties.entrySet().stream()
543+
.filter(e -> !e.getKey().startsWith(SET_TABLE_FEATURE_SUPPORTED_PREFIX))
544+
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
545+
return new Tuple2<>(
546+
features, Optional.of(currentMetadata.withReplacedConfiguration(cleanedProperties)));
547+
}
548+
498549
/** Utility method to check if the table with given protocol is readable by the Kernel. */
499550
public static void validateKernelCanReadTheTable(Protocol protocol, String tablePath) {
500551
if (protocol.getMinReaderVersion() > TABLE_FEATURES_MIN_READER_VERSION) {
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.actions
17+
18+
import java.util.{Collections, Optional}
19+
20+
import scala.collection.JavaConverters._
21+
22+
import io.delta.kernel.data.{ArrayValue, ColumnVector, MapValue}
23+
import io.delta.kernel.internal.util.InternalUtils.singletonStringColumnVector
24+
import io.delta.kernel.internal.util.VectorUtils.buildColumnVector
25+
import io.delta.kernel.types.{IntegerType, StringType, StructType}
26+
27+
import org.scalatest.funsuite.AnyFunSuite
28+
29+
/** Unit tests for the configuration-copy helpers on the [[Metadata]] action. */
class MetadataSuite extends AnyFunSuite {

  test("withMergedConfig upserts values") {
    val original = testMetadata(Map("a" -> "b", "f" -> "g"))
    val overlay = Map("a" -> "c", "d" -> "f").asJava

    val merged = original.withMergedConfiguration(overlay)

    // "a" is overwritten, "d" is added and the untouched key "f" is kept.
    val expected = Map("a" -> "c", "d" -> "f", "f" -> "g").asJava
    assert(merged.getConfiguration == expected)
  }

  test("withReplacedConfiguration replaces values") {
    val original = testMetadata(Map("a" -> "b", "f" -> "g"))
    val replacement = Map("a" -> "c", "d" -> "f").asJava

    val replaced = original.withReplacedConfiguration(replacement)

    // Nothing from the prior configuration (e.g. "f") remains.
    assert(replaced.getConfiguration == replacement)
  }

  /**
   * Builds a Metadata fixture with a fixed two-column schema and the given table properties as
   * its configuration.
   */
  def testMetadata(tblProps: Map[String, String] = Map.empty): Metadata = {
    val schema = new StructType()
      .add("c1", IntegerType.INTEGER)
      .add("c2", StringType.STRING)

    // Split the properties once so that keys and values stay positionally aligned.
    val (propKeys, propValues) = tblProps.toSeq.unzip

    val partitionColumns = new ArrayValue() {
      override def getSize = 1

      override def getElements: ColumnVector = singletonStringColumnVector("c3")
    }

    val configuration = new MapValue() {
      override def getSize = tblProps.size

      override def getKeys: ColumnVector =
        buildColumnVector(propKeys.asJava, StringType.STRING)

      override def getValues: ColumnVector =
        buildColumnVector(propValues.asJava, StringType.STRING)
    }

    new Metadata(
      "id",
      Optional.of("name"),
      Optional.of("description"),
      new Format("parquet", Collections.emptyMap()),
      schema.toJson,
      schema,
      partitionColumns,
      Optional.empty(),
      configuration)
  }

}

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala

Lines changed: 76 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -856,13 +856,16 @@ class TableFeaturesSuite extends AnyFunSuite {
856856
s"$currentProtocol -> $expectedProtocol, $expectedNewFeatures") {
857857

858858
for (
859-
(needDomainMetadata, needClusteringTableFeature) <-
860-
Seq((false, false), (false, true), (true, false), (true, true))
859+
(manualFeatures) <-
860+
Seq(
861+
Set[TableFeature](),
862+
Set(TableFeatures.DOMAIN_METADATA_W_FEATURE),
863+
Set(TableFeatures.CLUSTERING_W_FEATURE),
864+
Set(TableFeatures.CLUSTERING_W_FEATURE, TableFeatures.DOMAIN_METADATA_W_FEATURE))
861865
) {
862866
val newProtocolAndNewFeaturesEnabled = TableFeatures.autoUpgradeProtocolBasedOnMetadata(
863867
newMetadata,
864-
needDomainMetadata,
865-
needClusteringTableFeature,
868+
manualFeatures.asJava,
866869
currentProtocol)
867870

868871
assert(newProtocolAndNewFeaturesEnabled.isPresent, "expected protocol upgrade")
@@ -874,24 +877,31 @@ class TableFeaturesSuite extends AnyFunSuite {
874877
assert(newProtocol.getMinReaderVersion == expectedProtocol.getMinReaderVersion)
875878

876879
// Writer version: upgrade to 7 if domain metadata or clustering feature is enabled
877-
val expectedWriterVersion = if (needDomainMetadata || needClusteringTableFeature) 7
878-
else expectedProtocol.getMinWriterVersion
880+
val expectedWriterVersion =
881+
if (
882+
!(manualFeatures & Set(
883+
TableFeatures.CLUSTERING_W_FEATURE,
884+
TableFeatures.DOMAIN_METADATA_W_FEATURE)).isEmpty
885+
) { 7 }
886+
else expectedProtocol.getMinWriterVersion
879887
assert(newProtocol.getMinWriterVersion == expectedWriterVersion)
880888

881889
// Expected enabled features
882-
val expectedEnabledFeatures = expectedNewFeatures.asScala ++ (
883-
if (needClusteringTableFeature) Set("domainMetadata", "clustering")
884-
else if (needDomainMetadata) Set("domainMetadata")
885-
else Set.empty
886-
)
890+
val expectedEnabledFeatures =
891+
expectedNewFeatures.asScala ++ manualFeatures.map(_.featureName()).toSet ++ (
892+
if (manualFeatures.contains(TableFeatures.CLUSTERING_W_FEATURE)) Set("domainMetadata")
893+
else Set.empty
894+
)
887895
assert(newFeaturesEnabled.asScala.map(_.featureName()).toSet == expectedEnabledFeatures)
888896

889897
// Expected supported features
898+
val implicitAndExplicitFeatures =
899+
expectedProtocol.getImplicitlyAndExplicitlySupportedFeatures.asScala
890900
val expectedSupportedFeatures =
891-
expectedProtocol.getImplicitlyAndExplicitlySupportedFeatures.asScala ++ (
892-
if (needClusteringTableFeature) Set(DOMAIN_METADATA_W_FEATURE, CLUSTERING_W_FEATURE)
893-
else if (needDomainMetadata) Set(DOMAIN_METADATA_W_FEATURE)
894-
else Set.empty
901+
implicitAndExplicitFeatures ++ manualFeatures ++ (
902+
if (manualFeatures.contains(TableFeatures.CLUSTERING_W_FEATURE)) {
903+
Set(TableFeatures.DOMAIN_METADATA_W_FEATURE)
904+
} else { Set.empty }
895905
)
896906
assert(newProtocol.getImplicitlyAndExplicitlySupportedFeatures.asScala
897907
== expectedSupportedFeatures)
@@ -937,13 +947,62 @@ class TableFeaturesSuite extends AnyFunSuite {
937947
val newProtocolAndNewFeaturesEnabled =
938948
TableFeatures.autoUpgradeProtocolBasedOnMetadata(
939949
newMetadata,
940-
/* needDomainMetadataSupport = */ false,
941-
/* needClusteringTableFeature = */ false,
950+
Set.empty.asJava,
942951
currentProtocol)
943952
assert(!newProtocolAndNewFeaturesEnabled.isPresent, "expected no-op upgrade")
944953
}
945954
}
946955

956+
test(
  "extractFeaturePropertyOverrides returns feature options and removes from them from metadata") {
  // Two override keys (to be extracted and stripped) followed by two ordinary properties.
  val overrideProps = Map(
    "delta.feature.deletionVectors" -> "supported",
    "delta.feature.appendOnly" -> "supported")
  val expectedMap = Map("anotherkey" -> "some_value", "delta.enableRowTracking" -> "true")

  val result = TableFeatures.extractFeaturePropertyOverrides(
    testMetadata(tblProps = overrideProps ++ expectedMap))
  val newFeatures = result._1
  val cleanedMetadata = result._2

  assert(cleanedMetadata.isPresent)

  val expectedFeatures = Set(
    TableFeatures.APPEND_ONLY_W_FEATURE,
    TableFeatures.DELETION_VECTORS_RW_FEATURE).asJava
  assert(newFeatures.equals(expectedFeatures), s"Explicit features: ${newFeatures}")

  // Only the non-override properties may remain in the returned metadata.
  val tableConfig = cleanedMetadata.get.getConfiguration
  assert(expectedMap.asJava.equals(tableConfig), s"$tableConfig != $expectedMap")
}
980+
981+
test(
  "extractFeaturePropertyOverrides returns empty metadata with no change") {
  // None of these keys carries the "delta.feature." prefix, so nothing should be extracted.
  val plainProps = Map("anotherkey" -> "some_value", "delta.enableRowTracking" -> "true")

  val result =
    TableFeatures.extractFeaturePropertyOverrides(testMetadata(tblProps = plainProps))
  val extractedFeatures = result._1
  val updatedMetadata = result._2

  assert(extractedFeatures.isEmpty)
  assert(!updatedMetadata.isPresent)
}
993+
994+
// Each map holds exactly one bad override: first a value other than "supported", then a
// feature name that is not expected to resolve.
for (
  invalidProps <- Seq(
    Map("delta.feature.deletionVectors" -> "not_valid_value"),
    Map("delta.feature.invalidFeatureName" -> "supported"))
) {
  test(s"extractFeaturePropertyOverrides throws: $invalidProps") {
    intercept[KernelException] {
      TableFeatures.extractFeaturePropertyOverrides(
        testMetadata(tblProps = invalidProps))
    }
  }
}
1005+
9471006
/////////////////////////////////////////////////////////////////////////////////////////////////
9481007
// Test utility methods. //
9491008
/////////////////////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)