
Commit cb2c842

cloud-fan authored and rxin committed
[SPARK-18856][SQL] non-empty partitioned table should not report zero size
## What changes were proposed in this pull request?

In `DataSource`, if the table is not analyzed, we use 0 as the default value for the table size. This is dangerous: a 0-byte estimate makes even a large table look cheap enough to broadcast, which can cause an OOM. We should use `defaultSizeInBytes` instead.

## How was this patch tested?

New regression test.

Author: Wenchen Fan <[email protected]>

Closes apache#16280 from cloud-fan/bug.

(cherry picked from commit d6f11a1)
Signed-off-by: Reynold Xin <[email protected]>
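To see why the 0-byte fallback is dangerous: Spark treats a join side as broadcastable when its estimated `sizeInBytes` is no larger than `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). Below is a minimal Scala sketch of that size check; the `canBroadcast` helper is hypothetical and only illustrates the comparison Spark's join selection performs on `plan.statistics.sizeInBytes`, it is not Spark's actual planner code.

```scala
// Hypothetical stand-in for the size check Spark's join selection performs
// on plan.statistics.sizeInBytes; names here are illustrative only.
def canBroadcast(sizeInBytes: BigInt, thresholdBytes: Long): Boolean =
  sizeInBytes >= 0 && sizeInBytes <= thresholdBytes

val threshold = 10L * 1024 * 1024 // autoBroadcastJoinThreshold default: 10 MB

// Old fallback: an un-analyzed table reported 0 bytes, so even a huge table
// passed the check and could be broadcast, risking OOM.
assert(canBroadcast(BigInt(0), threshold))

// New fallback: defaultSizeInBytes is larger than the broadcast threshold,
// so an un-analyzed table is never auto-broadcast by accident.
assert(!canBroadcast(BigInt(threshold) + 1, threshold))
```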
1 parent 0d94201 commit cb2c842

File tree

2 files changed: 20 additions & 1 deletion


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -388,10 +388,11 @@ case class DataSource(
 
     val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
         catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
+      val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
       new CatalogFileIndex(
         sparkSession,
         catalogTable.get,
-        catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
+        catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
     } else {
       new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
     }
```
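In practice, the old `0L` fallback meant a join against an un-analyzed, catalog-managed partitioned table passed the auto-broadcast size check regardless of its real size. A sketch of the failure mode this one-line change prevents; the table names `big_partitioned_tbl` and `other_tbl` are hypothetical:

```scala
// Hypothetical repro: big_partitioned_tbl is catalog-managed, partitioned,
// never analyzed, and far larger than the 10 MB broadcast threshold.
val joined = spark.table("big_partitioned_tbl")
  .join(spark.table("other_tbl"), "id")
// Before this patch the 0-byte estimate could make the planner pick a
// broadcast join here and OOM while materializing the big table; with the
// defaultSizeInBytes fallback a shuffle-based join is chosen instead.
joined.explain()
```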

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -26,6 +26,7 @@ import scala.util.Random
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.test.SQLTestData.ArrayData
 import org.apache.spark.sql.types._
@@ -176,6 +177,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
  * when using the Hive external catalog) as well as in the sql/core module.
  */
 abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
+  import testImplicits._
 
   private val dec1 = new java.math.BigDecimal("1.000000000000000000")
   private val dec2 = new java.math.BigDecimal("8.000000000000000000")
@@ -242,4 +244,20 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  // This test will be run twice: with and without Hive support
+  test("SPARK-18856: non-empty partitioned table should not report zero size") {
+    withTable("ds_tbl", "hive_tbl") {
+      spark.range(100).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("ds_tbl")
+      val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.statistics
+      assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
+
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
+        sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
+        val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.statistics
+        assert(stats2.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
+      }
+    }
+  }
 }
```
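One follow-up worth noting: the `defaultSizeInBytes` fallback only applies while the catalog holds no statistics for the table; once the table is analyzed, the measured size flows through `catalogTable.get.stats` instead. A sketch reusing the test's `ds_tbl`, assuming `ANALYZE TABLE` support for the table's format:

```scala
// Computing statistics populates catalogTable.stats, so DataSource's
// getOrElse fallback is no longer taken and the measured size is used.
sql("ANALYZE TABLE ds_tbl COMPUTE STATISTICS")
val measured = spark.table("ds_tbl").queryExecution.optimizedPlan.statistics.sizeInBytes
assert(measured > 0) // real size estimate, not the conservative default
```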
