Commit 0fb85d8

[UniForm] Support converting Delta default values to Iceberg (delta-io#4898)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR allows Delta column default values to be converted to Iceberg during UniForm conversion.

## How was this patch tested?

Unit tests.

## Does this PR introduce _any_ user-facing changes?

No
1 parent 9ca77e1 commit 0fb85d8
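For context, a hypothetical end-to-end illustration of the feature (table name and exact property set are made up; per the IcebergSchemaUtils change below, default-value translation only applies under IcebergCompat version 3 or higher):

```scala
// Hypothetical illustration: a UniForm table whose column carries a literal
// default. The exact TBLPROPERTIES required depend on your Delta version.
spark.sql("""
  CREATE TABLE events (id BIGINT, region STRING DEFAULT 'us-east-1')
  USING DELTA
  TBLPROPERTIES (
    'delta.feature.allowColumnDefaults' = 'supported',
    'delta.universalFormat.enabledFormats' = 'iceberg'
  )
""")
// Before this change, the Iceberg conversion ignored column defaults; with it,
// a literal default such as 'us-east-1' is parsed into an Iceberg Literal via
// extractLiteralDefault, while a non-literal default (e.g. CURRENT_TIMESTAMP())
// makes the conversion fail with an UnsupportedOperationException.
```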

File tree

3 files changed: +142 -7 lines changed

- iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/DeltaToIcebergConvert.scala (+119 -5)
- iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala (+14 -2)
- spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala (+9 -0)

iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/DeltaToIcebergConvert.scala

119 additions, 5 deletions:

```diff
@@ -16,15 +16,24 @@
 
 package org.apache.spark.sql.delta.icebergShaded
 
+import java.sql.Timestamp
+import java.time.{LocalDateTime, OffsetDateTime}
+import java.time.format._
+
+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.delta.{DeltaConfig, DeltaConfigs, IcebergCompat, NoMapping, Snapshot, SnapshotDescriptor}
 import org.apache.spark.sql.delta.DeltaConfigs.{LOG_RETENTION, TOMBSTONE_RETENTION}
 import org.apache.spark.sql.delta.icebergShaded.IcebergTransactionUtils
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import shadedForDelta.org.apache.iceberg.{PartitionSpec, Schema => IcebergSchema, StructLike, TableProperties => IcebergTableProperties}
+import shadedForDelta.org.apache.iceberg.expressions.Literal
 import shadedForDelta.org.apache.iceberg.types.{Type => IcebergType, Types => IcebergTypes}
+import shadedForDelta.org.apache.iceberg.util.DateTimeUtil
 
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
 class DeltaToIcebergConvert(val snapshot: SnapshotDescriptor, val catalogTable: CatalogTable) {
@@ -33,10 +42,13 @@ class DeltaToIcebergConvert(val snapshot: SnapshotDescriptor, val catalogTable:
     IcebergSchemaUtils(snapshot.metadata.columnMappingMode == NoMapping)
 
 
-  val schema: IcebergSchema = {
-    val icebergStruct = schemaUtils.convertStruct(snapshot.schema)
-    new IcebergSchema(icebergStruct.fields())
-  }
+  val schema: IcebergSchema = IcebergCompat
+    .getEnabledVersion(snapshot.metadata)
+    .orElse(Some(0))
+    .map { compatVersion =>
+      val icebergStruct = schemaUtils.convertStruct(snapshot.schema)(compatVersion)
+      new IcebergSchema(icebergStruct.fields())
+    }.getOrElse(throw new IllegalArgumentException("No IcebergCompat available"))
 
   val partition: PartitionSpec = IcebergTransactionUtils
     .createPartitionSpec(schema, snapshot.metadata.partitionColumns)
@@ -47,6 +59,108 @@ class DeltaToIcebergConvert(val snapshot: SnapshotDescriptor, val catalogTable:
 object DeltaToIcebergConvert
   extends DeltaLogging
 {
+  /**
+   * Utils used when converting a Delta schema to Iceberg.
+   */
+  object Schema {
+    /**
+     * Extract a Delta column's default value as an Iceberg Literal.
+     * @param field column
+     * @return Right(Some(Literal)) if the column contains a literal default;
+     *         Right(None) if the column does not have a default;
+     *         Left(errorMessage) if the column contains a non-literal default
+     */
+    def extractLiteralDefault(field: StructField): Either[String, Option[Literal[_]]] = {
+      if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+        val defaultValueStr = field.metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+        try {
+          Right(Some(stringToLiteral(defaultValueStr, field.dataType)))
+        } catch {
+          case NonFatal(e) =>
+            Left("Unsupported default value:" +
+              s"${field.dataType.typeName}:$defaultValueStr:${e.getMessage}")
+          case unknown: Throwable => throw unknown
+        }
+      } else {
+        Right(None)
+      }
+    }
+    /**
+     * Convert a Delta default value string to an Iceberg Literal based on the data type.
+     * @param str default value in Delta column metadata
+     * @param dataType Delta column data type
+     * @return converted Literal
+     */
+    def stringToLiteral(str: String, dataType: DataType): Literal[_] = {
+      def parseString(input: String) = {
+        if (input.length > 1 && ((input.head == '\'' && input.last == '\'')
+          || (input.head == '"' && input.last == '"'))) {
+          Literal.of(input.substring(1, input.length - 1))
+        } else {
+          throw new UnsupportedOperationException(s"String missing quotation marks: $input")
+        }
+      }
+      // Parse either a hex-encoded literal x'....' or a string literal (UTF-8) into binary
+      def parseBinary(input: String) = {
+        if (input.startsWith("x") || input.startsWith("X")) {
+          // Hex-encoded literal
+          Literal.of(BigInt(parseString(input.substring(1))
+            .value().toString, 16).toByteArray.dropWhile(_ == 0))
+        } else {
+          Literal.of(parseString(input).value().toString
+            .getBytes(java.nio.charset.StandardCharsets.UTF_8))
+        }
+      }
+      // Parse a timestamp string without time zone info
+      def parseLocalTimestamp(input: String) = {
+        val formats = Seq(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
+          DateTimeFormatter.ISO_LOCAL_DATE_TIME)
+        val stripped = parseString(input).value()
+        val parsed = formats.flatMap { format =>
+          try {
+            Some(Literal.of(Timestamp.valueOf(LocalDateTime.parse(stripped, format)).getTime))
+          } catch {
+            case NonFatal(_) => None
+          }
+        }
+        if (parsed.nonEmpty) {
+          parsed.head
+        } else {
+          throw new IllegalArgumentException(input)
+        }
+      }
+      // Parse a timestamp string with time zone info. If the input has no time zone, assume UTC.
+      def parseTimestamp(input: String) = {
+        val stripped = parseString(input).value()
+        try {
+          Literal.of(OffsetDateTime.parse(stripped, DateTimeFormatter.ISO_DATE_TIME)
+            .toInstant.toEpochMilli)
+        } catch {
+          case NonFatal(_) => parseLocalTimestamp(input)
+        }
+      }
+
+      dataType match {
+        case StringType => parseString(str)
+        case LongType => Literal.of(java.lang.Long.valueOf(str.replaceAll("[lL]$", "")))
+        case IntegerType | ShortType | ByteType => Literal.of(Integer.valueOf(str))
+        case FloatType => Literal.of(java.lang.Float.valueOf(str))
+        case DoubleType => Literal.of(java.lang.Double.valueOf(str))
+        // The number should already be correctly formatted; no rounding is needed
+        case d: DecimalType => Literal.of(
+          new java.math.BigDecimal(str, new java.math.MathContext(d.precision)).setScale(d.scale)
+        )
+        case BooleanType => Literal.of(java.lang.Boolean.valueOf(str))
+        case BinaryType => parseBinary(str)
+        case DateType => parseString(str).to(IcebergTypes.DateType.get())
+        case TimestampType => parseTimestamp(str)
+        case TimestampNTZType => parseLocalTimestamp(str)
+        case _ =>
+          throw new UnsupportedOperationException(
+            s"Could not convert default value: $dataType: $str")
+      }
+    }
+  }
 
   object TableProperties {
     /**
```
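As a hedged sketch of what the new `stringToLiteral` accepts, here are a few representative conversions; the expected results in the comments are derived by reading the code above, not taken from the PR's unit tests:

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.delta.icebergShaded.DeltaToIcebergConvert.Schema._

// Strings must keep their SQL quotation marks; an unquoted input throws.
stringToLiteral("'hello'", StringType)        // Literal of "hello"

// A trailing L/l suffix is stripped before longs are parsed.
stringToLiteral("42L", LongType)              // Literal of 42L

// x'...' is hex-decoded into bytes; a plain quoted string becomes UTF-8 bytes.
stringToLiteral("x'0A0B'", BinaryType)        // Literal of Array(0x0A, 0x0B)

// Decimals are parsed at the declared precision and scale.
stringToLiteral("12.30", DecimalType(5, 2))   // Literal of 12.30

// With a zone offset the ISO form is used; without one, the parser falls back
// to the local-timestamp formats ("yyyy-MM-dd HH:mm:ss" or ISO_LOCAL_DATE_TIME).
// Both produce epoch-millisecond literals.
stringToLiteral("'2024-01-01T00:00:00+00:00'", TimestampType)
stringToLiteral("'2024-01-01 00:00:00'", TimestampNTZType)
```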

iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergSchemaUtils.scala

14 additions, 2 deletions:

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.icebergShaded
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.delta.{DeltaColumnMapping, SnapshotDescriptor}
+import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import shadedForDelta.org.apache.iceberg.{Schema => IcebergSchema}
 import shadedForDelta.org.apache.iceberg.types.{Type => IcebergType, Types => IcebergTypes}
@@ -55,7 +56,8 @@ trait IcebergSchemaUtils extends DeltaLogging {
   private[delta] def getNestedFieldId(field: Option[StructField], path: Seq[String]): Int
 
   /** Visible for testing */
-  private[delta] def convertStruct(deltaSchema: StructType): IcebergTypes.StructType = {
+  private[delta] def convertStruct(deltaSchema: StructType)(
+      implicit compatVersion: Int = 0): IcebergTypes.StructType = {
     /**
      * Recursively (i.e. for all nested elements) transforms the delta DataType `elem` into its
      * corresponding Iceberg type.
@@ -69,13 +71,23 @@ trait IcebergSchemaUtils extends DeltaLogging {
       : IcebergType = elem match {
       case StructType(fields) =>
         IcebergTypes.StructType.of(fields.map { f =>
-          IcebergTypes.NestedField.of(
+          val icebergField = IcebergTypes.NestedField.of(
            getFieldId(Some(f)),
            f.nullable,
            f.name,
            transform(f.dataType, Some(f), Seq(DeltaColumnMapping.getPhysicalName(f))),
            f.getComment().orNull
          )
+          // Translate the column default value
+          if (compatVersion >= 3) {
+            DeltaToIcebergConvert.Schema.extractLiteralDefault(f) match {
+              case Left(errorMsg) =>
+                throw new UnsupportedOperationException(errorMsg)
+              case _ => icebergField
+            }
+          } else {
+            icebergField
+          }
         }.toList.asJava)
 
       case ArrayType(elementType, containsNull) =>
```
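Note that when `extractLiteralDefault` succeeds the match returns `icebergField` unchanged, so this change validates defaults during conversion rather than attaching them to the Iceberg field. A hedged sketch of the two call shapes, with `schemaUtils` and `snapshot` as in DeltaToIcebergConvert above:

```scala
// compatVersion defaults to 0, so existing callers keep the old behavior:
// column defaults are not inspected at all.
val legacy = schemaUtils.convertStruct(snapshot.schema)

// Under IcebergCompat version 3 or higher, every struct field's default is
// checked; a non-literal default aborts the conversion with the Left message.
val checked = schemaUtils.convertStruct(snapshot.schema)(compatVersion = 3)
```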

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

9 additions, 0 deletions:

```diff
@@ -3480,6 +3480,15 @@ trait DeltaErrorsBase
     )
   }
 
+  def icebergCompatUnsupportedFieldException(
+      version: Int, field: StructField, schema: StructType): Throwable = {
+    new DeltaUnsupportedOperationException(
+      errorClass = "DELTA_ICEBERG_COMPAT_VIOLATION.UNSUPPORTED_DATA_TYPE",
+      messageParameters = Array(version.toString, version.toString,
+        s"${field.dataType.typeName}:${field.name}", schema.treeString)
+    )
+  }
+
   def icebergCompatUnsupportedPartitionDataTypeException(
     version: Int, dataType: DataType, schema: StructType): Throwable = {
     new DeltaUnsupportedOperationException(
```
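This file only adds the error helper; nothing in the diff calls it yet. A hypothetical call site pairing it with `extractLiteralDefault` (the names `field` and `deltaSchema` are assumed) might look like:

```scala
// Illustrative only: surface a field whose default cannot be represented as
// an Iceberg literal through a classified Delta error.
DeltaToIcebergConvert.Schema.extractLiteralDefault(field) match {
  case Left(_) =>
    throw DeltaErrors.icebergCompatUnsupportedFieldException(
      version = 3, field = field, schema = deltaSchema)
  case Right(_) => // literal default or no default: conversion can proceed
}
```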
