Skip to content

Commit 239082d

Browse files
sujith71955 and dongjoon-hyun
authored and committed
[SPARK-27403][SQL] Fix updateTableStats to update table stats always with new stats or None
## What changes were proposed in this pull request? System shall update the table stats automatically if user set spark.sql.statistics.size.autoUpdate.enabled as true, currently this property is not having any significance even if it is enabled or disabled. This feature is similar to Hives auto-gather feature where statistics are automatically computed by default if this feature is enabled. Reference: https://cwiki.apache.org/confluence/display/Hive/StatsDev As part of fix , autoSizeUpdateEnabled validation is been done initially so that system will calculate the table size for the user automatically and record it in metastore as per user expectation. ## How was this patch tested? UT is written and manually verified in cluster. Tested with unit tests + some internal tests on real cluster. Before fix: ![image](https://user-images.githubusercontent.com/12999161/55688682-cd8d4780-5998-11e9-85da-e1a4e34419f6.png) After fix ![image](https://user-images.githubusercontent.com/12999161/55688654-7d15ea00-5998-11e9-973f-1f4cee27018f.png) Closes apache#24315 from sujith71955/master_autoupdate. Authored-by: s71955 <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent d33ae2e commit 239082d

File tree

2 files changed

+28
-10
lines changed

2 files changed

+28
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,14 @@ object CommandUtils extends Logging {
4242

4343
/** Change statistics after changing data by commands. */
4444
def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
45-
if (table.stats.nonEmpty) {
46-
val catalog = sparkSession.sessionState.catalog
47-
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
48-
val newTable = catalog.getTableMetadata(table.identifier)
49-
val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
50-
val newStats = CatalogStatistics(sizeInBytes = newSize)
51-
catalog.alterTableStats(table.identifier, Some(newStats))
52-
} else {
53-
catalog.alterTableStats(table.identifier, None)
54-
}
45+
val catalog = sparkSession.sessionState.catalog
46+
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
47+
val newTable = catalog.getTableMetadata(table.identifier)
48+
val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
49+
val newStats = CatalogStatistics(sizeInBytes = newSize)
50+
catalog.alterTableStats(table.identifier, Some(newStats))
51+
} else if (table.stats.nonEmpty) {
52+
catalog.alterTableStats(table.identifier, None)
5553
}
5654
}
5755

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,26 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
337337
}
338338
}
339339

340+
test("auto gather stats after insert command") {
341+
val table = "change_stats_insert_datasource_table"
342+
Seq(false, true).foreach { autoUpdate =>
343+
withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
344+
withTable(table) {
345+
sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
346+
// insert into command
347+
sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
348+
val stats = getCatalogTable(table).stats
349+
if (autoUpdate) {
350+
assert(stats.isDefined)
351+
assert(stats.get.sizeInBytes >= 0)
352+
} else {
353+
assert(stats.isEmpty)
354+
}
355+
}
356+
}
357+
}
358+
}
359+
340360
test("invalidation of tableRelationCache after inserts") {
341361
val table = "invalidate_catalog_cache_table"
342362
Seq(false, true).foreach { autoUpdate =>

0 commit comments

Comments (0)