Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit d8e3a4a

Browse files
mbasmanova authored and gatorsmile committed
[SPARK-21079][SQL] Calculate total size of a partition table as a sum of individual partitions
## What changes were proposed in this pull request?

Storage URI of a partitioned table may or may not point to a directory under which individual partitions are stored. In fact, individual partitions may be located in totally unrelated directories. Before this change, the ANALYZE TABLE table COMPUTE STATISTICS command calculated the total size of a table by adding up the sizes of files found under the table's storage URI. This calculation could produce 0 if partitions are stored elsewhere.

This change uses the storage URIs of individual partitions to calculate the sizes of all partitions of a table and adds these up to produce the total size of the table.

CC: wzhfy

## How was this patch tested?

Added unit test. Ran ANALYZE TABLE xxx COMPUTE STATISTICS on a partitioned Hive table and verified that sizeInBytes is calculated correctly. Before this change, the size would be zero.

Author: Masha Basmanova <[email protected]>

Closes apache#18309 from mbasmanova/mbasmanova-analyze-part-table.

(cherry picked from commit b449a1d)
Signed-off-by: gatorsmile <[email protected]>
1 parent ad44ab5 commit d8e3a4a

File tree

2 files changed

+95
-6
lines changed

2 files changed

+95
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala

Lines changed: 23 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.execution.command
1919

20+
import java.net.URI
21+
2022
import scala.util.control.NonFatal
2123

2224
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -81,6 +83,21 @@ case class AnalyzeTableCommand(
8183
object AnalyzeTableCommand extends Logging {
8284

8385
def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): Long = {
86+
if (catalogTable.partitionColumnNames.isEmpty) {
87+
calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
88+
} else {
89+
// Calculate table size as a sum of the visible partitions. See SPARK-21079
90+
val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
91+
partitions.map(p =>
92+
calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
93+
).sum
94+
}
95+
}
96+
97+
private def calculateLocationSize(
98+
sessionState: SessionState,
99+
tableId: TableIdentifier,
100+
locationUri: Option[URI]): Long = {
84101
// This method is mainly based on
85102
// org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
86103
// in Hive 0.13 (except that we do not use fs.getContentSummary).
@@ -91,13 +108,13 @@ object AnalyzeTableCommand extends Logging {
91108
// countFileSize to count the table size.
92109
val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
93110

94-
def calculateTableSize(fs: FileSystem, path: Path): Long = {
111+
def calculateLocationSize(fs: FileSystem, path: Path): Long = {
95112
val fileStatus = fs.getFileStatus(path)
96113
val size = if (fileStatus.isDirectory) {
97114
fs.listStatus(path)
98115
.map { status =>
99116
if (!status.getPath.getName.startsWith(stagingDir)) {
100-
calculateTableSize(fs, status.getPath)
117+
calculateLocationSize(fs, status.getPath)
101118
} else {
102119
0L
103120
}
@@ -109,16 +126,16 @@ object AnalyzeTableCommand extends Logging {
109126
size
110127
}
111128

112-
catalogTable.storage.locationUri.map { p =>
129+
locationUri.map { p =>
113130
val path = new Path(p)
114131
try {
115132
val fs = path.getFileSystem(sessionState.newHadoopConf())
116-
calculateTableSize(fs, path)
133+
calculateLocationSize(fs, path)
117134
} catch {
118135
case NonFatal(e) =>
119136
logWarning(
120-
s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
121-
s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
137+
s"Failed to get the size of table ${tableId.table} in the " +
138+
s"database ${tableId.database} because of ${e.toString}", e)
122139
0L
123140
}
124141
}.getOrElse(0L)

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 72 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.joins._
3030
import org.apache.spark.sql.hive.test.TestHiveSingleton
3131
import org.apache.spark.sql.internal.SQLConf
3232
import org.apache.spark.sql.types._
33+
import org.apache.spark.util.Utils
3334

3435
class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
3536

@@ -125,6 +126,77 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
125126
TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
126127
}
127128

129+
test("SPARK-21079 - analyze table with location different than that of individual partitions") {
130+
def queryTotalSize(tableName: String): BigInt =
131+
spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes
132+
133+
val tableName = "analyzeTable_part"
134+
withTable(tableName) {
135+
withTempPath { path =>
136+
sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
137+
138+
val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
139+
partitionDates.foreach { ds =>
140+
sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') SELECT * FROM src")
141+
}
142+
143+
sql(s"ALTER TABLE $tableName SET LOCATION '$path'")
144+
145+
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
146+
147+
assert(queryTotalSize(tableName) === BigInt(17436))
148+
}
149+
}
150+
}
151+
152+
test("SPARK-21079 - analyze partitioned table with only a subset of partitions visible") {
153+
def queryTotalSize(tableName: String): BigInt =
154+
spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes
155+
156+
val sourceTableName = "analyzeTable_part"
157+
val tableName = "analyzeTable_part_vis"
158+
withTable(sourceTableName, tableName) {
159+
withTempPath { path =>
160+
// Create a table with 3 partitions all located under a single top-level directory 'path'
161+
sql(
162+
s"""
163+
|CREATE TABLE $sourceTableName (key STRING, value STRING)
164+
|PARTITIONED BY (ds STRING)
165+
|LOCATION '$path'
166+
""".stripMargin)
167+
168+
val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
169+
partitionDates.foreach { ds =>
170+
sql(
171+
s"""
172+
|INSERT INTO TABLE $sourceTableName PARTITION (ds='$ds')
173+
|SELECT * FROM src
174+
""".stripMargin)
175+
}
176+
177+
// Create another table referring to the same location
178+
sql(
179+
s"""
180+
|CREATE TABLE $tableName (key STRING, value STRING)
181+
|PARTITIONED BY (ds STRING)
182+
|LOCATION '$path'
183+
""".stripMargin)
184+
185+
// Register only one of the partitions found on disk
186+
val ds = partitionDates.head
187+
sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds')").collect()
188+
189+
// Analyze original table - expect 3 partitions
190+
sql(s"ANALYZE TABLE $sourceTableName COMPUTE STATISTICS noscan")
191+
assert(queryTotalSize(sourceTableName) === BigInt(3 * 5812))
192+
193+
// Analyze partial-copy table - expect only 1 partition
194+
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
195+
assert(queryTotalSize(tableName) === BigInt(5812))
196+
}
197+
}
198+
}
199+
128200
test("analyzing views is not supported") {
129201
def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
130202
val err = intercept[AnalysisException] {

0 commit comments

Comments
 (0)