Skip to content

Commit 39af931

Browse files
authored
[Uniform]Propagate the user-defined table properties to Iceberg metadata (delta-io#4357)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> This PR propagate the user-defined table properties to iceberg metadata, user-defined properties refer to the ones not starting with "delta.". To avoid breaking change(some user might have started using "delta.universalformat.config.iceberg.abc" to set properties "abc"), we still keep propagating the ones with prefix "delta.universalformat.config.iceberg" by stripping the prefix instead of ignoring them. However, we will do a duplication check to avoid both "delta.universalformat.config.iceberg.abc" and "abc" are set #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [x] Other (uniform) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Tested with running set tblProperties on spark ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 732ad5c commit 39af931

File tree

3 files changed

+36
-3
lines changed

3 files changed

+36
-3
lines changed

iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/DeltaToIcebergConvert.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.icebergShaded
1818

1919
import org.apache.spark.sql.delta.{DeltaConfig, DeltaConfigs, IcebergCompat}
2020
import org.apache.spark.sql.delta.DeltaConfigs.{LOG_RETENTION, TOMBSTONE_RETENTION}
21+
import org.apache.spark.sql.delta.DeltaErrors.icebergTablePropertiesConflictException
2122
import shadedForDelta.org.apache.iceberg.{TableProperties => IcebergTableProperties}
2223

2324
import org.apache.spark.unsafe.types.CalendarInterval
@@ -30,24 +31,42 @@ object DeltaToIcebergConvert {
3031
object TableProperties {
3132
/**
3233
* We generate Iceberg Table properties from Delta table properties
33-
* using two methods.
34+
* using three methods.
3435
* 1. If a Delta property key starts with "delta.universalformat.config.iceberg"
3536
* we strip the prefix from the key and include the property pair.
3637
* Note the key is already normalized to lower case.
37-
* 2. We compute Iceberg properties from Delta using custom logic
38+
* 2. If it is a non delta property, i.e. property not starting with "delta.",
39+
* include the property pair.
40+
* 3. We compute Iceberg properties from Delta using custom logic
3841
* This now includes
3942
* a) Iceberg format version
4043
* b) Iceberg snapshot retention
4144
*/
4245
def apply(deltaProperties: Map[String, String]): Map[String, String] = {
4346
val prefix = DeltaConfigs.DELTA_UNIVERSAL_FORMAT_ICEBERG_CONFIG_PREFIX
44-
val copiedFromDelta =
47+
// Key with delta.universalformat.config.iceberg prefix
48+
// will have the prefix stripped and copied to iceberg
49+
val stripedPrefixProperties =
4550
deltaProperties
4651
.filterKeys(_.startsWith(prefix))
4752
.map { case (key, value) => key.stripPrefix(prefix) -> value }
4853
.toSeq
4954
.toMap
5055

56+
// Key without delta. prefix will be copied to Iceberg directly.
57+
val customProperties = deltaProperties.filterKeys(!_.startsWith("delta."))
58+
59+
// Setting "delta.universalformat.config.iceberg.property_name_foo"
60+
// and "property_name_foo" will be considered as duplicate because
61+
// "delta.universalformat.config.iceberg.property_name_foo" will be
62+
// converted to "property_name_foo" after stripping prefix.
63+
val duplicateUserSpecifiedProperties =
64+
stripedPrefixProperties.keySet.intersect(customProperties.keySet)
65+
if (duplicateUserSpecifiedProperties.nonEmpty) {
66+
throw icebergTablePropertiesConflictException(duplicateUserSpecifiedProperties)
67+
}
68+
69+
val copiedFromDelta = stripedPrefixProperties ++ customProperties
5170
val computers = Seq(FormatVersionComputer, RetentionPeriodComputer)
5271
val computed: Map[String, String] = computers
5372
.map(_.apply(deltaProperties, copiedFromDelta))

spark/src/main/resources/error/delta-error-classes.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3104,6 +3104,13 @@
31043104
],
31053105
"sqlState" : "55019"
31063106
},
3107+
"DUPLICATE_KEY_IN_ICEBERG_TABLE_PROPERTY" : {
3108+
"message" : [
3109+
"Duplicate keys <tblProperties> found in table properties on Iceberg Table, please check the ",
3110+
"if these properties with and without prefix \"delta.universalformat.config.iceberg\" are both set"
3111+
],
3112+
"sqlState" : "23505"
3113+
},
31073114
"INCORRECT_NUMBER_OF_ARGUMENTS" : {
31083115
"message" : [
31093116
"<failure>, <functionName> requires at least <minArgs> arguments and at most <maxArgs> arguments."

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3092,6 +3092,13 @@ trait DeltaErrorsBase
30923092
new DeltaIllegalArgumentException(errorClass = "DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES")
30933093
}
30943094

3095+
def icebergTablePropertiesConflictException(duplicatedKeys: Set[String]): Throwable = {
3096+
new DeltaIllegalStateException(
3097+
errorClass = "DUPLICATE_KEY_IN_ICEBERG_TABLE_PROPERTY",
3098+
messageParameters = Array(duplicatedKeys.mkString(", "))
3099+
)
3100+
}
3101+
30953102
def icebergClassMissing(sparkConf: SparkConf, cause: Throwable): Throwable = {
30963103
new DeltaIllegalStateException(
30973104
errorClass = "DELTA_MISSING_ICEBERG_CLASS",

0 commit comments

Comments
 (0)