Skip to content

Commit 0627850

Browse files
maropudongjoon-hyun
authored andcommitted
[SPARK-25196][SQL] Extends the analyze column command for cached tables
## What changes were proposed in this pull request? This pr extended `ANALYZE` commands to analyze column stats for cached table. In common use cases, users read catalog table data, join/aggregate them, and then cache the result for following reuse. Since we are only allowed to analyze column statistics in catalog tables via ANALYZE commands, the current optimization depends on non-existing or inaccurate column statistics of cached data. So, it would be great if we could analyze cached data as follows; ```scala scala> def printColumnStats(tableName: String) = { | spark.table(tableName).queryExecution.optimizedPlan.stats.attributeStats.foreach { | case (k, v) => println(s"[$k]: $v") | } | } scala> sql("SET spark.sql.cbo.enabled=true") scala> sql("SET spark.sql.statistics.histogram.enabled=true") scala> spark.range(1000).selectExpr("id % 33 AS c0", "rand() AS c1", "0 AS c2").write.saveAsTable("t") scala> sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c0, c1, c2") scala> spark.table("t").groupBy("c0").agg(count("c1").as("v1"), sum("c2").as("v2")).createTempView("temp") // Prints column statistics in catalog table `t` scala> printColumnStats("t") [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;9f7c1c)),2) [c1#7074]: ColumnStat(Some(944),Some(3.2108484832404915E-4),Some(0.997584797423909),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;60a386b1)),2) [c2#7075]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(4),Some(4),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;5ffd29e8)),2) // Prints column statistics on cached table `temp` scala> sql("CACHE TABLE temp") scala> printColumnStats("temp") <No Column Statistics> // Analyzes columns `v1` and `v2` on cached table `temp` scala> sql("ANALYZE TABLE temp COMPUTE STATISTICS FOR COLUMNS v1, v2") // Then, prints again scala> printColumnStats("temp") [v1#7084L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;49f7bb6f)),2) [v2#7086L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;12701677)),2) // Analyzes one left column and prints again scala> sql("ANALYZE TABLE temp COMPUTE STATISTICS FOR COLUMNS c0") scala> printColumnStats("temp") [v1#7084L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;49f7bb6f)),2) [v2#7086L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;12701677)),2) [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;1f5c1b81)),2) ``` ## How was this patch tested? Added tests in `CachedTableSuite` and `StatisticsCollectionSuite`. Closes apache#24047 from maropu/SPARK-25196-4. Authored-by: Takeshi Yamamuro <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 22c9ed6 commit 0627850

File tree

6 files changed

+192
-26
lines changed

6 files changed

+192
-26
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2525

2626
import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.{Dataset, SparkSession}
28-
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
28+
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SubqueryExpression}
2929
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
3030
import org.apache.spark.sql.execution.columnar.InMemoryRelation
31+
import org.apache.spark.sql.execution.command.CommandUtils
3132
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
3233
import org.apache.spark.storage.StorageLevel
3334
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -154,6 +155,22 @@ class CacheManager extends Logging {
154155
}
155156
}
156157

158+
// Analyzes column statistics in the given cache data
159+
private[sql] def analyzeColumnCacheQuery(
160+
sparkSession: SparkSession,
161+
cachedData: CachedData,
162+
column: Seq[Attribute]): Unit = {
163+
val relation = cachedData.cachedRepresentation
164+
val (rowCount, newColStats) =
165+
CommandUtils.computeColumnStats(sparkSession, relation, column)
166+
val oldStats = relation.statsOfPlanToCache
167+
val newStats = oldStats.copy(
168+
rowCount = Some(rowCount),
169+
attributeStats = AttributeMap((oldStats.attributeStats ++ newColStats).toSeq)
170+
)
171+
relation.statsOfPlanToCache = newStats
172+
}
173+
157174
/**
158175
* Tries to re-cache all the cache entries that refer to the given plan.
159176
*/

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ case class InMemoryRelation(
159159
output: Seq[Attribute],
160160
@transient cacheBuilder: CachedRDDBuilder,
161161
override val outputOrdering: Seq[SortOrder])(
162-
statsOfPlanToCache: Statistics)
162+
@volatile var statsOfPlanToCache: Statistics)
163163
extends logical.LeafNode with MultiInstanceRelation {
164164

165165
override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
@@ -181,7 +181,7 @@ case class InMemoryRelation(
181181
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
182182
statsOfPlanToCache
183183
} else {
184-
Statistics(sizeInBytes = cacheBuilder.sizeInBytesStats.value.longValue)
184+
statsOfPlanToCache.copy(sizeInBytes = cacheBuilder.sizeInBytesStats.value.longValue)
185185
}
186186
}
187187

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
1919

2020
import org.apache.spark.sql._
2121
import org.apache.spark.sql.catalyst.TableIdentifier
22+
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
2223
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
2324
import org.apache.spark.sql.catalyst.expressions.Attribute
2425
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -39,30 +40,41 @@ case class AnalyzeColumnCommand(
3940
require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " +
4041
"mutually exclusive. Only one of them should be specified.")
4142
val sessionState = sparkSession.sessionState
42-
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
43-
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
44-
val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
45-
if (tableMeta.tableType == CatalogTableType.VIEW) {
46-
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
47-
}
48-
val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
49-
val relation = sparkSession.table(tableIdent).logicalPlan
50-
val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns)
5143

52-
// Compute stats for the computed list of columns.
53-
val (rowCount, newColStats) =
54-
CommandUtils.computeColumnStats(sparkSession, relation, columnsToAnalyze)
44+
tableIdent.database match {
45+
case Some(db) if db == sparkSession.sharedState.globalTempViewManager.database =>
46+
val plan = sessionState.catalog.getGlobalTempView(tableIdent.identifier).getOrElse {
47+
throw new NoSuchTableException(db = db, table = tableIdent.identifier)
48+
}
49+
analyzeColumnInTempView(plan, sparkSession)
50+
case Some(_) =>
51+
analyzeColumnInCatalog(sparkSession)
52+
case None =>
53+
sessionState.catalog.getTempView(tableIdent.identifier) match {
54+
case Some(tempView) => analyzeColumnInTempView(tempView, sparkSession)
55+
case _ => analyzeColumnInCatalog(sparkSession)
56+
}
57+
}
5558

56-
// We also update table-level stats in order to keep them consistent with column-level stats.
57-
val statistics = CatalogStatistics(
58-
sizeInBytes = sizeInBytes,
59-
rowCount = Some(rowCount),
60-
// Newly computed column stats should override the existing ones.
61-
colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
59+
Seq.empty[Row]
60+
}
6261

63-
sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))
62+
private def analyzeColumnInCachedData(plan: LogicalPlan, sparkSession: SparkSession): Boolean = {
63+
val cacheManager = sparkSession.sharedState.cacheManager
64+
cacheManager.lookupCachedData(plan).map { cachedData =>
65+
val columnsToAnalyze = getColumnsToAnalyze(
66+
tableIdent, cachedData.plan, columnNames, allColumns)
67+
cacheManager.analyzeColumnCacheQuery(sparkSession, cachedData, columnsToAnalyze)
68+
cachedData
69+
}.isDefined
70+
}
6471

65-
Seq.empty[Row]
72+
private def analyzeColumnInTempView(plan: LogicalPlan, sparkSession: SparkSession): Unit = {
73+
if (!analyzeColumnInCachedData(plan, sparkSession)) {
74+
val catalog = sparkSession.sessionState.catalog
75+
val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
76+
throw new NoSuchTableException(db = db, table = tableIdent.identifier)
77+
}
6678
}
6779

6880
private def getColumnsToAnalyze(
@@ -89,6 +101,40 @@ case class AnalyzeColumnCommand(
89101
columnsToAnalyze
90102
}
91103

104+
private def analyzeColumnInCatalog(sparkSession: SparkSession): Unit = {
105+
val sessionState = sparkSession.sessionState
106+
val tableMeta = sessionState.catalog.getTableMetadata(tableIdent)
107+
if (tableMeta.tableType == CatalogTableType.VIEW) {
108+
// Analyzes a catalog view if the view is cached
109+
val plan = sparkSession.table(tableIdent.quotedString).logicalPlan
110+
if (!analyzeColumnInCachedData(plan, sparkSession)) {
111+
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
112+
}
113+
} else {
114+
val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
115+
val relation = sparkSession.table(tableIdent).logicalPlan
116+
val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns)
117+
118+
// Compute stats for the computed list of columns.
119+
val (rowCount, newColStats) =
120+
CommandUtils.computeColumnStats(sparkSession, relation, columnsToAnalyze)
121+
122+
val newColCatalogStats = newColStats.map {
123+
case (attr, columnStat) =>
124+
attr.name -> columnStat.toCatalogColumnStat(attr.name, attr.dataType)
125+
}
126+
127+
// We also update table-level stats in order to keep them consistent with column-level stats.
128+
val statistics = CatalogStatistics(
129+
sizeInBytes = sizeInBytes,
130+
rowCount = Some(rowCount),
131+
// Newly computed column stats should override the existing ones.
132+
colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColCatalogStats)
133+
134+
sessionState.catalog.alterTableStats(tableIdent, Some(statistics))
135+
}
136+
}
137+
92138
/** Returns true iff the we support gathering column statistics on column of the given type. */
93139
private def supportsType(dataType: DataType): Boolean = dataType match {
94140
case _: IntegralType => true

sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ object CommandUtils extends Logging {
168168
private[sql] def computeColumnStats(
169169
sparkSession: SparkSession,
170170
relation: LogicalPlan,
171-
columns: Seq[Attribute]): (Long, Map[String, CatalogColumnStat]) = {
171+
columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
172172
val conf = sparkSession.sessionState.conf
173173

174174
// Collect statistics per column.
@@ -195,8 +195,8 @@ object CommandUtils extends Logging {
195195
val rowCount = statsRow.getLong(0)
196196
val columnStats = columns.zipWithIndex.map { case (attr, i) =>
197197
// according to `statExprs`, the stats struct always have 7 fields.
198-
(attr.name, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount,
199-
attributePercentiles.get(attr)).toCatalogColumnStat(attr.name, attr.dataType))
198+
(attr, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount,
199+
attributePercentiles.get(attr)))
200200
}.toMap
201201
(rowCount, columnStats)
202202
}

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -946,4 +946,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
946946
// Clean-up
947947
df.unpersist()
948948
}
949+
950+
test("analyzes column statistics in cached query") {
951+
def query(): DataFrame = {
952+
spark.range(100)
953+
.selectExpr("id % 3 AS c0", "id % 5 AS c1", "2 AS c2")
954+
.groupBy("c0")
955+
.agg(avg("c1").as("v1"), sum("c2").as("v2"))
956+
}
957+
// First, checks if there is no column statistic in cached query
958+
val queryStats1 = query().cache.queryExecution.optimizedPlan.stats.attributeStats
959+
assert(queryStats1.map(_._1.name).isEmpty)
960+
961+
val cacheManager = spark.sharedState.cacheManager
962+
val cachedData = cacheManager.lookupCachedData(query().logicalPlan)
963+
assert(cachedData.isDefined)
964+
val queryAttrs = cachedData.get.plan.output
965+
assert(queryAttrs.size === 3)
966+
val (c0, v1, v2) = (queryAttrs(0), queryAttrs(1), queryAttrs(2))
967+
968+
// Analyzes one column in the query output
969+
cacheManager.analyzeColumnCacheQuery(spark, cachedData.get, v1 :: Nil)
970+
val queryStats2 = query().queryExecution.optimizedPlan.stats.attributeStats
971+
assert(queryStats2.map(_._1.name).toSet === Set("v1"))
972+
973+
// Analyzes two more columns
974+
cacheManager.analyzeColumnCacheQuery(spark, cachedData.get, c0 :: v2 :: Nil)
975+
val queryStats3 = query().queryExecution.optimizedPlan.stats.attributeStats
976+
assert(queryStats3.map(_._1.name).toSet === Set("c0", "v1", "v2"))
977+
}
949978
}

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit
2424
import scala.collection.mutable
2525

2626
import org.apache.spark.sql.catalyst.TableIdentifier
27+
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
2728
import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
2829
import org.apache.spark.sql.catalyst.plans.logical._
2930
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
@@ -470,4 +471,77 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
470471
}
471472
}
472473
}
474+
475+
def getStatAttrNames(tableName: String): Set[String] = {
476+
val queryStats = spark.table(tableName).queryExecution.optimizedPlan.stats.attributeStats
477+
queryStats.map(_._1.name).toSet
478+
}
479+
480+
test("analyzes column statistics in cached query") {
481+
withTempView("cachedQuery") {
482+
sql(
483+
"""CACHE TABLE cachedQuery AS
484+
| SELECT c0, avg(c1) AS v1, avg(c2) AS v2
485+
| FROM (SELECT id % 3 AS c0, id % 5 AS c1, 2 AS c2 FROM range(1, 30))
486+
| GROUP BY c0
487+
""".stripMargin)
488+
489+
// Analyzes one column in the cached logical plan
490+
sql("ANALYZE TABLE cachedQuery COMPUTE STATISTICS FOR COLUMNS v1")
491+
assert(getStatAttrNames("cachedQuery") === Set("v1"))
492+
493+
// Analyzes two more columns
494+
sql("ANALYZE TABLE cachedQuery COMPUTE STATISTICS FOR COLUMNS c0, v2")
495+
assert(getStatAttrNames("cachedQuery") === Set("c0", "v1", "v2"))
496+
}
497+
}
498+
499+
test("analyzes column statistics in cached local temporary view") {
500+
withTempView("tempView") {
501+
// Analyzes in a temporary view
502+
sql("CREATE TEMPORARY VIEW tempView AS SELECT * FROM range(1, 30)")
503+
val errMsg = intercept[AnalysisException] {
504+
sql("ANALYZE TABLE tempView COMPUTE STATISTICS FOR COLUMNS id")
505+
}.getMessage
506+
assert(errMsg.contains(s"Table or view 'tempView' not found in database 'default'"))
507+
508+
// Cache the view then analyze it
509+
sql("CACHE TABLE tempView")
510+
assert(getStatAttrNames("tempView") !== Set("id"))
511+
sql("ANALYZE TABLE tempView COMPUTE STATISTICS FOR COLUMNS id")
512+
assert(getStatAttrNames("tempView") === Set("id"))
513+
}
514+
}
515+
516+
test("analyzes column statistics in cached global temporary view") {
517+
withGlobalTempView("gTempView") {
518+
val globalTempDB = spark.sharedState.globalTempViewManager.database
519+
val errMsg1 = intercept[NoSuchTableException] {
520+
sql(s"ANALYZE TABLE $globalTempDB.gTempView COMPUTE STATISTICS FOR COLUMNS id")
521+
}.getMessage
522+
assert(errMsg1.contains(s"Table or view 'gTempView' not found in database '$globalTempDB'"))
523+
// Analyzes in a global temporary view
524+
sql("CREATE GLOBAL TEMP VIEW gTempView AS SELECT * FROM range(1, 30)")
525+
val errMsg2 = intercept[AnalysisException] {
526+
sql(s"ANALYZE TABLE $globalTempDB.gTempView COMPUTE STATISTICS FOR COLUMNS id")
527+
}.getMessage
528+
assert(errMsg2.contains(s"Table or view 'gTempView' not found in database '$globalTempDB'"))
529+
530+
// Cache the view then analyze it
531+
sql(s"CACHE TABLE $globalTempDB.gTempView")
532+
assert(getStatAttrNames(s"$globalTempDB.gTempView") !== Set("id"))
533+
sql(s"ANALYZE TABLE $globalTempDB.gTempView COMPUTE STATISTICS FOR COLUMNS id")
534+
assert(getStatAttrNames(s"$globalTempDB.gTempView") === Set("id"))
535+
}
536+
}
537+
538+
test("analyzes column statistics in cached catalog view") {
539+
withTempDatabase { database =>
540+
sql(s"CREATE VIEW $database.v AS SELECT 1 c")
541+
sql(s"CACHE TABLE $database.v")
542+
assert(getStatAttrNames(s"$database.v") !== Set("id"))
543+
sql(s"ANALYZE TABLE $database.v COMPUTE STATISTICS FOR COLUMNS c")
544+
assert(getStatAttrNames(s"$database.v") !== Set("id"))
545+
}
546+
}
473547
}

0 commit comments

Comments
 (0)