Skip to content

Commit b9a6f74

Browse files
maryannxue authored and gatorsmile committed
[SPARK-24613][SQL] Cache with UDF could not be matched with subsequent dependent caches
## What changes were proposed in this pull request?

Wrap the logical plan with an `AnalysisBarrier` for execution plan compilation in CacheManager, in order to avoid the plan being analyzed again.

## How was this patch tested?

Added one test in `DatasetCacheSuite`.

Author: Maryann Xue <[email protected]>

Closes apache#21602 from maryannxue/cache-mismatch.
1 parent c8e909c commit b9a6f74

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2626
import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.{Dataset, SparkSession}
2828
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
29-
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
29+
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan, ResolvedHint}
3030
import org.apache.spark.sql.execution.columnar.InMemoryRelation
3131
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
3232
import org.apache.spark.storage.StorageLevel
@@ -97,7 +97,7 @@ class CacheManager extends Logging {
9797
val inMemoryRelation = InMemoryRelation(
9898
sparkSession.sessionState.conf.useCompression,
9999
sparkSession.sessionState.conf.columnBatchSize, storageLevel,
100-
sparkSession.sessionState.executePlan(planToCache).executedPlan,
100+
sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
101101
tableName,
102102
planToCache)
103103
cachedData.add(CachedData(planToCache, inMemoryRelation))
@@ -142,7 +142,7 @@ class CacheManager extends Logging {
142142
// Remove the cache entry before we create a new one, so that we can have a different
143143
// physical plan.
144144
it.remove()
145-
val plan = spark.sessionState.executePlan(cd.plan).executedPlan
145+
val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
146146
val newCache = InMemoryRelation(
147147
cacheBuilder = cd.cachedRepresentation
148148
.cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),

sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.sql
2020
import org.scalatest.concurrent.TimeLimits
2121
import org.scalatest.time.SpanSugar._
2222

23+
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
2324
import org.apache.spark.sql.functions._
2425
import org.apache.spark.sql.test.SharedSQLContext
2526
import org.apache.spark.storage.StorageLevel
@@ -132,4 +133,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
132133
df.unpersist()
133134
assert(df.storageLevel == StorageLevel.NONE)
134135
}
136+
137+
test("SPARK-24613 Cache with UDF could not be matched with subsequent dependent caches") {
138+
val udf1 = udf({x: Int => x + 1})
139+
val df = spark.range(0, 10).toDF("a").withColumn("b", udf1($"a"))
140+
val df2 = df.agg(sum(df("b")))
141+
142+
df.cache()
143+
df.count()
144+
df2.cache()
145+
146+
val plan = df2.queryExecution.withCachedData
147+
assert(plan.isInstanceOf[InMemoryRelation])
148+
val internalPlan = plan.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
149+
assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)
150+
}
135151
}

0 commit comments

Comments (0)