
Commit ee13f3e

aokolnychyi authored and gatorsmile committed
[SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable
## What changes were proposed in this pull request?

Tables in the catalog cache are not invalidated once their statistics are updated. As a consequence, existing sessions will use the cached information even though it is not valid anymore. Consider the example below.

```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```

After step 3, the table will be present in the catalog relation cache. Step 4 will correctly update the metadata inside the catalog but will NOT invalidate the cache. By the way, running ``spark.sql("analyze table tab1 compute statistics")`` between step 3 and step 4 would also solve the problem.

## How was this patch tested?

Current and additional unit tests.

Author: aokolnychyi <[email protected]>

Closes apache#19252 from aokolnychyi/spark-21969.
1 parent d5aefa8 commit ee13f3e
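
Before this fix, the stale-cache symptom in the example above could be worked around from user code by dropping the cached relation manually between step 4 and step 5. A minimal sketch using the public `Catalog` API (illustrative, not part of this patch; table name matches the example):

```scala
// Manually drop the cached relation so the next plan resolution re-reads the
// table metadata, including its statistics, from the external catalog.
spark.catalog.refreshTable("tab1")
spark.sql("explain cost select distinct * from tab1").show(false)
```

With this patch the manual refresh becomes unnecessary, because `SessionCatalog.alterTableStats` invalidates the cached entry itself.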


5 files changed, +87 -7 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -377,6 +377,8 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(tableIdentifier)
     externalCatalog.alterTableStats(db, table, newStats)
+    // Invalidate the table relation cache
+    refreshTable(identifier)
   }
 
   /**
```
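
The catalog's `tableRelationCache` is a Guava-style cache keyed by qualified table name, and `refreshTable` drops the matching entry so the next lookup rebuilds the relation with the freshly written statistics. A minimal, self-contained sketch of that invalidate-on-write pattern (the names and `String` key/value types are illustrative, not the actual `SessionCatalog` internals):

```scala
import com.google.common.cache.{Cache, CacheBuilder}

// Illustrative stand-in for the relation cache; the real one maps
// QualifiedTableName -> LogicalPlan, but Strings keep the sketch self-contained.
val relationCache: Cache[String, String] =
  CacheBuilder.newBuilder().maximumSize(1000).build[String, String]()

def alterTableStatsSketch(table: String): Unit = {
  // 1. persist the new statistics to the external catalog (stubbed out here)
  // 2. invalidate any cached relation so later lookups are rebuilt with fresh stats
  relationCache.invalidate(table)
}

relationCache.put("default.tab1", "cached plan with stale stats")
alterTableStatsSketch("default.tab1")
assert(relationCache.getIfPresent("default.tab1") == null) // next lookup re-resolves the table
```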

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala

Lines changed: 0 additions & 3 deletions
```diff
@@ -56,9 +56,6 @@ case class AnalyzeColumnCommand(
 
     sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))
 
-    // Refresh the cached data source table in the catalog.
-    sessionState.catalog.refreshTable(tableIdentWithDB)
-
     Seq.empty[Row]
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala

Lines changed: 0 additions & 2 deletions
```diff
@@ -48,8 +48,6 @@ case class AnalyzeTableCommand(
     val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
     if (newStats.isDefined) {
       sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
-      // Refresh the cached data source table in the catalog.
-      sessionState.catalog.refreshTable(tableIdentWithDB)
     }
 
     Seq.empty[Row]
```
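
Both ANALYZE commands previously refreshed the relation cache themselves; with the refresh now happening inside `SessionCatalog.alterTableStats`, those explicit calls are redundant and are removed. A hedged end-to-end sketch of the expected behaviour after this patch, reusing the reproduction from the commit message (assumes a live SparkSession named `spark`):

```scala
spark.range(100).write.saveAsTable("tab1")
spark.sql("analyze table tab1 compute statistics")
spark.sql("explain cost select distinct * from tab1").show(false) // caches the relation
// The append triggers CommandUtils.updateTableStats -> alterTableStats -> refreshTable,
// so the cached entry is dropped as part of the write.
spark.range(100).write.mode("append").saveAsTable("tab1")
spark.sql("explain cost select distinct * from tab1").show(false) // no longer uses the stale cached stats
```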

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

Lines changed: 73 additions & 0 deletions
```diff
@@ -261,6 +261,10 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
         assert(fetched1.get.sizeInBytes == 0)
         assert(fetched1.get.colStats.size == 2)
 
+        // table lookup will make the table cached
+        spark.table(table)
+        assert(isTableInCatalogCache(table))
+
         // insert into command
         sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
         if (autoUpdate) {
@@ -270,9 +274,78 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
         } else {
           checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
         }
+
+        // check that tableRelationCache inside the catalog was invalidated after insert
+        assert(!isTableInCatalogCache(table))
+      }
+    }
+  }
+
+  test("invalidation of tableRelationCache after inserts") {
+    val table = "invalidate_catalog_cache_table"
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          spark.range(100).write.saveAsTable(table)
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+          spark.table(table)
+          val initialSizeInBytes = getTableFromCatalogCache(table).stats.sizeInBytes
+          spark.range(100).write.mode(SaveMode.Append).saveAsTable(table)
+          spark.table(table)
+          assert(getTableFromCatalogCache(table).stats.sizeInBytes == 2 * initialSizeInBytes)
+        }
+      }
+    }
+  }
+
+  test("invalidation of tableRelationCache after table truncation") {
+    val table = "invalidate_catalog_cache_table"
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          spark.range(100).write.saveAsTable(table)
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+          spark.table(table)
+          sql(s"TRUNCATE TABLE $table")
+          spark.table(table)
+          assert(getTableFromCatalogCache(table).stats.sizeInBytes == 0)
         }
       }
     }
   }
 
+  test("invalidation of tableRelationCache after alter table add partition") {
+    val table = "invalidate_catalog_cache_table"
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTempDir { dir =>
+          withTable(table) {
+            val path = dir.getCanonicalPath
+            sql(s"""
+               |CREATE TABLE $table (col1 int, col2 int)
+               |USING PARQUET
+               |PARTITIONED BY (col2)
+               |LOCATION '${dir.toURI}'""".stripMargin)
+            sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+            spark.table(table)
+            assert(getTableFromCatalogCache(table).stats.sizeInBytes == 0)
+            spark.catalog.recoverPartitions(table)
+            val df = Seq((1, 2), (1, 2)).toDF("col2", "col1")
+            df.write.parquet(s"$path/col2=1")
+            sql(s"ALTER TABLE $table ADD PARTITION (col2=1) LOCATION '${dir.toURI}'")
+            spark.table(table)
+            val cachedTable = getTableFromCatalogCache(table)
+            val cachedTableSizeInBytes = cachedTable.stats.sizeInBytes
+            val defaultSizeInBytes = conf.defaultSizeInBytes
+            if (autoUpdate) {
+              assert(cachedTableSizeInBytes != defaultSizeInBytes && cachedTableSizeInBytes > 0)
+            } else {
+              assert(cachedTableSizeInBytes == defaultSizeInBytes)
+            }
+          }
+        }
+      }
+    }
+  }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala

Lines changed: 12 additions & 2 deletions
```diff
@@ -23,9 +23,9 @@ import java.sql.{Date, Timestamp}
 import scala.collection.mutable
 import scala.util.Random
 
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
-import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.StaticSQLConf
@@ -85,6 +85,16 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
   }
 
+  def getTableFromCatalogCache(tableName: String): LogicalPlan = {
+    val catalog = spark.sessionState.catalog
+    val qualifiedTableName = QualifiedTableName(catalog.getCurrentDatabase, tableName)
+    catalog.getCachedTable(qualifiedTableName)
+  }
+
+  def isTableInCatalogCache(tableName: String): Boolean = {
+    getTableFromCatalogCache(tableName) != null
+  }
+
   def getCatalogStatistics(tableName: String): CatalogStatistics = {
     getCatalogTable(tableName).stats.get
   }
```
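
The new helpers read the session catalog's relation cache directly, which is what the suite above relies on. A short usage sketch outside the test base (hypothetical spark-shell session; the table name is illustrative):

```scala
import org.apache.spark.sql.catalyst.QualifiedTableName

spark.range(10).write.saveAsTable("cache_demo")
spark.table("cache_demo") // a table lookup populates the relation cache
val key = QualifiedTableName(spark.sessionState.catalog.getCurrentDatabase, "cache_demo")
// getCachedTable returns the cached LogicalPlan, or null when nothing is cached
assert(spark.sessionState.catalog.getCachedTable(key) != null)
```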
