Skip to content

Commit 8e6d9ec

Browse files
authored
[Kernel][#1] IcebergCompatV3 - add icebergCompatV3 table feature (delta-io#4731)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Add the IcebergCompatV3 table feature, which requires the columnMapping and rowTracking to be enabled. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Unit tests ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. 
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent 07f6636 commit 8e6d9ec

File tree

6 files changed

+146
-9
lines changed

6 files changed

+146
-9
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,19 @@ public class TableConfig<T> {
242242
"needs to be a boolean.",
243243
true);
244244

245+
/**
246+
* Table property that enables modifying the table in accordance with the Delta-Iceberg
247+
* Compatibility V3 protocol. TODO: add the delta protocol link once updated
248+
*/
249+
public static final TableConfig<Boolean> ICEBERG_COMPAT_V3_ENABLED =
250+
new TableConfig<>(
251+
"delta.enableIcebergCompatV3",
252+
"false",
253+
Boolean::valueOf,
254+
value -> true,
255+
"needs to be a boolean.",
256+
true);
257+
245258
/**
246259
* The number of columns to collect stats on for data skipping. A value of -1 means collecting
247260
* stats for all columns.
@@ -357,6 +370,7 @@ public static class UniversalFormats {
357370
addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP);
358371
addConfig(this, COLUMN_MAPPING_MODE);
359372
addConfig(this, ICEBERG_COMPAT_V2_ENABLED);
373+
addConfig(this, ICEBERG_COMPAT_V3_ENABLED);
360374
addConfig(this, ICEBERG_WRITER_COMPAT_V1_ENABLED);
361375
addConfig(this, COLUMN_MAPPING_MAX_COLUMN_ID);
362376
addConfig(this, DATA_SKIPPING_NUM_INDEXED_COLS);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,19 @@ public static boolean isIcebergCompatV2Enabled(Row transactionState) {
114114
.getOrDefault(TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey(), "false"));
115115
}
116116

117+
/**
118+
* Get whether iceberg compatibility V3 is enabled from the transaction state {@link Row} returned by
119+
* {@link Transaction#getTransactionState(Engine)}
120+
*
121+
* @param transactionState Transaction state {@link Row}
122+
* @return True if iceberg compatibility V3 is enabled, false otherwise.
123+
*/
124+
public static boolean isIcebergCompatV3Enabled(Row transactionState) {
125+
return Boolean.parseBoolean(
126+
getConfiguration(transactionState)
127+
.getOrDefault(TableConfig.ICEBERG_COMPAT_V3_ENABLED.getKey(), "false"));
128+
}
129+
117130
/**
118131
* Get the column mapping mode from the transaction state {@link Row} returned by {@link
119132
* Transaction#getTransactionState(Engine)}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/icebergcompat/IcebergUniversalFormatMetadataValidatorAndUpdater.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.delta.kernel.exceptions.InvalidConfigurationValueException;
1919
import io.delta.kernel.internal.TableConfig;
2020
import io.delta.kernel.internal.actions.Metadata;
21+
import java.util.Arrays;
22+
import java.util.List;
2123
import java.util.Set;
2224

2325
/**
@@ -51,14 +53,33 @@ public static void validate(Metadata metadata) {
5153

5254
Set<String> targetFormats = TableConfig.UNIVERSAL_FORMAT_ENABLED_FORMATS.fromMetadata(metadata);
5355
boolean isIcebergEnabled = targetFormats.contains(TableConfig.UniversalFormats.FORMAT_ICEBERG);
54-
boolean isIcebergCompatV2Enabled = TableConfig.ICEBERG_COMPAT_V2_ENABLED.fromMetadata(metadata);
55-
if (isIcebergEnabled && !isIcebergCompatV2Enabled) {
56+
57+
List<TableConfig<Boolean>> icebergCompatOptions =
58+
Arrays.asList(TableConfig.ICEBERG_COMPAT_V2_ENABLED, TableConfig.ICEBERG_COMPAT_V3_ENABLED);
59+
long enabledCompatCount =
60+
icebergCompatOptions.stream().filter(opt -> opt.fromMetadata(metadata)).count();
61+
62+
if (isIcebergEnabled && enabledCompatCount == 0) {
63+
String optionKeys =
64+
String.join(
65+
" or ",
66+
icebergCompatOptions.stream().map(TableConfig::getKey).toArray(String[]::new));
5667
throw new InvalidConfigurationValueException(
5768
TableConfig.UNIVERSAL_FORMAT_ENABLED_FORMATS.getKey(),
5869
TableConfig.UniversalFormats.FORMAT_ICEBERG,
5970
String.format(
60-
"'%s' must be set to \"true\" to enable iceberg uniform format.",
61-
TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey()));
71+
"One of %s must be set to \"true\" to enable iceberg uniform format.", optionKeys));
72+
}
73+
74+
if (enabledCompatCount > 1) {
75+
String optionKeys =
76+
String.join(
77+
"' and '",
78+
icebergCompatOptions.stream().map(TableConfig::getKey).toArray(String[]::new));
79+
throw new InvalidConfigurationValueException(
80+
TableConfig.UNIVERSAL_FORMAT_ENABLED_FORMATS.getKey(),
81+
TableConfig.UniversalFormats.FORMAT_ICEBERG,
82+
String.format("'%s' cannot be enabled at the same time.", optionKeys));
6283
}
6384
}
6485
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,25 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me
322322
}
323323
}
324324

325+
public static final TableFeature ICEBERG_COMPAT_V3_W_FEATURE = new IcebergCompatV3TableFeature();
326+
327+
private static class IcebergCompatV3TableFeature extends TableFeature.WriterFeature
328+
implements FeatureAutoEnabledByMetadata {
329+
IcebergCompatV3TableFeature() {
330+
super("icebergCompatV3", /* minWriterVersion = */ 7);
331+
}
332+
333+
@Override
334+
public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) {
335+
return TableConfig.ICEBERG_COMPAT_V3_ENABLED.fromMetadata(metadata);
336+
}
337+
338+
public @Override Set<TableFeature> requiredFeatures() {
339+
return Collections.unmodifiableSet(
340+
new HashSet<>(Arrays.asList(COLUMN_MAPPING_RW_FEATURE, ROW_TRACKING_W_FEATURE)));
341+
}
342+
}
343+
325344
/* ---- Start: type widening ---- */
326345
// Base class for typeWidening and typeWidening-preview features. Both features are same in terms
327346
// of behavior and given the feature is graduated, we will enable the `typeWidening` by default
@@ -459,6 +478,7 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me
459478
GENERATED_COLUMNS_W_FEATURE,
460479
DOMAIN_METADATA_W_FEATURE,
461480
ICEBERG_COMPAT_V2_W_FEATURE,
481+
ICEBERG_COMPAT_V3_W_FEATURE,
462482
IDENTITY_COLUMNS_W_FEATURE,
463483
IN_COMMIT_TIMESTAMP_W_FEATURE,
464484
INVARIANTS_W_FEATURE,

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergUniversalFormatMetadataValidatorAndUpdaterSuite.scala

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,37 @@ class IcebergUniversalFormatMetadataValidatorAndUpdaterSuite extends AnyFunSuite
5151

5252
Seq(
5353
Map(TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "false"),
54-
Map[String, String]()).foreach { disableIcebergCompatV2 =>
54+
Map(TableConfig.ICEBERG_COMPAT_V3_ENABLED.getKey -> "false"),
55+
Map[String, String]()).foreach { disableIcebergCompat =>
5556
test(
56-
"validate should throw when iceberg universal format is enabled and "
57-
+ s"icebergCompatV2 isn't $disableIcebergCompatV2") {
57+
"validate should throw when iceberg universal format is enabled but "
58+
+ s"no IcebergCompat version is enabled: $disableIcebergCompat") {
5859
val metadata = createMetadata(Map(
5960
TableConfig.UNIVERSAL_FORMAT_ENABLED_FORMATS.getKey -> "iceberg",
60-
"unrelated_key" -> "unrelated_value") ++ disableIcebergCompatV2)
61+
"unrelated_key" -> "unrelated_value") ++ disableIcebergCompat)
6162
val exc = intercept[InvalidConfigurationValueException] {
6263
IcebergUniversalFormatMetadataValidatorAndUpdater.validate(metadata)
6364
}
6465
assert(exc.getMessage == "Invalid value for table property " +
6566
"'delta.universalFormat.enabledFormats': 'iceberg'. " +
66-
"'delta.enableIcebergCompatV2' must be set to \"true\" to enable iceberg uniform format.")
67+
"One of delta.enableIcebergCompatV2 or delta.enableIcebergCompatV3 " +
68+
"must be set to \"true\" to enable iceberg uniform format.")
6769
}
6870
}
6971

72+
test("validate should throw when both IcebergCompatV2 and V3 are enabled") {
73+
val metadata = createMetadata(Map(
74+
TableConfig.UNIVERSAL_FORMAT_ENABLED_FORMATS.getKey -> "iceberg",
75+
TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true",
76+
TableConfig.ICEBERG_COMPAT_V3_ENABLED.getKey -> "true"))
77+
val exc = intercept[InvalidConfigurationValueException] {
78+
IcebergUniversalFormatMetadataValidatorAndUpdater.validate(metadata)
79+
}
80+
assert(exc.getMessage.contains(
81+
"'delta.enableIcebergCompatV2' and 'delta.enableIcebergCompatV3' " +
82+
"cannot be enabled at the same time."))
83+
}
84+
7085
def createMetadata(tblProps: Map[String, String] = Map.empty): Metadata = {
7186
val schema = new StructType()
7287
.add("c1", IntegerType.INTEGER)

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class TableFeaturesSuite extends AnyFunSuite {
6161
"rowTracking",
6262
"domainMetadata",
6363
"icebergCompatV2",
64+
"icebergCompatV3",
6465
"inCommitTimestamp",
6566
"icebergWriterCompatV1",
6667
"clustering")
@@ -153,6 +154,14 @@ class TableFeaturesSuite extends AnyFunSuite {
153154
"icebergCompatV2",
154155
testMetadata(tblProps = Map("delta.enableIcebergCompatV2" -> "false")),
155156
false),
157+
(
158+
"icebergCompatV3",
159+
testMetadata(tblProps = Map("delta.enableIcebergCompatV3" -> "true")),
160+
true),
161+
(
162+
"icebergCompatV3",
163+
testMetadata(tblProps = Map("delta.enableIcebergCompatV3" -> "false")),
164+
false),
156165
(
157166
"inCommitTimestamp",
158167
testMetadata(tblProps = Map("delta.enableInCommitTimestamps" -> "true")),
@@ -238,6 +247,7 @@ class TableFeaturesSuite extends AnyFunSuite {
238247
"rowTracking",
239248
"domainMetadata",
240249
"icebergCompatV2",
250+
"icebergCompatV3",
241251
"inCommitTimestamp",
242252
"appendOnly",
243253
"invariants",
@@ -568,6 +578,11 @@ class TableFeaturesSuite extends AnyFunSuite {
568578
new Protocol(3, 7, emptySet(), singleton("icebergCompatV2")),
569579
testMetadata(tblProps = Map("delta.enableIcebergCompatV2" -> "true")))
570580

581+
checkWriteSupported(
582+
"validateKernelCanWriteToTable: protocol 7 with icebergCompatV3",
583+
new Protocol(3, 7, emptySet(), singleton("icebergCompatV3")),
584+
testMetadata(tblProps = Map("delta.enableIcebergCompatV3" -> "true")))
585+
571586
checkWriteSupported(
572587
"validateKernelCanWriteToTable: protocol 7 with v2Checkpoint, " +
573588
"metadata enables v2Checkpoint",
@@ -791,6 +806,21 @@ class TableFeaturesSuite extends AnyFunSuite {
791806
set(),
792807
set("columnMapping", "appendOnly", "invariants", "icebergCompatV2")),
793808
set("icebergCompatV2", "columnMapping")),
809+
(
810+
testMetadata(tblProps = Map("delta.enableIcebergCompatV3" -> "true")),
811+
new Protocol(1, 2),
812+
new Protocol(
813+
2,
814+
7,
815+
set(),
816+
set(
817+
"columnMapping",
818+
"appendOnly",
819+
"invariants",
820+
"icebergCompatV3",
821+
"domainMetadata",
822+
"rowTracking")),
823+
set("icebergCompatV3", "domainMetadata", "columnMapping", "rowTracking")),
794824
(
795825
testMetadata(tblProps =
796826
Map("delta.enableIcebergCompatV2" -> "true", "delta.enableDeletionVectors" -> "true")),
@@ -819,6 +849,21 @@ class TableFeaturesSuite extends AnyFunSuite {
819849
set("columnMapping", "deletionVectors"),
820850
set("columnMapping", "icebergCompatV2", "deletionVectors")),
821851
set("icebergCompatV2")),
852+
(
853+
testMetadata(tblProps =
854+
Map("delta.enableIcebergCompatV3" -> "true")),
855+
new Protocol(3, 7, set("columnMapping", "deletionVectors"), set("columnMapping")),
856+
new Protocol(
857+
3,
858+
7,
859+
set("columnMapping", "deletionVectors"),
860+
set(
861+
"columnMapping",
862+
"icebergCompatV3",
863+
"deletionVectors",
864+
"domainMetadata",
865+
"rowTracking")),
866+
set("icebergCompatV3", "domainMetadata", "rowTracking")),
822867
(
823868
testMetadata(tblProps = Map("delta.enableIcebergWriterCompatV1" -> "true")),
824869
new Protocol(1, 2),
@@ -963,6 +1008,15 @@ class TableFeaturesSuite extends AnyFunSuite {
9631008
// that is of partial (writer only) table feature support
9641009
testMetadata(tblProps = Map("delta.enableIcebergCompatV2" -> "true")),
9651010
new Protocol(2, 7, set(), set("columnMapping", "icebergCompatV2"))),
1011+
(
1012+
// try to enable the feature that is already supported on a protocol
1013+
// that is of partial (writer only) table feature support
1014+
testMetadata(tblProps = Map("delta.enableIcebergCompatV3" -> "true")),
1015+
new Protocol(
1016+
2,
1017+
7,
1018+
set(),
1019+
set("columnMapping", "icebergCompatV3", "domainMetadata", "rowTracking"))),
9661020
(
9671021
// try to enable the feature that is already supported on a protocol
9681022
// that is of table feature support

0 commit comments

Comments
 (0)