Commit 7e2a01c

[Spark] Materialize cached Delta sources in MERGE (delta-io#4629)
## Description

Fixes a source of non-determinism in MERGE due to query caching. Query caching doesn't pin the version of Delta tables, so a source cached using `df.cache()` can return different results between the two internal MERGE jobs if it is updated in the meantime. The fix is guarded by the flag `spark.databricks.delta.merge.materializeCachedSource`.

## How was this patch tested?

This is hard to cover with a test since it requires precise timing to update the source while the MERGE operation is running. I validated manually, with a debugger, that the fix works.

## Does this PR introduce _any_ user-facing changes?

No
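To make the failure mode concrete, here is a minimal, hypothetical repro sketch (table paths, schema, and the concurrent writer are illustrative assumptions; as noted above, actually hitting the window requires precise timing):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The MERGE source is a cached Delta table; query caching does NOT pin its version.
val source = spark.read.format("delta").load("/tmp/src").cache()
source.createOrReplaceTempView("src")

// If a concurrent writer commits to /tmp/src while the MERGE below is running,
// the two internal MERGE jobs (finding touched files, then writing the output)
// can observe different source rows, yielding non-deterministic results.
spark.sql(
  """MERGE INTO delta.`/tmp/tgt` t
    |USING src s
    |ON t.id = s.id
    |WHEN MATCHED THEN UPDATE SET *
    |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
```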
1 parent c06ba36 commit 7e2a01c

File tree

7 files changed: +78 -1 lines changed

spark/src/main/resources/error/delta-error-classes.json
spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaSparkPlanUtils.scala

spark/src/main/resources/error/delta-error-classes.json

Lines changed: 6 additions & 0 deletions
@@ -1689,6 +1689,12 @@
     ],
     "sqlState" : "42601"
   },
+  "DELTA_MERGE_SOURCE_CACHED_DURING_EXECUTION" : {
+    "message" : [
+      "The MERGE operation failed because (part of) the source plan was cached while the MERGE operation was running."
+    ],
+    "sqlState" : "25000"
+  },
   "DELTA_MERGE_UNEXPECTED_ASSIGNMENT_KEY" : {
     "message" : [
       "Unexpected assignment key: <unexpectedKeyClass> - <unexpectedKeyObject>"

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 3 additions & 0 deletions
@@ -1325,6 +1325,9 @@ trait DeltaErrorsBase
     )
   }

+  def mergeConcurrentOperationCachedSourceException(): Throwable =
+    new DeltaRuntimeException(errorClass = "DELTA_MERGE_SOURCE_CACHED_DURING_EXECUTION")
+
   def columnOfTargetTableNotFoundInMergeException(targetCol: String,
       colNames: String): Throwable = {
     new DeltaAnalysisException(
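A hedged sketch of how client code could react to the new error (this assumes `DeltaRuntimeException` implements Spark's `SparkThrowable`, as Delta's error-framework exceptions generally do; `runMergeOnce` is a hypothetical helper, not part of this change):

```scala
import org.apache.spark.SparkThrowable

def runMergeOnce(): Unit = {
  // Hypothetical helper: issues the MERGE statement against the target table.
}

try {
  runMergeOnce()
} catch {
  case e: SparkThrowable if e.getErrorClass == "DELTA_MERGE_SOURCE_CACHED_DURING_EXECUTION" =>
    // SQLSTATE 25000 (invalid transaction state): the source got cached only after
    // the materialization decision. On a retry, the cached source is detected up
    // front and materialized, so the MERGE can proceed safely.
    runMergeOnce()
}
```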

spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala

Lines changed: 21 additions & 0 deletions
@@ -481,6 +481,27 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
     }
   }

+  /**
+   * Checks whether (part of) the given source dataframe is cached and logs an assertion or
+   * fails if it is. Query caching doesn't pin versions of Delta tables and can lead to
+   * incorrect results, so cached source plans must be materialized.
+   */
+  def checkSourcePlanIsNotCached(spark: SparkSession, source: LogicalPlan): Unit = {
+    val sourceIsCached = planContainsCachedRelation(DataFrameUtils.ofRows(spark, source))
+    if (sourceIsCached &&
+        spark.conf.get(DeltaSQLConf.MERGE_FAIL_SOURCE_CACHED_AFTER_MATERIALIZATION)) {
+      throw DeltaErrors.mergeConcurrentOperationCachedSourceException()
+    }
+
+    deltaAssert(
+      !sourceIsCached,
+      name = "merge.sourceCachedAfterMaterializationStep",
+      msg = "Cached source plans must be materialized in MERGE but the source only got cached " +
+        "after the decision to materialize was taken.",
+      deltaLog = targetDeltaLog
+    )
+  }
+
   override protected def prepareMergeSource(
       spark: SparkSession,
       source: LogicalPlan,

spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala

Lines changed: 4 additions & 0 deletions
@@ -165,6 +165,8 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
       .collect()
       .head

+    checkSourcePlanIsNotCached(spark, getMergeSource.df.queryExecution.logical)
+
     val hasMultipleMatches = multipleMatchCount > 0
     throwErrorOnMultipleMatches(hasMultipleMatches, spark)
     if (hasMultipleMatches) {
@@ -465,6 +467,8 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
     // Write to Delta
     val newFiles = writeFiles(spark, deltaTxn, outputDF)

+    checkSourcePlanIsNotCached(spark, getMergeSource.df.queryExecution.logical)
+
     // Update metrics
     val (addedBytes, addedPartitions) = totalBytesAndDistinctPartitionValues(newFiles)
     metrics("numTargetFilesAdded") += newFiles.count(_.isInstanceOf[AddFile])

spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala

Lines changed: 12 additions & 1 deletion
@@ -246,6 +246,8 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
     import DeltaSQLConf.MergeMaterializeSource._
     val checkDeterministicOptions =
       DeltaSparkPlanUtils.CheckDeterministicOptions(allowDeterministicUdf = true)
+    val materializeCachedSource = spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_CACHED_SOURCE)
+
     materializeType match {
       case ALL =>
         (true, MergeIntoMaterializeSourceReason.MATERIALIZE_ALL)
@@ -270,6 +272,12 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
           // the user defined function is marked as deterministic, as it is often incorrectly
           // marked as such.
           (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF)
+        } else if (materializeCachedSource &&
+            planContainsCachedRelation(DataFrameUtils.ofRows(spark, source))) {
+          // The query cache doesn't pin the version of cached Delta tables, so the cache can
+          // get concurrently updated in the middle of MERGE execution. We materialize the
+          // source in that case to avoid this issue.
+          (true, MergeIntoMaterializeSourceReason.SOURCE_CACHED)
         } else {
           (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_AUTO)
         }
@@ -460,6 +468,8 @@ object MergeIntoMaterializeSourceReason extends Enumeration {
     Value("materializeNonDeterministicSourceWithDeterministicUdf")
   // Materialize when the configuration is invalid
   val INVALID_CONFIG = Value("invalidConfigurationFailsafe")
+  // Materialize when the source is cached.
+  val SOURCE_CACHED = Value("materializeCachedSource")
   // Catch-all case.
   val UNKNOWN = Value("unknown")

@@ -470,7 +480,8 @@ object MergeIntoMaterializeSourceReason extends Enumeration {
     NON_DETERMINISTIC_SOURCE_OPERATORS,
     IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET,
     NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF,
-    INVALID_CONFIG
+    INVALID_CONFIG,
+    SOURCE_CACHED
   )
 }
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 26 additions & 0 deletions
@@ -861,6 +861,32 @@ trait DeltaSQLConfBase {
     .booleanConf
     .createWithDefault(true)

+  val MERGE_MATERIALIZE_CACHED_SOURCE =
+    buildConf("merge.materializeCachedSource")
+      .internal()
+      .doc(
+        """
+          |When enabled, materialize the source in MERGE if it is cached (e.g. via df.cache()).
+          |This prevents incorrect results due to query caching not pinning the version of
+          |cached Delta tables.
+          |""".stripMargin)
+      .booleanConf
+      .createWithDefault(true)
+
+  val MERGE_FAIL_SOURCE_CACHED_AFTER_MATERIALIZATION =
+    buildConf("merge.failSourceCachedAfterMaterialization")
+      .internal()
+      .doc(
+        """
+          |Enables a check that fails the MERGE operation if the source was cached (using
+          |df.cache()) after the source materialization phase. Query caching doesn't pin the
+          |version of Delta tables and we should materialize cached source plans. In rare
+          |cases, the source might get cached after the decision to materialize, which could
+          |lead to incorrect results if we let the operation succeed.
+          |""".stripMargin)
+      .booleanConf
+      .createWithDefault(true)
+
   val MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL =
     buildConf("merge.materializeSource.rddStorageLevel")
       .internal()
spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaSparkPlanUtils.scala

Lines changed: 6 additions & 0 deletions
@@ -18,8 +18,10 @@ package org.apache.spark.sql.delta.util

 import org.apache.spark.sql.delta.{DeltaTable, DeltaTableReadPredicate}

+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.expressions.{Exists, Expression, InSubquery, LateralSubquery, ScalarSubquery, SubqueryExpression => SparkSubqueryExpression, UserDefinedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, LeafNode, LogicalPlan, OneRowRelation, Project, SubqueryAlias, Union}
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation


@@ -45,6 +47,10 @@ trait DeltaSparkPlanUtils {
     )
   }

+  /** Returns whether part of the plan was cached using df.cache() or similar. */
+  protected def planContainsCachedRelation(df: DataFrame): Boolean =
+    df.queryExecution.withCachedData.exists(_.isInstanceOf[InMemoryRelation])
+
   /**
    * Returns `true` if `plan` has a safe level of determinism. This is a conservative
    * approximation of `plan` being a truly deterministic query.
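For context on the detection helper above, a minimal sketch of how `df.cache()` surfaces in the analyzed plan (assumes a local session; the `InMemoryRelation` substitution via `QueryExecution.withCachedData` is standard Spark behavior):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(1, 2, 3).toDF("id").cache()
df.count() // populate the cache

// After cache() registers the plan, Spark's CacheManager replaces the matching
// subtree of the analyzed plan with an InMemoryRelation. That relation is
// exactly what the new planContainsCachedRelation helper looks for.
val isCached = df.queryExecution.withCachedData.exists(_.isInstanceOf[InMemoryRelation])
println(isCached) // true
```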
