Commit 1a16d33

gengliangwang authored and Robert Kruszewski committed
[SPARK-19724][SQL] Creating a managed table with an existing default location should throw an exception
## What changes were proposed in this pull request?

This PR finishes apache#17272. That JIRA is a follow-up to SPARK-19583.

As discussed in that PR, the following DDL for a managed table with an existing default location should throw an exception:

- CREATE TABLE ... (PARTITIONED BY ...)
- CREATE TABLE ... (PARTITIONED BY ...) AS SELECT ...

Currently, two situations are not consistent with this logic:

1. CREATE TABLE ... (PARTITIONED BY ...) succeeds with an existing default location, for both Hive and data source tables (with HiveExternalCatalog/InMemoryCatalog).
2. CREATE TABLE ... (PARTITIONED BY ...) AS SELECT ... succeeds with an existing default location for Hive tables.

This PR makes both situations consistent with the intended logic: an exception is thrown when the default location already exists.

## How was this patch tested?

Unit tests added.

Author: Gengliang Wang <[email protected]>

Closes apache#20886 from gengliangwang/pr-17272.
1 parent 9ff54b1 commit 1a16d33
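To illustrate the enforced behavior, here is a minimal spark-shell sketch (the table name `tab1`, the `leftover` file, and the local-filesystem warehouse are assumptions for the example, not part of this commit):

```scala
import java.io.File

// Assumes an active `spark` session whose warehouse directory is on the
// local filesystem; `tab1` and `leftover` are made-up names.
val warehouse = spark.conf.get("spark.sql.warehouse.dir").stripPrefix("file:")
val tableLoc = new File(warehouse, "tab1")
tableLoc.mkdirs()
new File(tableLoc, "leftover").createNewFile()  // make the default location non-empty

// After this commit, both DDL forms throw an AnalysisException such as:
//   Can not create the managed table('`tab1`'). The associated location(...) already exists.
spark.sql("CREATE TABLE tab1 (col1 INT, col2 STRING) USING parquet")
spark.sql("CREATE TABLE tab1 USING parquet AS SELECT 1, 'a'")
```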

File tree: 6 files changed, +110 −4 lines


docs/sql-programming-guide.md

Lines changed: 1 addition & 0 deletions

@@ -1809,6 +1809,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 - Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, an column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
 - Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file formats (parquet, orc, json, text, csv etc.) is not allowed. An exception is thrown when attempting to write dataframes with empty schema.
 - Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promotes both sides to TIMESTAMP. To set `false` to `spark.sql.hive.compareDateTimestampInTimestamp` restores the previous behavior. This option will be removed in Spark 3.0.
+- Since Spark 2.4, creating a managed table with nonempty location is not allowed. An exception is thrown when attempting to create a managed table with nonempty location. To set `true` to `spark.sql.allowCreatingManagedTableUsingNonemptyLocation` restores the previous behavior. This option will be removed in Spark 3.0.

 ## Upgrading From Spark SQL 2.2 to 2.3

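Migrating jobs that relied on the old behavior can opt back in for the current session with the internal flag named in the note above; a minimal sketch (the flag is slated for removal in Spark 3.0):

```scala
// Restore the pre-2.4 behavior of silently reusing a non-empty location.
spark.conf.set("spark.sql.allowCreatingManagedTableUsingNonemptyLocation", "true")
```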

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 21 additions & 2 deletions

@@ -289,6 +289,7 @@ class SessionCatalog(
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
+    val tableIdentifier = TableIdentifier(table, Some(db))
     validateName(table)

     val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
@@ -298,15 +299,33 @@ class SessionCatalog(
         makeQualifiedPath(tableDefinition.storage.locationUri.get)
       tableDefinition.copy(
         storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
-        identifier = TableIdentifier(table, Some(db)))
+        identifier = tableIdentifier)
     } else {
-      tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+      tableDefinition.copy(identifier = tableIdentifier)
     }

     requireDbExists(db)
+    if (!ignoreIfExists) {
+      validateTableLocation(newTableDefinition)
+    }
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }

+  def validateTableLocation(table: CatalogTable): Unit = {
+    // SPARK-19724: the default location of a managed table should be non-existent or empty.
+    if (table.tableType == CatalogTableType.MANAGED &&
+        !conf.allowCreatingManagedTableUsingNonemptyLocation) {
+      val tableLocation =
+        new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
+      val fs = tableLocation.getFileSystem(hadoopConf)
+
+      if (fs.exists(tableLocation) && fs.listStatus(tableLocation).nonEmpty) {
+        throw new AnalysisException(s"Can not create the managed table('${table.identifier}')" +
+          s". The associated location('${tableLocation.toString}') already exists.")
+      }
+    }
+  }
+
   /**
    * Alter the metadata of an existing metastore table identified by `tableDefinition`.
    *
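The new check is a plain Hadoop FileSystem probe: a managed table's location must be non-existent or an empty directory. A standalone sketch of the same test, with a hardcoded path standing in for `table.storage.locationUri` / `defaultTablePath`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// The path below is a placeholder for the catalog's resolved table location.
val tableLocation = new Path("file:/tmp/spark-warehouse/tab1")
val fs = tableLocation.getFileSystem(new Configuration())

// "non-existent or empty" is the only acceptable state; listStatus is safe
// here because the short-circuit guarantees the path exists when it runs.
val locationUsable = !fs.exists(tableLocation) || fs.listStatus(tableLocation).isEmpty
assert(locationUsable, s"The associated location('$tableLocation') already exists.")
```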

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions

@@ -1163,6 +1163,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION =
+    buildConf("spark.sql.allowCreatingManagedTableUsingNonemptyLocation")
+      .internal()
+      .doc("When this option is set to true, creating managed tables with nonempty location " +
+        "is allowed. Otherwise, an analysis exception is thrown. ")
+      .booleanConf
+      .createWithDefault(false)
+
   val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
     buildConf("spark.sql.streaming.continuous.executorQueueSize")
       .internal()
@@ -1595,6 +1603,9 @@ class SQLConf extends Serializable with Logging {

   def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)

+  def allowCreatingManagedTableUsingNonemptyLocation: Boolean =
+    getConf(ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION)
+
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
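The new entry is internal, so Spark code reads it through the typed accessor above, but it remains reachable by name at runtime. A minimal sketch, assuming a local SparkSession:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()

// ConfigEntry.key is the string name registered via buildConf above.
val key = SQLConf.ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION.key
assert(key == "spark.sql.allowCreatingManagedTableUsingNonemptyLocation")

println(spark.conf.get(key))  // "false" by default
spark.conf.set(key, "true")   // opt back into the pre-2.4 behavior
```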

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 3 additions & 2 deletions

@@ -167,7 +167,7 @@ case class CreateDataSourceTableAsSelectCommand(
         sparkSession, table, table.storage.locationUri, child, SaveMode.Append, tableExists = true)
     } else {
       assert(table.schema.isEmpty)
-
+      sparkSession.sessionState.catalog.validateTableLocation(table)
       val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
         Some(sessionState.catalog.defaultTablePath(table.identifier))
       } else {
@@ -181,7 +181,8 @@ case class CreateDataSourceTableAsSelectCommand(
         // the schema of df). It is important since the nullability may be changed by the relation
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
         schema = result.schema)
-      sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+      // Table location is already validated. No need to check it again during table creation.
+      sessionState.catalog.createTable(newTable, ignoreIfExists = true)

       result match {
         case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
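The flip to `ignoreIfExists = true` is tied to ordering: CTAS validates the location first, then writes the query result into it, and only then registers the table, so re-validating inside `createTable` would trip over the freshly written data. An illustrative round trip, assuming an active `spark` session (table name made up):

```scala
// The SELECT output lands in the default table path *before* the catalog
// entry is created, so the location is already non-empty at createTable time.
spark.sql("CREATE TABLE ctas_demo USING parquet AS SELECT 1 AS id, 'a' AS name")
spark.sql("SELECT * FROM ctas_demo").show()  // id=1, name=a
spark.sql("DROP TABLE ctas_demo")            // managed table: data is removed too
```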

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala

Lines changed: 7 additions & 0 deletions

@@ -17,6 +17,8 @@

 package org.apache.spark.sql

+import java.io.File
+
 import scala.collection.mutable

 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -26,6 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData.ArrayData
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils


 /**
@@ -242,6 +245,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared

   test("change stats after set location command") {
     val table = "change_stats_set_location_table"
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier(table)))
     Seq(false, true).foreach { autoUpdate =>
       withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) {
         withTable(table) {
@@ -269,6 +273,9 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
           assert(fetched3.get.sizeInBytes == fetched1.get.sizeInBytes)
         } else {
           checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+          // SPARK-19724: clean up the previous table location.
+          waitForTasksToFinish()
+          Utils.deleteRecursively(tableLoc)
         }
       }
     }

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 67 additions & 0 deletions

@@ -180,6 +180,13 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {

   private val escapedIdentifier = "`(.+)`".r

+  private def dataSource: String = {
+    if (isUsingHiveMetastore) {
+      "HIVE"
+    } else {
+      "PARQUET"
+    }
+  }
   protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table

   private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
@@ -365,6 +372,66 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }

+  test("CTAS a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing empty directory") {
+    val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+    try {
+      tableLoc.mkdir()
+      withTable("tab1") {
+        sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
+        sql("INSERT INTO tab1 VALUES (1, 'a')")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      }
+    } finally {
+      waitForTasksToFinish()
+      Utils.deleteRecursively(tableLoc)
+    }
+  }
+
+  test("create a managed table with the existing non-empty directory") {
+    withTable("tab1") {
+      val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+      try {
+        // create an empty hidden file
+        tableLoc.mkdir()
+        val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
+        hiddenGarbageFile.createNewFile()
+        val exMsg = "Can not create the managed table('`tab1`'). The associated location"
+        val exMsgWithDefaultDB =
+          "Can not create the managed table('`default`.`tab1`'). The associated location"
+        var ex = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tab1 USING ${dataSource} AS SELECT 1, 'a'")
+        }.getMessage
+        if (isUsingHiveMetastore) {
+          assert(ex.contains(exMsgWithDefaultDB))
+        } else {
+          assert(ex.contains(exMsg))
+        }

+        ex = intercept[AnalysisException] {
+          sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING ${dataSource}")
+        }.getMessage
+        assert(ex.contains(exMsgWithDefaultDB))
+      } finally {
+        waitForTasksToFinish()
+        Utils.deleteRecursively(tableLoc)
+      }
+    }
+  }
+
   private def checkSchemaInCreatedDataSourceTable(
       path: File,
       userSpecifiedSchema: Option[String],
