Skip to content

Commit 732ad5c

Browse files
authored
[UniForm] Refactor IcebergCompat (delta-io#4356)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description This PR refactors existing IcebergCompat code and test cases. <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? UT/Integration <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? No <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. 
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent ceeb035 commit 732ad5c

File tree

8 files changed

+137
-216
lines changed

8 files changed

+137
-216
lines changed

iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/UniversalFormatSuite.scala

Lines changed: 2 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -56,114 +56,14 @@ trait UniversalFormatSuiteUtilsBase
5656
.filterNot { case (r, w) => w < 7 && r >= 3 }
5757
}
5858

59-
/** Contains shared tests for both IcebergCompatV1 and IcebergCompatV2 suites. */
60-
trait UniversalFormatSuiteTestBase extends UniversalFormatSuiteUtilsBase {
61-
private def loadIcebergTable(
62-
spark: SparkSession,
63-
id: TableIdentifier): Option[shadedForDelta.org.apache.iceberg.Table] = {
64-
val deltaLog = DeltaLog.forTable(spark, id)
65-
val catalogTable = spark.sessionState.catalog.getTableMetadata(id)
66-
val hiveCatalog = IcebergTransactionUtils
67-
.createHiveCatalog(deltaLog.newDeltaHadoopConf())
68-
val icebergTableId = IcebergTransactionUtils
69-
.convertSparkTableIdentifierToIcebergHive(catalogTable.identifier)
70-
if (hiveCatalog.tableExists(icebergTableId)) {
71-
Some(hiveCatalog.loadTable(icebergTableId))
72-
} else {
73-
None
74-
}
75-
}
76-
77-
protected def getCompatVersionOtherThan(version: Int): Int
78-
79-
protected def getCompatVersionsOtherThan(version: Int): Seq[Int]
80-
81-
test("CTAS new UniForm (Iceberg) table without manually enabling column mapping") {
82-
// These are the versions with column mapping enabled
83-
allReaderWriterVersions.foreach { case (r, w) =>
84-
withTempTableAndDir { case (from_id, from_loc) =>
85-
withTempTableAndDir { case (to_id1, to_loc1) =>
86-
withTempTableAndDir { case (to_id2, to_loc2) =>
87-
executeSql(
88-
s"""
89-
|CREATE TABLE $from_id (PID INT, PCODE INT) USING DELTA LOCATION $from_loc
90-
| TBLPROPERTIES (
91-
| 'delta.minReaderVersion' = $r,
92-
| 'delta.minWriterVersion' = $w
93-
|)""".stripMargin)
94-
executeSql(
95-
s"""
96-
|INSERT INTO TABLE $from_id (PID, PCODE)
97-
| VALUES (1,2),(2,3),(3,4),(4,5),(5,6),(6,7),(7,8)""".stripMargin)
98-
executeSql(
99-
s"""
100-
|CREATE TABLE $to_id1 USING DELTA LOCATION $to_loc1 TBLPROPERTIES (
101-
| 'delta.universalFormat.enabledFormats' = 'iceberg',
102-
| 'delta.enableIcebergCompatV$compatVersion' = 'true',
103-
| 'delta.minReaderVersion' = $r,
104-
| 'delta.minWriterVersion' = $w
105-
|) AS SELECT * FROM $from_id""".stripMargin)
106-
executeSql(
107-
s"""
108-
|CREATE TABLE $to_id2 USING DELTA LOCATION $to_loc2 TBLPROPERTIES (
109-
| 'delta.universalFormat.enabledFormats' = 'iceberg',
110-
| 'delta.columnMapping.mode' = 'name',
111-
| 'delta.enableIcebergCompatV$compatVersion' = 'true',
112-
| 'delta.minReaderVersion' = $r,
113-
| 'delta.minWriterVersion' = $w
114-
|) AS SELECT * FROM $from_id""".stripMargin)
115-
116-
val icebergTable1 = loadIcebergTable(spark, new TableIdentifier(to_id1))
117-
if (icebergTable1.isDefined) {
118-
assert(icebergTable1.get.schema().asStruct().fields().toArray.length == 2)
119-
}
120-
121-
val expected1: Array[Row] = (1 to 7).map { i => Row(i, i + 1) }.toArray
122-
readAndVerify(to_id1, "PID, PCODE", "PID", expected1)
123-
124-
val icebergTable2 = loadIcebergTable(spark, new TableIdentifier(to_id2))
125-
if (icebergTable2.isDefined) {
126-
assert(icebergTable2.get.schema().asStruct().fields().toArray.length == 2)
127-
}
128-
129-
val expected2: Array[Row] = (1 to 7).map { i => Row(i, i + 1) }.toArray
130-
readAndVerify(to_id2, "PID, PCODE", "PID", expected2)
131-
}
132-
}
133-
}
134-
}
135-
}
136-
137-
test("REORG TABLE: command does not support where clause") {
138-
withTempTableAndDir { case (id, loc) =>
139-
val anotherCompatVersion = getCompatVersionOtherThan(compatVersion)
140-
executeSql(s"""
141-
| CREATE TABLE $id (ID INT) USING DELTA LOCATION $loc TBLPROPERTIES (
142-
| 'delta.universalFormat.enabledFormats' = 'iceberg',
143-
| 'delta.enableIcebergCompatV$anotherCompatVersion' = 'true'
144-
|)""".stripMargin)
145-
val e = intercept[ParseException] {
146-
spark.sessionState.sqlParser.parsePlan(
147-
s"""
148-
| REORG TABLE $id
149-
| WHERE ID > 0
150-
| APPLY (UPGRADE UNIFORM (ICEBERGCOMPATVERSION = $compatVersion))
151-
""".stripMargin).asInstanceOf[DeltaReorgTableCommand]
152-
}
153-
assert(e.getErrorClass === "PARSE_SYNTAX_ERROR")
154-
assert(e.getMessage.contains("Syntax error at or near 'REORG'"))
155-
}
156-
}
157-
}
158-
15959
class UniversalFormatSuite
16060
extends UniversalFormatMiscSuiteBase
16161
with UniversalFormatSuiteUtilsBase
16262

16363
class UniFormWithIcebergCompatV1Suite
164-
extends UniversalFormatSuiteTestBase
64+
extends UniversalFormatSuiteUtilsBase
16565
with UniFormWithIcebergCompatV1SuiteBase
16666

16767
class UniFormWithIcebergCompatV2Suite
168-
extends UniversalFormatSuiteTestBase
68+
extends UniversalFormatSuiteUtilsBase
16969
with UniFormWithIcebergCompatV2SuiteBase

spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,8 @@ trait DeltaColumnMappingBase extends DeltaLogging {
491491
field.copy(metadata = builder.build())
492492
})
493493

494-
val (finalSchema, newMaxId) = if (IcebergCompatV2.isEnabled(newMetadata)) {
494+
// Starting from IcebergCompatV2, we require writing field-id for List/Map nested fields
495+
val (finalSchema, newMaxId) = if (IcebergCompat.isGeqEnabled(newMetadata, 2)) {
495496
rewriteFieldIdsForIceberg(newSchema, maxId)
496497
} else {
497498
(newSchema, maxId)

spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,13 +269,13 @@ case class DeltaParquetFileFormat(
269269
dataSchema: StructType): OutputWriterFactory = {
270270
val factory = super.prepareWrite(sparkSession, job, options, dataSchema)
271271
val conf = ContextUtil.getConfiguration(job)
272-
// Always write timestamp as TIMESTAMP_MICROS for Iceberg compat based on Iceberg spec
273-
if (IcebergCompatV1.isEnabled(metadata) || IcebergCompatV2.isEnabled(metadata)) {
272+
// Always write timestamp as TIMESTAMP_MICROS for IcebergCompat based on Iceberg spec
273+
if (IcebergCompat.isAnyEnabled(metadata)) {
274274
conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
275275
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
276276
}
277-
if (IcebergCompatV2.isEnabled(metadata)) {
278-
// For Uniform with IcebergCompatV2, we need to write nested field IDs for list and map
277+
if (IcebergCompat.isGeqEnabled(metadata, 2)) {
278+
// Starting from IcebergCompatV2, we need to write nested field IDs for list and map
279279
// types to the parquet schema. Spark currently does not support it so we hook in our
280280
// own write support class.
281281
ParquetOutputFormat.setWriteSupportClass(job, classOf[DeltaParquetWriteSupport])

spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,21 +244,18 @@ case class IcebergCompatVersionBase(knownVersions: Set[IcebergCompatBase]) {
244244
.map{ _.version }
245245

246246
/**
247-
* Get the DeltaConfig for the given IcebergCompat version. If version is not valid,
247+
* Get the IcebergCompat by version. If version is not valid,
248248
* throw an exception.
249-
* @return the DeltaConfig for the given version. E.g.,
250-
* [[DeltaConfigs.ICEBERG_COMPAT_V1_ENABLED]] for version 1.
249+
* @return the IcebergCompatVx object
251250
*/
252-
def getConfigForVersion(version: Int): DeltaConfig[Option[Boolean]] = {
251+
def getForVersion(version: Int): IcebergCompatBase =
253252
knownVersions
254253
.find(_.version == version)
255-
.map(_.config)
256254
.getOrElse(
257255
throw DeltaErrors.icebergCompatVersionNotSupportedException(
258256
version, knownVersions.size
259257
)
260258
)
261-
}
262259

263260
/**
264261
* @return any enabled IcebergCompat in the conf
@@ -268,17 +265,22 @@ case class IcebergCompatVersionBase(knownVersions: Set[IcebergCompatBase]) {
268265
conf.getOrElse[String](compat.config.key, "false").toBoolean
269266
}
270267

268+
def anyEnabled(metadata: Metadata): Option[IcebergCompatBase] =
269+
knownVersions.find { _.config.fromMetaData(metadata).getOrElse(false) }
270+
271271
/**
272272
* @return true if any version of IcebergCompat is enabled
273273
*/
274274
def isAnyEnabled(conf: Map[String, String]): Boolean = anyEnabled(conf).nonEmpty
275275

276-
/**
277-
* @return true if any version of IcebergCompat is enabled
278-
*/
279276
def isAnyEnabled(metadata: Metadata): Boolean =
280277
knownVersions.exists { _.config.fromMetaData(metadata).getOrElse(false) }
281278

279+
/**
280+
* @return true if a CompatVx greater than or equal to the required version is enabled
281+
*/
282+
def isGeqEnabled(metadata: Metadata, requiredVersion: Int): Boolean =
283+
anyEnabled(metadata).exists(_.version >= requiredVersion)
282284
/**
283285
* @return true if any version of IcebergCompat is enabled, and is incompatible
284286
* with the given table feature
@@ -291,8 +293,9 @@ case class IcebergCompatVersionBase(knownVersions: Set[IcebergCompatBase]) {
291293
}
292294
}
293295

294-
object IcebergCompat
295-
extends IcebergCompatVersionBase(Set(IcebergCompatV1, IcebergCompatV2)) with DeltaLogging
296+
object IcebergCompat extends IcebergCompatVersionBase(
297+
Set(IcebergCompatV1, IcebergCompatV2)
298+
) with DeltaLogging
296299

297300

298301

spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ package org.apache.spark.sql.delta.commands
1919
// scalastyle:off import.ordering.noEmptyLine
2020
import scala.util.control.NonFatal
2121

22-
import org.apache.spark.sql.delta.{DeltaConfig, DeltaConfigs, DeltaErrors, DeltaOperations, Snapshot}
23-
import org.apache.spark.sql.delta.IcebergCompat.{getConfigForVersion, getEnabledVersion}
22+
import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeltaConfigs, DeltaErrors, DeltaOperations, IcebergCompatBase, Snapshot}
23+
import org.apache.spark.sql.delta.IcebergCompat.{getEnabledVersion, getForVersion}
2424
import org.apache.spark.sql.delta.UniversalFormat.{icebergEnabled, ICEBERG_FORMAT}
2525
import org.apache.spark.sql.delta.actions.{AddFile, Protocol}
2626
import org.apache.spark.sql.delta.catalog.DeltaTableV2
@@ -37,15 +37,15 @@ import org.apache.spark.sql.functions.col
3737
*/
3838
trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
3939

40-
private val versionChangesRequireRewrite: Map[Int, Set[Int]] =
41-
Map(0 -> Set(2), 1 -> Set(2), 2 -> Set(2))
40+
private val rewriteCheckTable: Map[Int, Set[Int]] =
41+
Map(0 -> Set(2, 3), 1 -> Set(2, 3), 2 -> Set(2, 3), 3 -> Set(2, 3))
4242

4343
/**
44-
* Helper function to check if the table data may need to be rewritten to be iceberg compatible.
45-
* Only if not all addFiles has the tag, Rewriting would be performed.
44+
* Check if the given pair of (old_version, new_version) should trigger a rewrite check.
45+
* NOTE: Actual rewrite only happens when not all addFiles have tags with the newVersion.
4646
*/
47-
private def reorgMayNeedRewrite(oldVersion: Int, newVersion: Int): Boolean = {
48-
versionChangesRequireRewrite.getOrElse(oldVersion, Set.empty[Int]).contains(newVersion)
47+
private def shallCheckRewrite(oldVersion: Int, newVersion: Int): Boolean = {
48+
rewriteCheckTable.getOrElse(oldVersion, Set.empty[Int]).contains(newVersion)
4949
}
5050

5151
/**
@@ -54,42 +54,38 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
5454
def optimizeByReorg(sparkSession: SparkSession): Seq[Row]
5555

5656
/**
57-
* Helper function to update the table icebergCompat properties.
58-
* We can not use AlterTableSetPropertiesDeltaCommand here because we don't allow customer to
59-
* change icebergCompatVersion by using Alter Table command.
57+
* Enable the new IcebergCompat on the table by updating table conf.
6058
*/
6159
private def enableIcebergCompat(
62-
target: DeltaTableV2,
63-
currIcebergCompatVersionOpt: Option[Int],
64-
targetVersionDeltaConfig: DeltaConfig[Option[Boolean]]): Unit = {
65-
var enableIcebergCompatConf = Map(
66-
targetVersionDeltaConfig.key -> "true",
67-
DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key -> "false",
68-
DeltaConfigs.COLUMN_MAPPING_MODE.key -> "name"
69-
)
70-
if (currIcebergCompatVersionOpt.nonEmpty) {
71-
val currIcebergCompatVersionDeltaConfig = getConfigForVersion(
72-
currIcebergCompatVersionOpt.get)
73-
enableIcebergCompatConf ++= Map(currIcebergCompatVersionDeltaConfig.key -> "false")
60+
table: DeltaTableV2,
61+
currentCompatVersion: Option[Int],
62+
compatToEnable: IcebergCompatBase): Unit = {
63+
var newConf: Map[String, String] = Map(
64+
compatToEnable.config.key -> "true",
65+
DeltaConfigs.COLUMN_MAPPING_MODE.key -> "name") ++
66+
currentCompatVersion.map(getForVersion(_).config.key -> "false") // Disable old IcebergCompat
67+
68+
if (compatToEnable.incompatibleTableFeatures.contains(DeletionVectorsTableFeature)) {
69+
newConf += DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key -> "false"
7470
}
7571

76-
val alterConfTxn = target.startTransaction()
72+
val alterConfTxn = table.startTransaction()
7773

7874
if (alterConfTxn.protocol.minWriterVersion < 7) {
79-
enableIcebergCompatConf += Protocol.MIN_WRITER_VERSION_PROP -> "7"
75+
newConf += Protocol.MIN_WRITER_VERSION_PROP -> "7"
8076
}
8177
if (alterConfTxn.protocol.minReaderVersion < 3) {
82-
enableIcebergCompatConf += Protocol.MIN_READER_VERSION_PROP -> "3"
78+
newConf += Protocol.MIN_READER_VERSION_PROP -> "3"
8379
}
8480

8581
val metadata = alterConfTxn.metadata
8682
val newMetadata = metadata.copy(
8783
description = metadata.description,
88-
configuration = metadata.configuration ++ enableIcebergCompatConf)
84+
configuration = metadata.configuration ++ newConf)
8985
alterConfTxn.updateMetadata(newMetadata)
9086
alterConfTxn.commit(
9187
Nil,
92-
DeltaOperations.UpgradeUniformProperties(enableIcebergCompatConf)
88+
DeltaOperations.UpgradeUniformProperties(newConf)
9389
)
9490
}
9591

@@ -131,13 +127,13 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
131127
*
132128
* * There are six possible write combinations:
133129
* | CurrentIcebergCompatVersion | TargetIcebergCompatVersion | Required steps|
134-
* | --------------- | --------------- | --------------- |
135-
* | None | 1 | 1, 3 |
136-
* | None | 2 | 1, 2, 3 |
137-
* | 1 | 1 | 3 |
138-
* | 1 | 2 | 1, 2, 3 |
139-
* | 2 | 1 | 1, 3 |
140-
* | 2 | 2 | 2, 3 |
130+
* | --------------------------- | -------------------------- | ------------- |
131+
* | None | 1 | 1, 3 |
132+
* | None | 2+ | 1, 2, 3 |
133+
* | 1 | 1 | 3 |
134+
* | 1 | 2+ | 1, 2, 3 |
135+
* | 2+ | 1 | 1, 3 |
136+
* | 2+ | 2+ | 2, 3 |
141137
*/
142138
private def doRewrite(
143139
target: DeltaTableV2,
@@ -146,15 +142,14 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
146142

147143
val snapshot = target.update()
148144
val currIcebergCompatVersionOpt = getEnabledVersion(snapshot.metadata)
149-
val targetVersionDeltaConfig = getConfigForVersion(
150-
targetIcebergCompatVersion)
151-
val versionChangeMayNeedRewrite = reorgMayNeedRewrite(
145+
val targetIcebergCompatObject = getForVersion(targetIcebergCompatVersion)
146+
val mayNeedRewrite = shallCheckRewrite(
152147
currIcebergCompatVersionOpt.getOrElse(0), targetIcebergCompatVersion)
153148

154149
// Step 1: Update the table properties to enable the target iceberg compat version
155150
val didUpdateIcebergCompatVersion =
156151
if (!currIcebergCompatVersionOpt.contains(targetIcebergCompatVersion)) {
157-
enableIcebergCompat(target, currIcebergCompatVersionOpt, targetVersionDeltaConfig)
152+
enableIcebergCompat(target, currIcebergCompatVersionOpt, targetIcebergCompatObject)
158153
logInfo(log"Update table ${MDC(DeltaLogKeys.TABLE_NAME, target.tableIdentifier)} " +
159154
log"to iceberg compat version = " +
160155
log"${MDC(DeltaLogKeys.VERSION, targetIcebergCompatVersion)} successfully.")
@@ -170,7 +165,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
170165
// The table needs to be rewritten if:
171166
// 1. The target iceberg compat version requires rewrite.
172167
// 2. Not all addFile have ICEBERG_COMPAT_VERSION=targetVersion tag
173-
val (metricsOpt, didRewrite) = if (versionChangeMayNeedRewrite && !allAddFilesHaveTag) {
168+
val (metricsOpt, didRewrite) = if (mayNeedRewrite && !allAddFilesHaveTag) {
174169
logInfo(log"Reorg Table ${MDC(DeltaLogKeys.TABLE_NAME, target.tableIdentifier)} to " +
175170
log"iceberg compat version = ${MDC(DeltaLogKeys.VERSION, targetIcebergCompatVersion)} " +
176171
log"need rewrite data files.")
@@ -191,7 +186,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
191186
val updatedSnapshot = target.deltaLog.update()
192187
val (numOfAddFiles, numOfAddFilesWithIcebergCompatTag) = getNumOfAddFiles(
193188
targetIcebergCompatVersion, target, updatedSnapshot)
194-
if (versionChangeMayNeedRewrite && numOfAddFilesWithIcebergCompatTag != numOfAddFiles) {
189+
if (mayNeedRewrite && numOfAddFilesWithIcebergCompatTag != numOfAddFiles) {
195190
throw DeltaErrors.icebergCompatReorgAddFileTagsMissingException(
196191
updatedSnapshot.version,
197192
targetIcebergCompatVersion,
@@ -215,7 +210,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
215210
"targetIcebergCompatVersion" -> targetIcebergCompatVersion.toString,
216211
"metrics" -> metricsOpt.toString,
217212
"didUpdateIcebergCompatVersion" -> didUpdateIcebergCompatVersion.toString,
218-
"needRewrite" -> versionChangeMayNeedRewrite.toString,
213+
"needRewrite" -> mayNeedRewrite.toString,
219214
"didRewrite" -> didRewrite.toString,
220215
"numOfAddFilesBefore" -> numOfAddFilesBefore.toString,
221216
"numOfAddFilesWithIcebergCompatTagBefore" -> numOfAddFilesWithTagBefore.toString,

spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -518,10 +518,13 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
518518
}
519519

520520
// add [[AddFile.Tags.ICEBERG_COMPAT_VERSION.name]] tags to addFiles
521-
if (IcebergCompatV2.isEnabled(metadata)) {
521+
// starting from IcebergCompatV2
522+
val enabledCompat = IcebergCompat.anyEnabled(metadata)
523+
if (enabledCompat.exists(_.version >= 2)) {
522524
resultFiles = resultFiles.map { addFile =>
523-
val tags = if (addFile.tags != null) addFile.tags else Map.empty[String, String]
524-
addFile.copy(tags = tags + (AddFile.Tags.ICEBERG_COMPAT_VERSION.name -> "2"))
525+
addFile.copy(tags = Option(addFile.tags).getOrElse(Map.empty[String, String]) +
526+
(AddFile.Tags.ICEBERG_COMPAT_VERSION.name -> enabledCompat.get.version.toString)
527+
)
525528
}
526529
}
527530

0 commit comments

Comments (0)