@@ -160,46 +160,44 @@ class CacheManager extends Logging {
160
160
}
161
161
// Re-compile dependent cached queries after removing the cached query.
162
162
if (! cascade) {
163
- recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false )
163
+ recacheByCondition(spark, cd => {
164
+ // If the cache buffer has already been loaded, we don't need to recompile the cached plan,
165
+ // as it does not rely on the plan that has been uncached anymore; it will just produce
166
+ // data from the cache buffer.
167
+ // Note that the `CachedRDDBuilder.isCachedColumnBuffersLoaded` call is a non-locking
168
+ // status test and may not return the most accurate cache buffer state. So the worst case
169
+ // scenario can be:
170
+ // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
171
+ // will clear the buffer and re-compile the plan. It is inefficient but doesn't affect
172
+ // correctness.
173
+ // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
174
+ // will keep it as it is. It means the physical plan has been re-compiled already in the
175
+ // other thread.
176
+ val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
177
+ cd.plan.find(_.sameResult(plan)).isDefined && ! cacheAlreadyLoaded
178
+ })
164
179
}
165
180
}
166
181
167
182
/**
168
183
* Tries to re-cache all the cache entries that refer to the given plan.
169
184
*/
170
185
def recacheByPlan (spark : SparkSession , plan : LogicalPlan ): Unit = {
171
- recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
186
+ recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
172
187
}
173
188
189
+ /**
190
+ * Re-caches all the cache entries that satisfy the given `condition`.
191
+ */
174
192
private def recacheByCondition (
175
193
spark : SparkSession ,
176
- condition : LogicalPlan => Boolean ,
177
- clearCache : Boolean = true ): Unit = {
194
+ condition : CachedData => Boolean ): Unit = {
178
195
val needToRecache = scala.collection.mutable.ArrayBuffer .empty[CachedData ]
179
196
writeLock {
180
197
val it = cachedData.iterator()
181
198
while (it.hasNext) {
182
199
val cd = it.next()
183
- // If `clearCache` is false (which means the recache request comes from a non-cascading
184
- // cache invalidation) and the cache buffer has already been loaded, we do not need to
185
- // re-compile a physical plan because the old plan will not be used any more by the
186
- // CacheManager although it still lives in compiled `Dataset`s and it could still work.
187
- // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
188
- // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
189
- // buffer is still empty, then we could have a more efficient new plan by removing
190
- // dependency on the previously removed cache entries.
191
- // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
192
- // status test and may not return the most accurate cache buffer state. So the worse case
193
- // scenario can be:
194
- // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
195
- // will clear the buffer and build a new plan. It is inefficient but doesn't affect
196
- // correctness.
197
- // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
198
- // will keep it as it is. It means the physical plan has been re-compiled already in the
199
- // other thread.
200
- val buildNewPlan =
201
- clearCache || ! cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
202
- if (condition(cd.plan) && buildNewPlan) {
200
+ if (condition(cd)) {
203
201
needToRecache += cd
204
202
// Remove the cache entry before we create a new one, so that we can have a different
205
203
// physical plan.
@@ -267,7 +265,7 @@ class CacheManager extends Logging {
267
265
(fs, fs.makeQualified(path))
268
266
}
269
267
270
- recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
268
+ recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
271
269
}
272
270
273
271
/**
0 commit comments