Skip to content

Commit 2d89954

Browse files
authored
[Spark] Small refactor for the Source Materialization Framework (delta-io#4891)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description In the future, we will need the materialization framework for commands other than MERGE; for now, we do some small refactoring in advance for the sake of small PRs. <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? Existing UTs, just a refactor. <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? No. <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent 6a73c17 commit 2d89954

File tree

1 file changed

+43
-16
lines changed

1 file changed

+43
-16
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,16 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
7171

7272
import MergeIntoMaterializeSource._
7373

74+
protected def operation: String = "MERGE"
75+
76+
protected def enableColumnPruningBeforeMaterialize: Boolean = true
77+
78+
protected def materializeSourceErrorOpType: String =
79+
MergeIntoMaterializeSourceError.OP_TYPE
80+
81+
protected def getMaterializeSourceMode(spark: SparkSession): String =
82+
spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE)
83+
7484
/**
7585
* Prepared Dataframe with source data.
7686
* If needed, it is materialized, @see prepareMergeSource
@@ -97,31 +107,34 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
97107
spark: SparkSession,
98108
deltaLog: DeltaLog,
99109
metrics: Map[String, SQLMetric],
100-
runMergeFunc: SparkSession => Seq[Row]): Seq[Row] = {
110+
runOperationFunc: SparkSession => Seq[Row]): Seq[Row] = {
101111
var doRetry = false
102112
var runResult: Seq[Row] = null
103113
attempt = 1
104114
do {
105115
doRetry = false
106116
metrics.values.foreach(_.reset())
107117
try {
108-
runResult = runMergeFunc(spark)
118+
runResult = runOperationFunc(spark)
109119
} catch {
110120
case NonFatal(ex) =>
111121
val isLastAttempt =
112122
attempt == spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_MAX_ATTEMPTS)
113123
handleExceptionDuringAttempt(ex, isLastAttempt, deltaLog) match {
114124
case RetryHandling.Retry =>
115-
logInfo(log"Retrying MERGE with materialized source. Attempt " +
116-
log"${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)} failed.")
125+
logInfo(log"Retrying ${MDC(DeltaLogKeys.OPERATION, operation)} " +
126+
log"with materialized source." +
127+
log"Attempt ${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)} failed.")
117128
doRetry = true
118129
attempt += 1
119130
case RetryHandling.ExhaustedRetries =>
120131
logError(log"Exhausted retries after ${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)}" +
121-
log" attempts in MERGE with materialized source. Logging latest exception.", ex)
132+
log" attempts in ${MDC(DeltaLogKeys.OPERATION, operation)} " +
133+
log"with materialized source. Logging latest exception.", ex)
122134
throw DeltaErrors.sourceMaterializationFailedRepeatedlyInMerge
123135
case RetryHandling.RethrowException =>
124-
logError(log"Fatal error in MERGE with materialized source in " +
136+
logError(log"Fatal error in ${MDC(DeltaLogKeys.OPERATION, operation)} " +
137+
log"with materialized source in " +
125138
log"attempt ${MDC(DeltaLogKeys.NUM_ATTEMPT, attempt)}", ex)
126139
throw ex
127140
}
@@ -165,15 +178,16 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
165178
if materializedSourceRDD.nonEmpty &&
166179
s.getMessage.matches(
167180
mergeMaterializedSourceRddBlockLostErrorRegex(materializedSourceRDD.get.id)) =>
168-
log.warn("Materialized Merge source RDD block lost. Merge needs to be restarted. " +
181+
log.warn(log"Materialized ${MDC(DeltaLogKeys.OPERATION, operation)} source RDD block lost. " +
182+
log"${MDC(DeltaLogKeys.OPERATION, operation)} needs to be restarted. " +
169183
s"This was attempt number $attempt.")
170184
if (!isLastAttempt) {
171185
RetryHandling.Retry
172186
} else {
173187
// Record situations where we lost RDD materialized source blocks, despite retries.
174188
recordDeltaEvent(
175189
deltaLog,
176-
MergeIntoMaterializeSourceError.OP_TYPE,
190+
materializeSourceErrorOpType,
177191
data = MergeIntoMaterializeSourceError(
178192
errorType = MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString,
179193
attempt = attempt,
@@ -192,7 +206,7 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
192206
// by the materialized RDD.
193207
recordDeltaEvent(
194208
deltaLog,
195-
MergeIntoMaterializeSourceError.OP_TYPE,
209+
materializeSourceErrorOpType,
196210
data = MergeIntoMaterializeSourceError(
197211
errorType = MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString,
198212
attempt = attempt,
@@ -240,7 +254,7 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
240254
protected def shouldMaterializeSource(
241255
spark: SparkSession, source: LogicalPlan, isInsertOnly: Boolean
242256
): (Boolean, MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason) = {
243-
val materializeType = spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE)
257+
val materializeType = getMaterializeSourceMode(spark)
244258
val forceMaterializationWithUnreadableFiles =
245259
spark.conf.get(DeltaSQLConf.MERGE_FORCE_SOURCE_MATERIALIZATION_WITH_UNREADABLE_FILES)
246260
import DeltaSQLConf.MergeMaterializeSource._
@@ -312,11 +326,24 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
312326
return
313327
}
314328

315-
val referencedSourceColumns =
329+
val referencedSourceColumns = if (enableColumnPruningBeforeMaterialize) {
316330
getReferencedSourceColumns(source, condition, matchedClauses, notMatchedClauses)
317-
// When we materialize the source, we want to make sure that columns got pruned before caching.
318-
val sourceWithSelectedColumns = Project(referencedSourceColumns, source)
319-
val baseSourcePlanDF = DataFrameUtils.ofRows(spark, sourceWithSelectedColumns)
331+
} else {
332+
assert(matchedClauses.isEmpty && notMatchedClauses.isEmpty,
333+
"If column pruning is disabled, then there should be no MERGE clauses.")
334+
assert(operation != "MERGE",
335+
"Column pruning before materialization must be done for MERGE.")
336+
source.output
337+
}
338+
339+
val baseSourcePlanDF = if (enableColumnPruningBeforeMaterialize) {
340+
// When we materialize the source, we want to make sure that columns got pruned
341+
// before caching.
342+
val sourceWithSelectedColumns = Project(referencedSourceColumns, source)
343+
DataFrameUtils.ofRows(spark, sourceWithSelectedColumns)
344+
} else {
345+
DataFrameUtils.ofRows(spark, source)
346+
}
320347

321348
// Caches the source in RDD cache using localCheckpoint, which cuts away the RDD lineage,
322349
// which shall ensure that the source cannot be recomputed and thus become inconsistent.
@@ -372,8 +399,8 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
372399
assert(rdd.isCheckpointed)
373400
}
374401

375-
logDebug(s"Materializing MERGE with pruned columns $referencedSourceColumns.")
376-
logDebug(s"Materialized MERGE source plan:\n${getMergeSource.df.queryExecution}")
402+
logDebug(s"Materializing $operation with pruned columns $referencedSourceColumns.")
403+
logDebug(s"Materialized $operation source plan:\n${getMergeSource.df.queryExecution}")
377404
}
378405

379406
/** Returns the prepared merge source. */

0 commit comments

Comments
 (0)