Commit ab7346f

CodingCat authored and cloud-fan committed
[SPARK-22673][SQL] InMemoryRelation should utilize existing stats whenever possible
## What changes were proposed in this pull request?

The current implementation of InMemoryRelation always uses the most expensive execution plan when writing the cache. With CBO enabled, we can actually have a more exact estimation of the underlying table size...

## How was this patch tested?

Existing tests.

Author: CodingCat <[email protected]>
Author: Nan Zhu <[email protected]>
Author: Nan Zhu <[email protected]>

Closes #19864 from CodingCat/SPARK-22673.
1 parent d4e6959 commit ab7346f
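
For context, the observable effect of this change can be sketched in the style of the new regression test added below. The path and printed values are illustrative, and computeStats is called directly as the test does (it is accessible from the test suite's package; treat that as an assumption elsewhere):

```scala
import org.apache.spark.sql.execution.columnar.InMemoryRelation

spark.conf.set("spark.sql.cbo.enabled", "true")

// Cache a parquet-backed DataFrame (path is hypothetical).
val df = spark.read.parquet("/tmp/some_table").cache()
val relation = df.queryExecution.optimizedPlan.collect {
  case r: InMemoryRelation => r
}.head

// Before this patch, an unmaterialized cache reported the conservative
// spark.sql.defaultSizeInBytes; with it, the stats of the plan being
// cached (file size, or CBO estimates) are reported instead.
println(relation.computeStats().sizeInBytes)

df.collect() // materialize the cached columnar RDD
// Now the exact in-memory size tracked by the batchStats accumulator is used.
println(relation.computeStats().sizeInBytes)
```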

3 files changed: +75, -27 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 10 additions & 10 deletions
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.storage.StorageLevel
@@ -94,14 +94,13 @@ class CacheManager extends Logging {
       logWarning("Asked to cache already cached data.")
     } else {
       val sparkSession = query.sparkSession
-      cachedData.add(CachedData(
-        planToCache,
-        InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize,
-          storageLevel,
-          sparkSession.sessionState.executePlan(planToCache).executedPlan,
-          tableName)))
+      val inMemoryRelation = InMemoryRelation(
+        sparkSession.sessionState.conf.useCompression,
+        sparkSession.sessionState.conf.columnBatchSize, storageLevel,
+        sparkSession.sessionState.executePlan(planToCache).executedPlan,
+        tableName,
+        planToCache.stats)
+      cachedData.add(CachedData(planToCache, inMemoryRelation))
     }
   }

@@ -148,7 +147,8 @@
         batchSize = cd.cachedRepresentation.batchSize,
         storageLevel = cd.cachedRepresentation.storageLevel,
         child = spark.sessionState.executePlan(cd.plan).executedPlan,
-        tableName = cd.cachedRepresentation.tableName)
+        tableName = cd.cachedRepresentation.tableName,
+        statsOfPlanToCache = cd.plan.stats)
       needToRecache += cd.copy(cachedRepresentation = newCache)
     }
   }
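
Note that both call sites pass the logical plan's stats (planToCache.stats, cd.plan.stats), so with CBO enabled the cache inherits whatever the optimizer already knows. A minimal illustration of where that estimate comes from (the table name "t" is hypothetical):

```scala
// With spark.sql.cbo.enabled=true, ANALYZE TABLE feeds table statistics
// into the logical plan, which CacheManager now forwards to
// InMemoryRelation as statsOfPlanToCache.
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")
val planToCache = spark.table("t").queryExecution.optimizedPlan
println(planToCache.stats.sizeInBytes)
```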

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 12 additions & 9 deletions
@@ -37,8 +37,10 @@ object InMemoryRelation {
       batchSize: Int,
       storageLevel: StorageLevel,
       child: SparkPlan,
-      tableName: Option[String]): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
+      tableName: Option[String],
+      statsOfPlanToCache: Statistics): InMemoryRelation =
+    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
+      statsOfPlanToCache = statsOfPlanToCache)
 }


@@ -60,7 +62,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
+    statsOfPlanToCache: Statistics = null)
   extends logical.LeafNode with MultiInstanceRelation {

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -71,9 +74,8 @@ case class InMemoryRelation(

   override def computeStats(): Statistics = {
     if (batchStats.value == 0L) {
-      // Underlying columnar RDD hasn't been materialized, no useful statistics information
-      // available, return the default statistics.
-      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
+      // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache
+      statsOfPlanToCache
     } else {
       Statistics(sizeInBytes = batchStats.value.longValue)
     }
@@ -142,7 +144,7 @@ case class InMemoryRelation(
   def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
     InMemoryRelation(
       newOutput, useCompression, batchSize, storageLevel, child, tableName)(
-      _cachedColumnBuffers, batchStats)
+      _cachedColumnBuffers, batchStats, statsOfPlanToCache)
   }

   override def newInstance(): this.type = {
@@ -154,11 +156,12 @@ case class InMemoryRelation(
       child,
       tableName)(
         _cachedColumnBuffers,
-        batchStats).asInstanceOf[this.type]
+        batchStats,
+        statsOfPlanToCache).asInstanceOf[this.type]
   }

   def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers

   override protected def otherCopyArgs: Seq[AnyRef] =
-    Seq(_cachedColumnBuffers, batchStats)
+    Seq(_cachedColumnBuffers, batchStats, statsOfPlanToCache)
 }
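
Because statsOfPlanToCache is threaded through otherCopyArgs, withOutput, and newInstance, copies of the relation retain the pre-materialization estimate. A rough sketch of that invariant, assuming a test-style session where computeStats is accessible:

```scala
import org.apache.spark.sql.execution.columnar.InMemoryRelation

val cached = spark.range(4).toDF("id").cache()
val relation = cached.queryExecution.optimizedPlan.collect {
  case r: InMemoryRelation => r
}.head
// newInstance() re-creates the relation with fresh expression ids,
// but carries statsOfPlanToCache along, so the estimates agree.
val copy = relation.newInstance()
assert(copy.computeStats() == relation.computeStats())
```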

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 53 additions & 8 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel._
+import org.apache.spark.util.Utils

 class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -40,7 +41,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     data.createOrReplaceTempView(s"testData$dataType")
     val storageLevel = MEMORY_ONLY
     val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
-    val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None)
+    val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None,
+      data.logicalPlan.stats)

     assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == storageLevel)
     inMemoryRelation.cachedColumnBuffers.collect().head match {
@@ -116,7 +118,8 @@

   test("simple columnar query") {
     val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
+    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None,
+      testData.logicalPlan.stats)

     checkAnswer(scan, testData.collect().toSeq)
   }
@@ -132,8 +135,10 @@
   }

   test("projection") {
-    val plan = spark.sessionState.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
+    val logicalPlan = testData.select('value, 'key).logicalPlan
+    val plan = spark.sessionState.executePlan(logicalPlan).sparkPlan
+    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None,
+      logicalPlan.stats)

     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -149,7 +154,8 @@

   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
-    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
+    val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None,
+      testData.logicalPlan.stats)

     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
@@ -323,7 +329,7 @@
   test("SPARK-17549: cached table size should be correctly calculated") {
     val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
     val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
-    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
+    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, data.logicalPlan.stats)

     // Materialize the data.
     val expectedAnswer = data.collect()
@@ -448,8 +454,8 @@

   test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") {
     val attribute = AttributeReference("a", IntegerType)()
-    val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY,
-      LocalTableScanExec(Seq(attribute), Nil), None)
+    val localTableScanExec = LocalTableScanExec(Seq(attribute), Nil)
+    val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY, localTableScanExec, None, null)
     val tableScanExec = InMemoryTableScanExec(Seq(attribute),
       Seq(In(attribute, Nil)), testRelation)
     assert(tableScanExec.partitionFilters.isEmpty)
@@ -479,4 +485,43 @@
       }
     }
   }
+
+  test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
+    withSQLConf("spark.sql.cbo.enabled" -> "true") {
+      withTempPath { workDir =>
+        withTable("table1") {
+          val workDirPath = workDir.getAbsolutePath
+          val data = Seq(100, 200, 300, 400).toDF("count")
+          data.write.parquet(workDirPath)
+          val dfFromFile = spark.read.parquet(workDirPath).cache()
+          val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
+            case plan: InMemoryRelation => plan
+          }.head
+          // InMemoryRelation's stats is file size before the underlying RDD is materialized
+          assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+
+          // InMemoryRelation's stats is updated after materializing RDD
+          dfFromFile.collect()
+          assert(inMemoryRelation.computeStats().sizeInBytes === 16)
+
+          // test of catalog table
+          val dfFromTable = spark.catalog.createTable("table1", workDirPath).cache()
+          val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
+            collect { case plan: InMemoryRelation => plan }.head
+
+          // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
+          // is calculated
+          assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+
+          // InMemoryRelation's stats should be updated after calculating stats of the table
+          // clear cache to simulate a fresh environment
+          dfFromTable.unpersist(blocking = true)
+          spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+          val inMemoryRelation3 = spark.read.table("table1").cache().queryExecution.optimizedPlan.
+            collect { case plan: InMemoryRelation => plan }.head
+          assert(inMemoryRelation3.computeStats().sizeInBytes === 48)
+        }
+      }
+    }
+  }
 }
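
A note on the constants asserted in the new test, since they look magic: 740 appears to be the on-disk size of the written parquet file, which is what the plan's stats report before materialization; 16 matches the materialized columnar size of four 4-byte integers (4 × 4 = 16); and 48 is consistent with a row-count-based CBO estimate after ANALYZE TABLE, plausibly 4 rows × (8 bytes of row overhead + 4 bytes per int) = 48. These readings are inferred from the assertions, not documented guarantees.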
