
Commit bac50aa

maryannxue authored and gatorsmile committed
[SPARK-24596][SQL] Non-cascading Cache Invalidation
## What changes were proposed in this pull request?

1. Add a `cascade` parameter to `CacheManager.uncacheQuery()`. Under `cascade=false` mode, only the current cache is invalidated; dependent caches have their execution plans rebuilt and reuse the cached buffers.
2. Pass true/false from callers in the different uncache scenarios:
   - Drop tables and regular (persistent) views: regular mode
   - Drop temporary views: non-cascading mode
   - Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
   - Call `DataSet.unpersist()`: non-cascading mode
   - Call `Catalog.uncacheTable()`: follow the same convention as dropping tables/views, i.e., non-cascading mode for temporary views and regular mode for the rest

Note that a regular (persistent) view is a database object just like a table, so after dropping a regular view (whether cached or not), any query referring to that view should no longer be valid. Hence, if a cached persistent view is dropped, we need to invalidate all dependent caches so that exceptions will be thrown for any later reference. On the other hand, a temporary view is in fact equivalent to an unnamed DataSet, and dropping a temporary view should have no impact on queries referencing that view. Thus we should do non-cascading uncaching for temporary views, which also guarantees consistent uncaching behavior between temporary views and unnamed DataSets.

## How was this patch tested?

New tests in CachedTableSuite and DatasetCacheSuite.

Author: Maryann Xue <[email protected]>

Closes apache#21594 from maryannxue/noncascading-cache.
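The observable effect of the non-cascading mode can be summarized with a minimal sketch (illustrative only; it assumes a `SparkSession` named `spark` with `spark.implicits._` imported, and the DataFrame names are made up):

```scala
import spark.implicits._

val df  = Seq(("a", 1), ("b", 2)).toDF("s", "i")
val df2 = df.filter($"i" > 1)

df.cache()
df2.cache()
df2.count()     // materializes df2's cache, which reads df's cached buffers underneath

df.unpersist()  // non-cascading: only df's buffers are freed

// df2 remains cached and usable; its cached plan is re-compiled so it no longer
// depends on df's removed in-memory buffers.
df2.count()
```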
1 parent 8ab8ef7 commit bac50aa

File tree: 9 files changed (+197, -26 lines changed)

docs/sql-programming-guide.md

Lines changed: 1 addition & 0 deletions
@@ -1827,6 +1827,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promotes both sides to TIMESTAMP. To set `false` to `spark.sql.hive.compareDateTimestampInTimestamp` restores the previous behavior. This option will be removed in Spark 3.0.
   - Since Spark 2.4, creating a managed table with nonempty location is not allowed. An exception is thrown when attempting to create a managed table with nonempty location. To set `true` to `spark.sql.allowCreatingManagedTableUsingNonemptyLocation` restores the previous behavior. This option will be removed in Spark 3.0.
   - Since Spark 2.4, the type coercion rules can automatically promote the argument types of the variadic SQL functions (e.g., IN/COALESCE) to the widest common type, no matter how the input arguments order. In prior Spark versions, the promotion could fail in some specific orders (e.g., TimestampType, IntegerType and StringType) and throw an exception.
+  - Since Spark 2.4, Spark has enabled non-cascading SQL cache invalidation in addition to the traditional cache invalidation mechanism. The non-cascading cache invalidation mechanism allows users to remove a cache without impacting its dependent caches. This new cache invalidation mechanism is used in scenarios where the data of the cache to be removed is still valid, e.g., calling unpersist() on a Dataset, or dropping a temporary view. This allows users to free up memory and keep the desired caches valid at the same time.
   - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
   - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
   - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
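As a concrete illustration of the new migration-guide entry, a hedged sketch (the view names and source table `src` are hypothetical):

```scala
spark.sql("CACHE TABLE v1 AS SELECT * FROM src WHERE key > 1")   // temporary view
spark.sql("CACHE TABLE v2 AS SELECT * FROM v1 WHERE value > 1")  // cache built on top of v1

// Since 2.4, dropping the temporary view v1 does not cascade: v2's cache stays valid.
spark.sql("DROP VIEW v1")
assert(spark.catalog.isCached("v2"))
```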

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 3 additions & 1 deletion
@@ -2964,19 +2964,21 @@ class Dataset[T] private[sql](
 
   /**
    * Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
+   * This will not un-persist any cached data that is built upon this Dataset.
    *
    * @param blocking Whether to block until all blocks are deleted.
    *
    * @group basic
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
+    sparkSession.sharedState.cacheManager.uncacheQuery(this, cascade = false, blocking)
     this
   }
 
   /**
    * Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
+   * This will not un-persist any cached data that is built upon this Dataset.
    *
    * @group basic
    * @since 1.6.0

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 40 additions & 10 deletions
@@ -105,24 +105,50 @@ class CacheManager extends Logging {
   }
 
   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query    The [[Dataset]] to be un-cached.
+   * @param cascade  If true, un-cache all the cache entries that refer to the given
+   *                 [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(
+      query: Dataset[_],
+      cascade: Boolean,
+      blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }
 
   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param spark    The Spark session.
+   * @param plan     The plan to be un-cached.
+   * @param cascade  If true, un-cache all the cache entries that refer to the given
+   *                 plan; otherwise un-cache the given plan only.
+   * @param blocking Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(
+      spark: SparkSession,
+      plan: LogicalPlan,
+      cascade: Boolean,
+      blocking: Boolean): Unit = writeLock {
+    val shouldRemove: LogicalPlan => Boolean =
+      if (cascade) {
+        _.find(_.sameResult(plan)).isDefined
+      } else {
+        _.sameResult(plan)
+      }
     val it = cachedData.iterator()
     while (it.hasNext) {
       val cd = it.next()
-      if (cd.plan.find(_.sameResult(plan)).isDefined) {
+      if (shouldRemove(cd.plan)) {
         cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
         it.remove()
       }
     }
+    // Re-compile dependent cached queries after removing the cached query.
+    if (!cascade) {
+      recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
+    }
   }
 
   /**
@@ -132,20 +158,24 @@ class CacheManager extends Logging {
     recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
   }
 
-  private def recacheByCondition(spark: SparkSession, condition: LogicalPlan => Boolean): Unit = {
+  private def recacheByCondition(
+      spark: SparkSession,
+      condition: LogicalPlan => Boolean,
+      clearCache: Boolean = true): Unit = {
     val it = cachedData.iterator()
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     while (it.hasNext) {
       val cd = it.next()
       if (condition(cd.plan)) {
-        cd.cachedRepresentation.cacheBuilder.clearCache()
+        if (clearCache) {
+          cd.cachedRepresentation.cacheBuilder.clearCache()
+        }
        // Remove the cache entry before we create a new one, so that we can have a different
        // physical plan.
        it.remove()
        val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
        val newCache = InMemoryRelation(
-          cacheBuilder = cd.cachedRepresentation
-            .cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
+          cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
          logicalPlan = cd.plan)
        needToRecache += cd.copy(cachedRepresentation = newCache)
      }
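To restate the core of the change outside the diff: the removal predicate differs by mode, and the non-cascading path re-plans dependent entries instead of dropping them. A simplified sketch, not the actual `CacheManager` code, with stand-in types:

```scala
// Stand-ins for LogicalPlan and its traversal helpers (illustrative only).
trait Plan {
  def sameResult(other: Plan): Boolean        // analogous to Spark's sameResult
  def containsResultOf(other: Plan): Boolean  // stands in for _.find(_.sameResult(p)).isDefined
}

def removalPredicate(target: Plan, cascade: Boolean): Plan => Boolean =
  if (cascade) _.containsResultOf(target) // drop every cache that refers to the target anywhere
  else _.sameResult(target)               // drop only the cache entry for the target itself

// In the non-cascading case, the surviving dependent entries are then re-planned
// (recacheByCondition with clearCache = false) so they stop referencing the removed buffers.
```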

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 10 additions & 0 deletions
@@ -74,6 +74,16 @@ case class CachedRDDBuilder(
     }
   }
 
+  def withCachedPlan(cachedPlan: SparkPlan): CachedRDDBuilder = {
+    new CachedRDDBuilder(
+      useCompression,
+      batchSize,
+      storageLevel,
+      cachedPlan = cachedPlan,
+      tableName
+    )(_cachedColumnBuffers)
+  }
+
   private def buildBuffers(): RDD[CachedBatch] = {
     val output = cachedPlan.output
     val cached = cachedPlan.execute().mapPartitionsInternal { rowIterator =>

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 5 additions & 3 deletions
@@ -189,8 +189,9 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val isTempView = catalog.isTemporaryTable(tableName)
 
-    if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) {
+    if (!isTempView && catalog.tableExists(tableName)) {
       // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
       // issue an exception.
       catalog.getTableMetadata(tableName).tableType match {
@@ -204,9 +205,10 @@ case class DropTableCommand(
       }
     }
 
-    if (catalog.isTemporaryTable(tableName) || catalog.tableExists(tableName)) {
+    if (isTempView || catalog.tableExists(tableName)) {
       try {
-        sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+        sparkSession.sharedState.cacheManager.uncacheQuery(
+          sparkSession.table(tableName), cascade = !isTempView)
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 1 addition & 1 deletion
@@ -493,7 +493,7 @@ case class TruncateTableCommand(
     spark.sessionState.refreshTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
-      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
+      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), cascade = true)
     } catch {
       case NonFatal(e) =>
         log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e)

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 8 additions & 4 deletions
@@ -364,7 +364,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def dropTempView(viewName: String): Boolean = {
     sparkSession.sessionState.catalog.getTempView(viewName).exists { viewDef =>
-      sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession, viewDef, blocking = true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession, viewDef, cascade = false, blocking = true)
       sessionCatalog.dropTempView(viewName)
     }
   }
@@ -379,7 +380,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def dropGlobalTempView(viewName: String): Boolean = {
     sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
-      sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession, viewDef, blocking = true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession, viewDef, cascade = false, blocking = true)
       sessionCatalog.dropGlobalTempView(viewName)
     }
   }
@@ -438,7 +440,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   * @since 2.0.0
   */
  override def uncacheTable(tableName: String): Unit = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val cascade = !sessionCatalog.isTemporaryTable(tableIdent)
+    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName), cascade)
  }
 
  /**
@@ -490,7 +494,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     // cached version and make the new version cached lazily.
     if (isCached(table)) {
       // Uncache the logicalPlan.
-      sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true, blocking = true)
       // Cache it again.
       sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
     }
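Put simply, `Catalog.uncacheTable` now picks the mode from the relation type; a hedged usage sketch (the names are illustrative):

```scala
spark.catalog.uncacheTable("some_temp_view")  // temporary view   -> non-cascading mode
spark.catalog.uncacheTable("some_table")      // persistent table -> regular (cascading) mode
```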

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 65 additions & 1 deletion
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
@@ -801,4 +800,69 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }
     assert(cachedData.collect === Seq(1001))
   }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - uncache temporary view") {
+    withTempView("t1", "t2") {
+      sql("CACHE TABLE t1 AS SELECT * FROM testData WHERE key > 1")
+      sql("CACHE TABLE t2 as SELECT * FROM t1 WHERE value > 1")
+
+      assert(spark.catalog.isCached("t1"))
+      assert(spark.catalog.isCached("t2"))
+      sql("UNCACHE TABLE t1")
+      assert(!spark.catalog.isCached("t1"))
+      assert(spark.catalog.isCached("t2"))
+    }
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - drop temporary view") {
+    withTempView("t1", "t2") {
+      sql("CACHE TABLE t1 AS SELECT * FROM testData WHERE key > 1")
+      sql("CACHE TABLE t2 as SELECT * FROM t1 WHERE value > 1")
+
+      assert(spark.catalog.isCached("t1"))
+      assert(spark.catalog.isCached("t2"))
+      sql("DROP VIEW t1")
+      assert(spark.catalog.isCached("t2"))
+    }
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - drop persistent view") {
+    withTable("t") {
+      spark.range(1, 10).toDF("key").withColumn("value", 'key * 2)
+        .write.format("json").saveAsTable("t")
+      withView("t1") {
+        withTempView("t2") {
+          sql("CREATE VIEW t1 AS SELECT * FROM t WHERE key > 1")
+
+          sql("CACHE TABLE t1")
+          sql("CACHE TABLE t2 AS SELECT * FROM t1 WHERE value > 1")
+
+          assert(spark.catalog.isCached("t1"))
+          assert(spark.catalog.isCached("t2"))
+          sql("DROP VIEW t1")
+          assert(!spark.catalog.isCached("t2"))
+        }
+      }
+    }
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - uncache table") {
+    withTable("t") {
+      spark.range(1, 10).toDF("key").withColumn("value", 'key * 2)
+        .write.format("json").saveAsTable("t")
+      withTempView("t1", "t2") {
+        sql("CACHE TABLE t")
+        sql("CACHE TABLE t1 AS SELECT * FROM t WHERE key > 1")
+        sql("CACHE TABLE t2 AS SELECT * FROM t1 WHERE value > 1")
+
+        assert(spark.catalog.isCached("t"))
+        assert(spark.catalog.isCached("t1"))
+        assert(spark.catalog.isCached("t2"))
+        sql("UNCACHE TABLE t")
+        assert(!spark.catalog.isCached("t"))
+        assert(!spark.catalog.isCached("t1"))
+        assert(!spark.catalog.isCached("t2"))
+      }
+    }
+  }
 }

sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

Lines changed: 64 additions & 6 deletions
@@ -29,6 +29,16 @@ import org.apache.spark.storage.StorageLevel
 class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits {
   import testImplicits._
 
+  /**
+   * Asserts that a cached [[Dataset]] will be built using the given number of other cached results.
+   */
+  private def assertCacheDependency(df: DataFrame, numOfCachesDependedUpon: Int = 1): Unit = {
+    val plan = df.queryExecution.withCachedData
+    assert(plan.isInstanceOf[InMemoryRelation])
+    val internalPlan = plan.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
+    assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).size == numOfCachesDependedUpon)
+  }
+
   test("get storage level") {
     val ds1 = Seq("1", "2").toDS().as("a")
     val ds2 = Seq(2, 3).toDS().as("b")
@@ -117,7 +127,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
   }
 
   test("cache UDF result correctly") {
-    val expensiveUDF = udf({x: Int => Thread.sleep(10000); x})
+    val expensiveUDF = udf({x: Int => Thread.sleep(5000); x})
     val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
     val df2 = df.agg(sum(df("b")))
 
@@ -126,7 +136,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     assertCached(df2)
 
     // udf has been evaluated during caching, and thus should not be re-evaluated here
-    failAfter(5 seconds) {
+    failAfter(3 seconds) {
       df2.collect()
     }
 
@@ -143,9 +153,57 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     df.count()
     df2.cache()
 
-    val plan = df2.queryExecution.withCachedData
-    assert(plan.isInstanceOf[InMemoryRelation])
-    val internalPlan = plan.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
-    assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)
+    assertCacheDependency(df2)
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation") {
+    val df = Seq(("a", 1), ("b", 2)).toDF("s", "i")
+    val df2 = df.filter('i > 1)
+    val df3 = df.filter('i < 2)
+
+    df2.cache()
+    df.cache()
+    df.count()
+    df3.cache()
+
+    df.unpersist()
+
+    // df un-cached; df2 and df3's cache plan re-compiled
+    assert(df.storageLevel == StorageLevel.NONE)
+    assertCacheDependency(df2, 0)
+    assertCacheDependency(df3, 0)
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - verify cached data reuse") {
+    val expensiveUDF = udf({ x: Int => Thread.sleep(5000); x })
+    val df = spark.range(0, 5).toDF("a")
+    val df1 = df.withColumn("b", expensiveUDF($"a"))
+    val df2 = df1.groupBy('a).agg(sum('b))
+    val df3 = df.agg(sum('a))
+
+    df1.cache()
+    df2.cache()
+    df2.collect()
+    df3.cache()
+
+    assertCacheDependency(df2)
+
+    df1.unpersist(blocking = true)
+
+    // df1 un-cached; df2's cache plan re-compiled
+    assert(df1.storageLevel == StorageLevel.NONE)
+    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
+
+    val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
+    assertCached(df4)
+    // reuse loaded cache
+    failAfter(3 seconds) {
+      checkDataset(df4, Row(10))
+    }
+
+    val df5 = df.agg(sum('a)).filter($"sum(a)" > 1)
+    assertCached(df5)
+    // first time use, load cache
+    checkDataset(df5, Row(10))
   }
 }
