Skip to content

Commit b9b7ee9

Browse files
authored
[Spark] Fix DomainMetadata handling for REPLACE TABLE concurrent with a transaction that adds a new domain (delta-io#4712)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

If the winning transaction adds a DomainMetadata for a domain for the first time in the table's history, concurrently with a REPLACE TABLE operation, the REPLACE TABLE incorrectly fails to mark this DomainMetadata as removed. This PR fixes the issue by marking any DomainMetadata added by the winning transaction as removed if it appears in the list of DomainMetadata to be removed and its domain does not appear in the DomainMetadata added by the REPLACE TABLE.

## How was this patch tested?

N/A - no DomainMetadata currently exists that falls into this category.
1 parent aa6bc12 commit b9b7ee9

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,11 @@ private[delta] class ConflictChecker(
677677
}
678678
}
679679

680+
/** Whether the operation of the current transaction is a [[DeltaOperations.ReplaceTable]]. */
private lazy val currentTransactionIsReplaceTable: Boolean = currentTransactionInfo.op match {
681+
case _: DeltaOperations.ReplaceTable => true
682+
case _ => false
683+
}
684+
680685
/**
681686
* Checks [[DomainMetadata]] to capture whether the current transaction conflicts with the
682687
* winning transaction at any domain.
@@ -721,9 +726,26 @@ private[delta] class ConflictChecker(
721726
case other => other
722727
}
723728

729+
730+
// For the REPLACE TABLE command, if domain metadata of a given domain is added for the first
731+
// time by the winning transaction, it may need to be marked as removed.
732+
val replaceTableRemoveNewDomainMetadataEnabled = spark.conf.get(
733+
DeltaSQLConf.DELTA_CONFLICT_DETECTION_ALLOW_REPLACE_TABLE_TO_REMOVE_NEW_DOMAIN_METADATA)
734+
val (finalUpdatedActions, finalMergedDomainMetadata) =
735+
if (replaceTableRemoveNewDomainMetadataEnabled && currentTransactionIsReplaceTable) {
736+
val (domainMetadataActions, nonDomainMetadataActions) =
737+
currentTransactionInfo.actions.partition(_.isInstanceOf[DomainMetadata])
738+
val updatedDomainMetadataActions = DomainMetadataUtils.handleDomainMetadataForReplaceTable(
739+
winningDomainMetadataMap.values.toSeq,
740+
domainMetadataActions.map(_.asInstanceOf[DomainMetadata]))
741+
((nonDomainMetadataActions ++ updatedDomainMetadataActions), updatedDomainMetadataActions)
742+
} else {
743+
(updatedActions, mergedDomainMetadata)
744+
}
745+
724746
currentTransactionInfo = currentTransactionInfo.copy(
725-
domainMetadata = mergedDomainMetadata.toSeq,
726-
actions = updatedActions)
747+
domainMetadata = finalMergedDomainMetadata.toSeq,
748+
actions = finalUpdatedActions)
727749
}
728750

729751
/**

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,6 +1686,16 @@ trait DeltaSQLConfBase {
16861686
.checkValues(NonDeterministicPredicateWidening.list)
16871687
.createWithDefault(NonDeterministicPredicateWidening.ON)
16881688

1689+
// Internal boolean flag, enabled by default. The DomainMetadata conflict check in
// ConflictChecker reads it and, only when it is true and the current transaction is a
// REPLACE TABLE, passes the winning and current DomainMetadata actions through
// DomainMetadataUtils.handleDomainMetadataForReplaceTable.
val DELTA_CONFLICT_DETECTION_ALLOW_REPLACE_TABLE_TO_REMOVE_NEW_DOMAIN_METADATA =
1690+
buildConf("conflictDetection.allowReplaceTableToRemoveNewDomainMetadata")
1691+
.doc("Whether to allow removing new domain metadatas from concurrent transactions during " +
1692+
"conflict resolution for a REPLACE TABLE operation. Note that this flag applies only " +
1693+
"to metadata domains where the table snapshot read by the REPLACE TABLE command did " +
1694+
"not contain a domain metadata of the same domain.")
1695+
.internal()
1696+
.booleanConf
1697+
.createWithDefault(true)
1698+
16891699
val DELTA_UNIFORM_ICEBERG_SYNC_CONVERT_ENABLED =
16901700
buildConf("uniform.iceberg.sync.convert.enabled")
16911701
.doc("If enabled, iceberg conversion will be done synchronously. " +

0 commit comments

Comments
 (0)