Skip to content

Commit 01e6305

Browse files
maropu authored and dongjoon-hyun committed
[SPARK-25196][SPARK-27251][SQL][FOLLOWUP] Add synchronized for InMemoryRelation.statsOfPlanToCache
## What changes were proposed in this pull request?

This is a follow-up of apache#24047. To follow the `CacheManager.cachedData` lock semantics, this PR wraps the `statsOfPlanToCache` update in `synchronized`.

## How was this patch tested?

Pass Jenkins.

Closes apache#24178 from maropu/SPARK-24047-FOLLOWUP.

Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 6242885 commit 01e6305

File tree

3 files changed

+39
-26
lines changed

3 files changed

+39
-26
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.execution
1919

20-
import java.util.concurrent.locks.ReentrantReadWriteLock
21-
2220
import scala.collection.immutable.IndexedSeq
2321

2422
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -163,12 +161,7 @@ class CacheManager extends Logging {
163161
val relation = cachedData.cachedRepresentation
164162
val (rowCount, newColStats) =
165163
CommandUtils.computeColumnStats(sparkSession, relation, column)
166-
val oldStats = relation.statsOfPlanToCache
167-
val newStats = oldStats.copy(
168-
rowCount = Some(rowCount),
169-
attributeStats = AttributeMap((oldStats.attributeStats ++ newColStats).toSeq)
170-
)
171-
relation.statsOfPlanToCache = newStats
164+
relation.updateStats(rowCount, newColStats)
172165
}
173166

174167
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 37 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
2626
import org.apache.spark.sql.catalyst.expressions._
2727
import org.apache.spark.sql.catalyst.plans.QueryPlan
2828
import org.apache.spark.sql.catalyst.plans.logical
29-
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
29+
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
3030
import org.apache.spark.sql.catalyst.util.truncatedString
3131
import org.apache.spark.sql.execution.SparkPlan
3232
import org.apache.spark.storage.StorageLevel
@@ -145,37 +145,60 @@ object InMemoryRelation {
145145
tableName: Option[String],
146146
logicalPlan: LogicalPlan): InMemoryRelation = {
147147
val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)
148-
new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)(
149-
statsOfPlanToCache = logicalPlan.stats)
148+
val relation = new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)
149+
relation.statsOfPlanToCache = logicalPlan.stats
150+
relation
150151
}
151152

152153
def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = {
153-
new InMemoryRelation(cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)(
154-
statsOfPlanToCache = logicalPlan.stats)
154+
val relation = new InMemoryRelation(
155+
cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)
156+
relation.statsOfPlanToCache = logicalPlan.stats
157+
relation
158+
}
159+
160+
def apply(
161+
output: Seq[Attribute],
162+
cacheBuilder: CachedRDDBuilder,
163+
outputOrdering: Seq[SortOrder],
164+
statsOfPlanToCache: Statistics): InMemoryRelation = {
165+
val relation = InMemoryRelation(output, cacheBuilder, outputOrdering)
166+
relation.statsOfPlanToCache = statsOfPlanToCache
167+
relation
155168
}
156169
}
157170

158171
case class InMemoryRelation(
159172
output: Seq[Attribute],
160173
@transient cacheBuilder: CachedRDDBuilder,
161-
override val outputOrdering: Seq[SortOrder])(
162-
@volatile var statsOfPlanToCache: Statistics)
174+
override val outputOrdering: Seq[SortOrder])
163175
extends logical.LeafNode with MultiInstanceRelation {
164176

177+
@volatile var statsOfPlanToCache: Statistics = null
178+
165179
override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
166180

167181
override def doCanonicalize(): logical.LogicalPlan =
168182
copy(output = output.map(QueryPlan.normalizeExprId(_, cachedPlan.output)),
169183
cacheBuilder,
170-
outputOrdering)(
171-
statsOfPlanToCache)
184+
outputOrdering)
172185

173186
override def producedAttributes: AttributeSet = outputSet
174187

175188
@transient val partitionStatistics = new PartitionStatistics(output)
176189

177190
def cachedPlan: SparkPlan = cacheBuilder.cachedPlan
178191

192+
private[sql] def updateStats(
193+
rowCount: Long,
194+
newColStats: Map[Attribute, ColumnStat]): Unit = this.synchronized {
195+
val newStats = statsOfPlanToCache.copy(
196+
rowCount = Some(rowCount),
197+
attributeStats = AttributeMap((statsOfPlanToCache.attributeStats ++ newColStats).toSeq)
198+
)
199+
statsOfPlanToCache = newStats
200+
}
201+
179202
override def computeStats(): Statistics = {
180203
if (cacheBuilder.sizeInBytesStats.value == 0L) {
181204
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
@@ -185,20 +208,17 @@ case class InMemoryRelation(
185208
}
186209
}
187210

188-
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
189-
InMemoryRelation(newOutput, cacheBuilder, outputOrdering)(statsOfPlanToCache)
190-
}
211+
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
212+
InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
191213

192214
override def newInstance(): this.type = {
193-
new InMemoryRelation(
215+
InMemoryRelation(
194216
output.map(_.newInstance()),
195217
cacheBuilder,
196-
outputOrdering)(
197-
statsOfPlanToCache).asInstanceOf[this.type]
218+
outputOrdering,
219+
statsOfPlanToCache).asInstanceOf[this.type]
198220
}
199221

200-
override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
201-
202222
override def simpleString(maxFields: Int): String =
203223
s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
204224
}

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -488,7 +488,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
488488
test("SPARK-25727 - otherCopyArgs in InMemoryRelation does not include outputOrdering") {
489489
val data = Seq(100).toDF("count").cache()
490490
val json = data.queryExecution.optimizedPlan.toJSON
491-
assert(json.contains("outputOrdering") && json.contains("statsOfPlanToCache"))
491+
assert(json.contains("outputOrdering"))
492492
}
493493

494494
test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {

0 commit comments

Comments
 (0)