
Commit da35574

wzhfy authored and gatorsmile committed
[SPARK-22515][SQL] Estimation relation size based on numRows * rowSize
## What changes were proposed in this pull request?

Currently, relation size is computed as the sum of file sizes, which is error-prone because a storage format like Parquet may have a much smaller file size than the in-memory size. When we choose a broadcast join based on file size, there is a risk of OOM. But if the number of rows is available in statistics, we can get a better estimate from `numRows * rowSize`, which helps to alleviate this problem.

## How was this patch tested?

Added a new test case covering both a data source table and a Hive table.

Author: Zhenhua Wang <[email protected]>
Author: Zhenhua Wang <[email protected]>

Closes #19743 from wzhfy/better_leaf_size.
1 parent b70e483 commit da35574
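The motivation hinges on compressed on-disk size understating in-memory size. Below is a hedged illustration of the failure mode; `spark.sql.autoBroadcastJoinThreshold` is the real config knob (default 10 MB), but all the sizes and row counts are made up for this example:

```scala
// Made-up numbers: a Parquet table whose files compress to 8 MB but whose
// rows occupy roughly 80 MB once decoded in memory.
val onDiskBytes = 8L * 1024 * 1024
val broadcastThreshold = 10L * 1024 * 1024 // default autoBroadcastJoinThreshold

// Planning on file size alone says "broadcast", which risks OOM:
assert(onDiskBytes <= broadcastThreshold)

// Planning on numRows * rowSize (here 1M rows of ~80 bytes) says "don't":
val numRows = 1000000L
val avgRowSizeBytes = 80L
assert(numRows * avgRowSizeBytes > broadcastThreshold)
```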

4 files changed: +38 −9 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
8 additions, 6 deletions

```diff
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIden
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType
@@ -367,13 +368,14 @@ case class CatalogStatistics(
    * on column names.
    */
   def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
-    if (cboEnabled) {
-      val attrStats = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
-      Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
-        attributeStats = AttributeMap(attrStats))
+    if (cboEnabled && rowCount.isDefined) {
+      val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+      // Estimate size as number of rows * row size.
+      val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
+      Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
     } else {
-      // When CBO is disabled, we apply the size-only estimation strategy, so there's no need to
-      // propagate other statistics from catalog to the plan.
+      // When CBO is disabled or the table doesn't have other statistics, we apply the size-only
+      // estimation strategy and only propagate sizeInBytes in statistics.
       Statistics(sizeInBytes = sizeInBytes)
     }
   }
```
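The new branch delegates to `EstimationUtils.getOutputSize`. As a rough guide to what that call computes, here is a hedged, self-contained sketch of the `numRows * rowSize` estimation; the names `SizeEstimationSketch`, `AvgColumnSize`, and `estimateOutputSize` are illustrative, not Spark's actual API:

```scala
// A hedged sketch, not Spark's source: estimate relation size from the row
// count and per-column average widths instead of summing file sizes.
object SizeEstimationSketch {
  // Assumed input: each column's average width in bytes, taken from column
  // statistics where available, otherwise from the type's default size.
  final case class AvgColumnSize(bytes: Long)

  def estimateOutputSize(columns: Seq[AvgColumnSize], rowCount: BigInt): BigInt = {
    val rowSize = columns.map(_.bytes).sum
    // Clamp to a minimum of 1: join estimates multiply children's sizes, so a
    // zero here would zero out the whole product. This is consistent with the
    // test-base assertion below changing from sizeInBytes == 0 to == 1.
    if (rowCount == 0) BigInt(1) else rowCount * BigInt(rowSize)
  }
}

// Example: 1000 rows of a single 8-byte column estimate to 8000 bytes.
// SizeEstimationSketch.estimateOutputSize(
//   Seq(SizeEstimationSketch.AvgColumnSize(8)), BigInt(1000)) == BigInt(8000)
```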

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
0 additions, 1 deletion

```diff
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation

 import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._

 /**
```

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
1 addition, 1 deletion

```diff
@@ -224,7 +224,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils

     // Check relation statistics
     withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
-      assert(relation.stats.sizeInBytes == 0)
+      assert(relation.stats.sizeInBytes == 1)
       assert(relation.stats.rowCount == Some(0))
       assert(relation.stats.attributeStats.size == 1)
       val (attribute, colStat) = relation.stats.attributeStats.head
```
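The expected size for an empty table moves from 0 to 1 because the new estimator never reports a zero size (see the clamp in the sketch above). A hedged repro of what this assertion exercises; the table name `empty_tbl` is made up:

```scala
// Hypothetical repro; "empty_tbl" is an illustrative name.
spark.sql("CREATE TABLE empty_tbl (key STRING) USING parquet")
spark.sql("ANALYZE TABLE empty_tbl COMPUTE STATISTICS FOR COLUMNS key")
spark.conf.set("spark.sql.cbo.enabled", "true")

val stats = spark.table("empty_tbl").queryExecution.optimizedPlan.stats
assert(stats.sizeInBytes == 1)     // clamped minimum; this was 0 before the patch
assert(stats.rowCount == Some(0))  // ANALYZE recorded zero rows
```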

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
29 additions, 1 deletion

```diff
@@ -41,7 +41,35 @@ import org.apache.spark.sql.types._


 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
-  test("Hive serde tables should fallback to HDFS for size estimation") {
+
+  test("size estimation for relations is based on row size * number of rows") {
+    val dsTbl = "rel_est_ds_table"
+    val hiveTbl = "rel_est_hive_table"
+    withTable(dsTbl, hiveTbl) {
+      spark.range(1000L).write.format("parquet").saveAsTable(dsTbl)
+      spark.range(1000L).write.format("hive").saveAsTable(hiveTbl)
+
+      Seq(dsTbl, hiveTbl).foreach { tbl =>
+        sql(s"ANALYZE TABLE $tbl COMPUTE STATISTICS")
+        val catalogStats = getCatalogStatistics(tbl)
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "false") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          assert(relationStats.sizeInBytes == catalogStats.sizeInBytes)
+          assert(relationStats.rowCount.isEmpty)
+        }
+        spark.sessionState.catalog.refreshTable(TableIdentifier(tbl))
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          // Due to compression in parquet files, in this test, file size is smaller than
+          // in-memory size.
+          assert(catalogStats.sizeInBytes < relationStats.sizeInBytes)
+          assert(catalogStats.rowCount == relationStats.rowCount)
+        }
+      }
+    }
+  }
+
+  test("Hive serde tables should fallback to HDFS for size estimation") {
     withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
       withTable("csv_table") {
         withTempDir { tempDir =>
```
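For context, this is how the change surfaces outside the test suite, sketched with a hypothetical table `t`: with CBO off the relation keeps the catalog's file-based size, and with CBO on it is re-estimated from the row count.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// "t" is an illustrative table name.
spark.range(1000L).write.format("parquet").saveAsTable("t")
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")

// CBO off: sizeInBytes is the sum of (compressed) file sizes from the catalog.
spark.conf.set("spark.sql.cbo.enabled", "false")
println(spark.table("t").queryExecution.optimizedPlan.stats.sizeInBytes)

// Refresh so the cached relation picks up the new conf, as the test does.
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

// CBO on: sizeInBytes becomes numRows * rowSize, typically larger than the
// on-disk size for compressed Parquet.
spark.conf.set("spark.sql.cbo.enabled", "true")
println(spark.table("t").queryExecution.optimizedPlan.stats.sizeInBytes)
```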
